目录
4.3 cancel(boolean mayInterruptIfRunning)
3.3 FutureTask的set&setException方法
3.6 FutureTask的finishCompletion方法
一、FutureTask基本概念
1. 概述
FutureTask
是 Java 并发工具包 java.util.concurrent
中的一个重要组件,它实现了 RunnableFuture
接口,后者继承了 Runnable
和 Future
接口。这意味着 FutureTask
不仅可以作为一个任务被线程执行,还可以用来获取异步计算的结果。FutureTask
主要用于封装那些可以异步执行的任务,并且可以在任务完成后获取任务的执行结果。
2. 核心接口
2.1 Runnable
Runnable
是 Java 中最基本的多线程接口,定义了一个 run()
方法,该方法没有任何返回值,通常用于表示不需要返回结果的任务。
2.2 Callable
Callable
是 Runnable
的增强版本,定义了一个 call()
方法,该方法可以返回一个结果,并且可以抛出异常。Callable
通常用于表示需要返回结果的任务。
2.3 Future
Future
接口代表异步计算的结果,提供了几种方法来检查任务是否完成、获取任务结果以及取消任务。主要方法包括:
boolean cancel(boolean mayInterruptIfRunning)
:尝试取消任务的执行。boolean isCancelled()
:判断任务是否被取消。boolean isDone()
:判断任务是否已经完成。V get()
:获取任务的结果,如果任务未完成,此方法会阻塞。V get(long timeout, TimeUnit unit)
:在指定时间内获取任务的结果,如果任务未完成,此方法会阻塞一段时间后抛出TimeoutException
。
2.4 RunnableFuture
RunnableFuture
接口同时继承了 Runnable
和 Future
,表示一个可以作为任务执行并且可以获取结果的对象。
3. FutureTask
的构造方法
FutureTask
提供了两个主要的构造方法:
FutureTask(Callable<V> callable)
:接受一个Callable
对象,用于创建一个可以返回结果的FutureTask
。FutureTask(Runnable runnable, V result)
:接受一个Runnable
对象和一个结果值,用于创建一个可以返回指定结果的FutureTask
。
4. FutureTask
的核心方法
4.1 run()
run()
方法用于启动任务的执行。如果 FutureTask
是通过 Callable
创建的,run()
方法会调用 Callable
的 call()
方法,并将结果保存起来;如果 FutureTask
是通过 Runnable
创建的,run()
方法会调用 Runnable
的 run()
方法,并将指定的结果值保存起来。
4.2 get()
get()
方法用于获取任务的执行结果。如果任务还未完成,调用 get()
方法会导致当前线程阻塞,直到任务完成。
4.3 cancel(boolean mayInterruptIfRunning)
cancel()
方法用于取消任务的执行。如果任务已经完成或已被取消,此方法无效。如果任务正在执行且 mayInterruptIfRunning
参数为 true
,则会尝试中断执行任务的线程。
5. 使用示例
5.1 使用 Callable
创建 FutureTask
import java.util.concurrent.*;
public class FutureTaskExample {
public static void main(String[] args) {
// 创建一个 Callable 任务
Callable<Integer> callable = () -> {
System.out.println("任务开始执行...");
Thread.sleep(2000);
return 42;
};
// 创建 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// 创建线程并启动任务
Thread thread = new Thread(futureTask);
thread.start();
try {
// 获取任务结果
Integer result = futureTask.get();
System.out.println("任务执行结果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
5.2 使用 Runnable
创建 FutureTask
import java.util.concurrent.*;
public class RunnableFutureTaskExample {
public static void main(String[] args) {
// 创建一个 Runnable 任务
Runnable runnable = () -> {
System.out.println("任务开始执行...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
// 创建 FutureTask,指定结果值
FutureTask<String> futureTask = new FutureTask<>(runnable, "任务完成");
// 创建线程并启动任务
Thread thread = new Thread(futureTask);
thread.start();
try {
// 获取任务结果
String result = futureTask.get();
System.out.println("任务执行结果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
6. FutureTask
的内部实现
FutureTask
的内部实现依赖于 AbstractQueuedSynchronizer
(AQS),这是一个用于构建锁和其他同步器的基础框架。FutureTask
内部维护了一个状态变量 state
,用于记录任务的执行状态。任务的状态变化包括:
NEW
:任务新建但未开始执行。COMPLETING
:任务正在完成。NORMAL
:任务正常完成。EXCEPTIONAL
:任务因异常而完成。CANCELLED
:任务被取消。INTERRUPTING
:任务正在被中断。INTERRUPTED
:任务已中断。
7. FutureTask
的应用场景
7.1 异步计算任务
FutureTask
常用于执行耗时的计算任务,避免阻塞主线程。主线程可以在完成其他任务后,再通过 get()
方法获取计算结果。
7.2 缓存的使用
可以使用 FutureTask
来实现简单的缓存功能。当缓存中不存在指定值时,可以通过 FutureTask
来计算并缓存结果。
7.3 并发任务的控制
可以利用 FutureTask
的特性来实现对一组并发任务的控制和管理,例如等待所有任务完成或只等待其中一个任务完成。
7.4 异步 I/O 操作
FutureTask
可以用于异步 I/O 操作的处理,例如异步读写文件或网络请求,通过 FutureTask
获取 I/O 操作的结果。
8. FutureTask
的局限性
尽管 FutureTask
是一个强大的工具,但也存在一些局限性:
无法获取任务执行进度:
FutureTask
无法直接获取任务的执行进度,只能获取任务的执行结果。无法动态取消任务:一旦
FutureTask
进入运行状态,就无法再取消任务,只能等待任务执行完成。单次使用:每个
FutureTask
只能执行一次,如果需要再次执行,需要创建新的FutureTask
对象。
9. 最佳实践
9.1 使用 Callable
接口
建议使用 Callable
接口作为任务的参数,以便获取任务的执行结果。
9.2 谨慎处理任务的取消操作
确保任务在取消后能正确清理资源,避免资源泄漏。
9.3 避免在任务执行过程中阻塞主线程
尽量使用异步回调或其他并发手段来处理任务的结果,避免阻塞主线程。
9.4 合理利用线程池
合理利用线程池来执行 FutureTask
,避免创建过多线程导致资源浪费和性能下降。
10. 总结
FutureTask
是 Java 并发编程中的一个重要工具,它通过封装 Callable
和 Runnable
任务,提供了异步计算和结果获取的能力。通过合理使用 FutureTask
,可以提高程序的并发性能和响应速度。然而,也需要注意到 FutureTask
的局限性,并根据具体需求选择合适的并发工具。希望这篇文章能帮助你更好地理解和使用 FutureTask
,如果你有任何问题或需要进一步的帮助,请随时提问。
二、FutureTask深入理解
1 FutureTask介绍
FutureTask是一个可以取消异步任务的类。FutureTask对Future做的一个基本实现。可以调用方法区开始和取消一个任务。
一般是配合Callable去使用。
异步任务启动之后,可以获取一个绑定当前异步任务的FutureTask。
可以基于FutureTask的方法去取消任务,查看任务是否结果,以及获取任务的返回结果。
FutureTask内部的整体结构中,实现了RunnableFuture的接口,这个接口又继承了Runnable, Future这个两个接口。所以FutureTask也可以作为任务直接交给线程池去处理。
2 FutureTask应用
大方向是FutureTask对任务的控制:
任务执行过程中状态的控制
任务执行完毕后,返回结果的获取
FutureTask的任务在执行run方法后,是无法被再次运行,需要使用runAndReset方法才可以。
public static void main(String[] args) throws InterruptedException {
// 构建FutureTask,基于泛型执行返回结果类型
// 在有参构造中,声明Callable或者Runnable指定任务
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("任务开始执行……");
Thread.sleep(2000);
System.out.println("任务执行完毕……");
return "OK!";
});
// 构建线程池Executor
Service service = Executors.newFixedThreadPool(10);
// 线程池执行任务
service.execute(futureTask);
// futureTask提供了run方法,一般不会自己去调用run方法,让线程池去执行任务,由线程池去执行run方法
// run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。
// 如果希望任务可以反复被执行,需要去调用runAndReset方法
// futureTask.run();
// 对返回结果的获取,类似阻塞队列的poll方法
// 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException
// try {
// String s = futureTask.get(3000, TimeUnit.MILLISECONDS);
// System.out.println("返回结果:" + s);
// } catch (Exception e) {
// System.out.println("异常返回:" + e.getMessage());
// e.printStackTrace();// }
// 对返回结果的获取,类似阻塞队列的take方法,死等结果
// try {
// String s = futureTask.get();
// System.out.println("任务结果:" + s);
// } catch (ExecutionException e) {
// e.printStackTrace();
// }
// 对任务状态的控制
// System.out.println("任务结束了么?:" + futureTask.isDone());
// Thread.sleep(1000);
// System.out.println("任务结束了么?:" + futureTask.isDone());
// Thread.sleep(1000);
// System.out.println("任务结束了么?:" + futureTask.isDone());
}
3 FutureTask源码分析
看FutureTask的源码,要从几个方向去看:
先查看FutureTask中提供的一些状态
在查看任务的执行过程
3.1 FutureTask中的核心属性
清楚任务的流转流转状态是怎样的,其次对于核心属性要追到是干嘛的。
/**
FutureTask的核心属性
FutureTask任务的状态流转
* NEW -> COMPLETING -> NORMAL 任务正常执行,并且返回结果也正常返回
* NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是结果是异常
* NEW -> CANCELLED 任务被取消
* NEW -> INTERRUPTING -> INTERRUPTED 任务被中断
*/
// 记录任务的状态
private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 需要执行任务,会被赋值到这个属性 */
private Callable<V> callable;
/** 任务的任务结果要存储在这几个属性中 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 执行任务的线程 */
private volatile Thread runner;
/** 等待返回结果的线程Node对象, */
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
3.2 FutureTask的run方法
任务执行前的一些判断,以及调用任务封装结果的方式,还有最后的一些后续处理
// 当线程池执行FutureTask任务时,会调用的方法
public void run() {
// 如果当前任务状态不是NEW,直接return告辞
if (state != NEW ||
// 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程
// 如果CAS失败,直接return告辞
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
// 将要执行的任务拿到
Callable<V> c = callable;
// 健壮性判断,保证任务不是null
// 再次判断任务的状态是NEW(DCL)
if (c != null && state == NEW) {
// 执行任务
// result:任务的返回结果
// ran:如果为true,任务正常结束。 如果为false,任务异常结束。
V result;
boolean ran;
try {
// 执行任务
result = c.call();
// 正常结果,ran设置为true
ran = true;
} catch (Throwable ex) {
// 如果任务执行期间出了异常
// 返回结果置为null
result = null;
// ran设置为false
ran = false;
// 封装异常结果
setException(ex);
}
if (ran)
// 封装正常结果
set(result);
}
} finally {
// 将执行任务的线程置位null
runner = null;
// 拿到任务的状态
int s = state;
// 如果状态大于等于INTERRUPTING
if (s >= INTERRUPTING)
// 进来代表任务中断,做一些后续处理
handlePossibleCancellationInterrupt(s);
}
}
3.3 FutureTask的set&setException方法
任务执行完毕后,修改任务的状态以及封装任务的结果
// 没有异常的时候,正常返回结果
protected void set(V v) {
// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将返回结果赋值给 outcome 属性
outcome = v;
// 将任务状态变为NORMAL,正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 一会再说……
finishCompletion();
}
}
// 任务执行期间出现了异常,这边要封装结果
protected void setException(Throwable t) {
// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常信息封装到 outcome 属性
outcome = t;
// 将任务状态变为EXCEPTIONAL,异常结束
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
// 一会再说……
finishCompletion();
}
}
3.4 FutureTask的cancel方法
任务取消的一个方式
任务直接从NEW状态转换为CANCEL
任务从NEW状态变成INTERRUPTING,然后再转换为INTERRUPTED
// 取消任务操作
public boolean cancel(boolean mayInterruptIfRunning) {
// 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning
// 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTING
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 如果mayInterruptIfRunning为true
// 就需要中断线程
if (mayInterruptIfRunning) {
try {
// 拿到任务线程
Thread t = runner;
if (t != null)
// 如果线程不为null,直接interrupt
t.interrupt();
} finally {
// 将任务状态设置为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 任务结束后的一些处理~~ 一会看~~
finishCompletion();
}
return true;
}
3.5 FutureTask的get方法
这个是线程获取FutureTask任务执行结果的方法
// 拿任务结果
public V get() throws InterruptedException, ExecutionException {
// 获取任务的状态
int s = state;
// 要么是NEW,任务还没执行完
// 要么COMPLETING,任务执行完了,结果还没封装好。
if (s <= COMPLETING)
// 让当前线程阻塞,等待结果
s = awaitDone(false, 0L);
// 最终想要获取结果,需要执行report方法
return report(s);
}
// 线程等待FutureTask结果的过程
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// 针对get方法传入了等待时长时,需要计算等到什么时间点final
long deadline = timed ? System.nanoTime() + nanos : 0L;
// 声明好需要的Node,queued:放到链表中了么?
WaitNode q = null;
boolean queued = false;
for (;;) {
// 查看线程是否中断,如果中断,从等待链表中移除,甩个异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 拿到状态
int s = state;
// 到这,说明任务结束了。
if (s > COMPLETING) {
if (q != null)
// 如果之前封装了WaitNode,现在要清空
q.thread = null;
return s;
}
// 如果任务状态是COMPLETING,这就不需要去阻塞线程,让步一下,等待一小会,结果就有了
else if (s == COMPLETING)
Thread.yield();
// 如果还没初始化WaitNode,初始化
else if (q == null)
q = new WaitNode();
// 没放队列的话,直接放到waiters的前面
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 准备挂起线程,如果timed为true,挂起一段时间e
lse if (timed) {
// 计算出最多可以等待多久
nanos = deadline - System.nanoTime();
// 如果等待的时间没了
if (nanos <= 0L) {
// 移除当前的Node,返回任务状态
removeWaiter(q);
return state;
}
// 等一会
LockSupport.parkNanos(this, nanos);
}
else
// 死等
LockSupport.park(this);
}
}
// get的线程已经可以阻塞结束了,基于状态查看能否拿到返回结果
private V report(int s) throws ExecutionException {
// 拿到outcome 返回结果
Object x = outcome;
// 如果任务状态是NORMAL,任务正常结束,返回结果
if (s == NORMAL)
return (V)x;
// 如果任务状态大于等于取消
if (s >= CANCELLED)
// 直接抛出异常
throw new CancellationException();
// 到这就是异常结束
throw new ExecutionException((Throwable)x);
}
3.6 FutureTask的finishCompletion方法
只要任务结束了,无论是正常返回,异常返回,还是任务被取消都会执行这个方法
而这个方法其实就是唤醒那些执行get方法等待任务结果的线程
// 任务结束后触发
private void finishCompletion() {
// 在任务结束后,需要唤醒
for (WaitNode q; (q = waiters) != null;) {
// 第一步直接以CAS的方式将WaitNode置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 拿到了Node中的线程
Thread t = q.thread;
// 如果线程不为null
if (t != null) {
// 第一步先置为null
q.thread = null;
// 直接唤醒这个线程
LockSupport.unpark(t);
}
// 拿到当前Node的next
WaitNode next = q.next;
// next为null,代表已经将全部节点唤醒了吗,跳出循环
if (next == null)
break;
// 将next置位null
q.next = null;
// q的引用指向next
q = next;
}
break;
}
}
// 任务结束后,可以基于这个扩展方法,记录一些信息
done();
// 任务执行完,把callable具体任务置为null
callable = null;
}