第二十三章 Java多线程--异步编程-FutureTask

发布于:2024-12-07 ⋅ 阅读:(157) ⋅ 点赞:(0)

目录

一、FutureTask基本概念

1. 概述

2. 核心接口

2.1 Runnable

2.2 Callable

2.3 Future

2.4 RunnableFuture

3. FutureTask 的构造方法

4. FutureTask 的核心方法

4.1 run()

4.2 get()

4.3 cancel(boolean mayInterruptIfRunning)

5. 使用示例

5.1 使用 Callable 创建 FutureTask

5.2 使用 Runnable 创建 FutureTask

6. FutureTask 的内部实现

7. FutureTask 的应用场景

7.1 异步计算任务

7.2 缓存的使用

7.3 并发任务的控制

7.4 异步 I/O 操作

8. FutureTask 的局限性

9. 最佳实践

9.1 使用 Callable 接口

9.2 谨慎处理任务的取消操作

9.3 避免在任务执行过程中阻塞主线程

9.4 合理利用线程池

10. 总结

二、FutureTask深入理解

1 FutureTask介绍

2 FutureTask应用

3 FutureTask源码分析

3.1 FutureTask中的核心属性

3.2 FutureTask的run方法

3.3 FutureTask的set&setException方法

3.4 FutureTask的cancel方法

3.5 FutureTask的get方法

3.6 FutureTask的finishCompletion方法


一、FutureTask基本概念

1. 概述

FutureTask 是 Java 并发工具包 java.util.concurrent 中的一个重要组件,它实现了 RunnableFuture 接口,后者继承了 RunnableFuture 接口。这意味着 FutureTask 不仅可以作为一个任务被线程执行,还可以用来获取异步计算的结果。FutureTask 主要用于封装那些可以异步执行的任务,并且可以在任务完成后获取任务的执行结果。

2. 核心接口

2.1 Runnable

Runnable 是 Java 中最基本的多线程接口,定义了一个 run() 方法,该方法没有任何返回值,通常用于表示不需要返回结果的任务。

2.2 Callable

CallableRunnable 的增强版本,定义了一个 call() 方法,该方法可以返回一个结果,并且可以抛出异常。Callable 通常用于表示需要返回结果的任务。

2.3 Future

Future 接口代表异步计算的结果,提供了几种方法来检查任务是否完成、获取任务结果以及取消任务。主要方法包括:

  • boolean cancel(boolean mayInterruptIfRunning):尝试取消任务的执行。

  • boolean isCancelled():判断任务是否被取消。

  • boolean isDone():判断任务是否已经完成。

  • V get():获取任务的结果,如果任务未完成,此方法会阻塞。

  • V get(long timeout, TimeUnit unit):在指定时间内获取任务的结果,如果任务未完成,此方法会阻塞一段时间后抛出 TimeoutException

2.4 RunnableFuture

RunnableFuture 接口同时继承了 RunnableFuture,表示一个可以作为任务执行并且可以获取结果的对象。

3. FutureTask 的构造方法

FutureTask 提供了两个主要的构造方法:

  • FutureTask(Callable<V> callable):接受一个 Callable 对象,用于创建一个可以返回结果的 FutureTask

  • FutureTask(Runnable runnable, V result):接受一个 Runnable 对象和一个结果值,用于创建一个可以返回指定结果的 FutureTask

4. FutureTask 的核心方法

4.1 run()

run() 方法用于启动任务的执行。如果 FutureTask 是通过 Callable 创建的,run() 方法会调用 Callablecall() 方法,并将结果保存起来;如果 FutureTask 是通过 Runnable 创建的,run() 方法会调用 Runnablerun() 方法,并将指定的结果值保存起来。

4.2 get()

get() 方法用于获取任务的执行结果。如果任务还未完成,调用 get() 方法会导致当前线程阻塞,直到任务完成。

4.3 cancel(boolean mayInterruptIfRunning)

cancel() 方法用于取消任务的执行。如果任务已经完成或已被取消,此方法无效。如果任务正在执行且 mayInterruptIfRunning 参数为 true,则会尝试中断执行任务的线程。

5. 使用示例

5.1 使用 Callable 创建 FutureTask

import java.util.concurrent.*;

public class FutureTaskExample {
    public static void main(String[] args) {
        // 创建一个 Callable 任务
        Callable<Integer> callable = () -> {
            System.out.println("任务开始执行...");
            Thread.sleep(2000);
            return 42;
        };

        // 创建 FutureTask
        FutureTask<Integer> futureTask = new FutureTask<>(callable);

        // 创建线程并启动任务
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            // 获取任务结果
            Integer result = futureTask.get();
            System.out.println("任务执行结果: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

5.2 使用 Runnable 创建 FutureTask

import java.util.concurrent.*;

public class RunnableFutureTaskExample {
    public static void main(String[] args) {
        // 创建一个 Runnable 任务
        Runnable runnable = () -> {
            System.out.println("任务开始执行...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        // 创建 FutureTask,指定结果值
        FutureTask<String> futureTask = new FutureTask<>(runnable, "任务完成");

        // 创建线程并启动任务
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            // 获取任务结果
            String result = futureTask.get();
            System.out.println("任务执行结果: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

6. FutureTask 的内部实现

FutureTask 的内部实现依赖于 AbstractQueuedSynchronizer(AQS),这是一个用于构建锁和其他同步器的基础框架。FutureTask 内部维护了一个状态变量 state,用于记录任务的执行状态。任务的状态变化包括:

  • NEW:任务新建但未开始执行。

  • COMPLETING:任务正在完成。

  • NORMAL:任务正常完成。

  • EXCEPTIONAL:任务因异常而完成。

  • CANCELLED:任务被取消。

  • INTERRUPTING:任务正在被中断。

  • INTERRUPTED:任务已中断。

7. FutureTask 的应用场景

7.1 异步计算任务

FutureTask 常用于执行耗时的计算任务,避免阻塞主线程。主线程可以在完成其他任务后,再通过 get() 方法获取计算结果。

7.2 缓存的使用

可以使用 FutureTask 来实现简单的缓存功能。当缓存中不存在指定值时,可以通过 FutureTask 来计算并缓存结果。

7.3 并发任务的控制

可以利用 FutureTask 的特性来实现对一组并发任务的控制和管理,例如等待所有任务完成或只等待其中一个任务完成。

7.4 异步 I/O 操作

FutureTask 可以用于异步 I/O 操作的处理,例如异步读写文件或网络请求,通过 FutureTask 获取 I/O 操作的结果。

8. FutureTask 的局限性

尽管 FutureTask 是一个强大的工具,但也存在一些局限性:

  • 无法获取任务执行进度:FutureTask 无法直接获取任务的执行进度,只能获取任务的执行结果。

  • 无法动态取消任务:一旦 FutureTask 进入运行状态,就无法再取消任务,只能等待任务执行完成。

  • 单次使用:每个 FutureTask 只能执行一次,如果需要再次执行,需要创建新的 FutureTask 对象。

9. 最佳实践

9.1 使用 Callable 接口

建议使用 Callable 接口作为任务的参数,以便获取任务的执行结果。

9.2 谨慎处理任务的取消操作

确保任务在取消后能正确清理资源,避免资源泄漏。

9.3 避免在任务执行过程中阻塞主线程

尽量使用异步回调或其他并发手段来处理任务的结果,避免阻塞主线程。

9.4 合理利用线程池

合理利用线程池来执行 FutureTask,避免创建过多线程导致资源浪费和性能下降。

10. 总结

FutureTask 是 Java 并发编程中的一个重要工具,它通过封装 CallableRunnable 任务,提供了异步计算和结果获取的能力。通过合理使用 FutureTask,可以提高程序的并发性能和响应速度。然而,也需要注意到 FutureTask 的局限性,并根据具体需求选择合适的并发工具。希望这篇文章能帮助你更好地理解和使用 FutureTask,如果你有任何问题或需要进一步的帮助,请随时提问。

二、FutureTask深入理解

1 FutureTask介绍

FutureTask是一个可以取消异步任务的类。FutureTask对Future做的一个基本实现。可以调用方法区开始和取消一个任务。

一般是配合Callable去使用。

异步任务启动之后,可以获取一个绑定当前异步任务的FutureTask。

可以基于FutureTask的方法去取消任务,查看任务是否结果,以及获取任务的返回结果。

FutureTask内部的整体结构中,实现了RunnableFuture的接口,这个接口又继承了Runnable, Future这个两个接口。所以FutureTask也可以作为任务直接交给线程池去处理。

2 FutureTask应用

大方向是FutureTask对任务的控制:

  • 任务执行过程中状态的控制

  • 任务执行完毕后,返回结果的获取

FutureTask的任务在执行run方法后,是无法被再次运行,需要使用runAndReset方法才可以。

public static void main(String[] args) throws InterruptedException {
    // 构建FutureTask,基于泛型执行返回结果类型
    // 在有参构造中,声明Callable或者Runnable指定任务
    FutureTask<String> futureTask = new FutureTask<>(() -> {
        System.out.println("任务开始执行……");
        Thread.sleep(2000);
        System.out.println("任务执行完毕……");
        return "OK!";
    });

    // 构建线程池Executor
    Service service = Executors.newFixedThreadPool(10);

    // 线程池执行任务
    service.execute(futureTask);

    // futureTask提供了run方法,一般不会自己去调用run方法,让线程池去执行任务,由线程池去执行run方法
    // run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。
    // 如果希望任务可以反复被执行,需要去调用runAndReset方法
    //        futureTask.run();
    // 对返回结果的获取,类似阻塞队列的poll方法
    // 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException
    //        try {
    //            String s = futureTask.get(3000, TimeUnit.MILLISECONDS);
    //            System.out.println("返回结果:" + s);
    //        } catch (Exception e) {
    //            System.out.println("异常返回:" + e.getMessage());
    //            e.printStackTrace();//        }
    // 对返回结果的获取,类似阻塞队列的take方法,死等结果
    //        try {
    //            String s = futureTask.get();
    //            System.out.println("任务结果:" + s);
    //        } catch (ExecutionException e) {
    //            e.printStackTrace();
    //        }
    // 对任务状态的控制
    //        System.out.println("任务结束了么?:" + futureTask.isDone());
    //        Thread.sleep(1000);
    //        System.out.println("任务结束了么?:" + futureTask.isDone());
    //        Thread.sleep(1000);
    //        System.out.println("任务结束了么?:" + futureTask.isDone());
}

3 FutureTask源码分析

看FutureTask的源码,要从几个方向去看:

  • 先查看FutureTask中提供的一些状态

  • 在查看任务的执行过程

3.1 FutureTask中的核心属性

清楚任务的流转流转状态是怎样的,其次对于核心属性要追到是干嘛的。

/**
 FutureTask的核心属性
 FutureTask任务的状态流转
 * NEW -> COMPLETING -> NORMAL           任务正常执行,并且返回结果也正常返回
 * NEW -> COMPLETING -> EXCEPTIONAL      任务正常执行,但是结果是异常
 * NEW -> CANCELLED                      任务被取消   
 * NEW -> INTERRUPTING -> INTERRUPTED    任务被中断
 */
 // 记录任务的状态
 private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** 需要执行任务,会被赋值到这个属性 */
private Callable<V> callable;
/** 任务的任务结果要存储在这几个属性中 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 执行任务的线程 */
private volatile Thread runner;
/** 等待返回结果的线程Node对象, */
private volatile WaitNode waiters;
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

3.2 FutureTask的run方法

任务执行前的一些判断,以及调用任务封装结果的方式,还有最后的一些后续处理

// 当线程池执行FutureTask任务时,会调用的方法
public void run() {
    // 如果当前任务状态不是NEW,直接return告辞
    if (state != NEW ||  
        // 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程
        // 如果CAS失败,直接return告辞
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;

    try {
        // 将要执行的任务拿到
        Callable<V> c = callable;
        // 健壮性判断,保证任务不是null
        // 再次判断任务的状态是NEW(DCL)
        if (c != null && state == NEW) {
            // 执行任务
            // result:任务的返回结果
            // ran:如果为true,任务正常结束。 如果为false,任务异常结束。
            V result;
            boolean ran;
            try {
                // 执行任务
                result = c.call();
                // 正常结果,ran设置为true
                ran = true;
            } catch (Throwable ex) {
                // 如果任务执行期间出了异常
                // 返回结果置为null
                result = null;
                // ran设置为false
                ran = false;
                // 封装异常结果
                setException(ex);
            }
            if (ran)
                // 封装正常结果
                set(result);
        }
    } finally {
        // 将执行任务的线程置位null
        runner = null;
        // 拿到任务的状态
        int s = state;
        // 如果状态大于等于INTERRUPTING
        if (s >= INTERRUPTING)
            // 进来代表任务中断,做一些后续处理
            handlePossibleCancellationInterrupt(s);
    }
}

3.3 FutureTask的set&setException方法

任务执行完毕后,修改任务的状态以及封装任务的结果

// 没有异常的时候,正常返回结果
protected void set(V v) {
    // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将返回结果赋值给 outcome 属性
        outcome = v;
        // 将任务状态变为NORMAL,正常结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        // 一会再说……
        finishCompletion();
    }
}

// 任务执行期间出现了异常,这边要封装结果
protected void setException(Throwable t) {
    // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将异常信息封装到 outcome 属性
        outcome = t;
        // 将任务状态变为EXCEPTIONAL,异常结束
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
        // 一会再说……
        finishCompletion();
    }
}

3.4 FutureTask的cancel方法

任务取消的一个方式

  • 任务直接从NEW状态转换为CANCEL

  • 任务从NEW状态变成INTERRUPTING,然后再转换为INTERRUPTED

// 取消任务操作
public boolean cancel(boolean mayInterruptIfRunning) {
    // 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning
    // 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTING
    if (!(state == NEW && 
        UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {   
        // 如果mayInterruptIfRunning为true
        // 就需要中断线程
        if (mayInterruptIfRunning) {
            try {
                // 拿到任务线程
                Thread t = runner;
                if (t != null)
                    // 如果线程不为null,直接interrupt
                    t.interrupt();
            } finally { 
                // 将任务状态设置为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 任务结束后的一些处理~~ 一会看~~
        finishCompletion();
    }
    return true;
}

3.5 FutureTask的get方法

这个是线程获取FutureTask任务执行结果的方法

// 拿任务结果
public V get() throws InterruptedException, ExecutionException {
    // 获取任务的状态
    int s = state;
    // 要么是NEW,任务还没执行完
    // 要么COMPLETING,任务执行完了,结果还没封装好。
    if (s <= COMPLETING)
        // 让当前线程阻塞,等待结果
        s = awaitDone(false, 0L);
    // 最终想要获取结果,需要执行report方法
    return report(s);
}

// 线程等待FutureTask结果的过程
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    // 针对get方法传入了等待时长时,需要计算等到什么时间点final 
    long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 声明好需要的Node,queued:放到链表中了么?
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 查看线程是否中断,如果中断,从等待链表中移除,甩个异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // 拿到状态
        int s = state;
        // 到这,说明任务结束了。
        if (s > COMPLETING) {
            if (q != null)
                // 如果之前封装了WaitNode,现在要清空
                q.thread = null;
            return s;
        }
        // 如果任务状态是COMPLETING,这就不需要去阻塞线程,让步一下,等待一小会,结果就有了
        else if (s == COMPLETING) 
            Thread.yield();
        // 如果还没初始化WaitNode,初始化
        else if (q == null)
            q = new WaitNode();
        // 没放队列的话,直接放到waiters的前面
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 准备挂起线程,如果timed为true,挂起一段时间e
        lse if (timed) {
            // 计算出最多可以等待多久
            nanos = deadline - System.nanoTime();
            // 如果等待的时间没了
            if (nanos <= 0L) {
                // 移除当前的Node,返回任务状态
                removeWaiter(q);
                return state;
            }
            // 等一会
            LockSupport.parkNanos(this, nanos);
        }
        else
        // 死等
            LockSupport.park(this);
    }
}

// get的线程已经可以阻塞结束了,基于状态查看能否拿到返回结果
private V report(int s) throws ExecutionException {
    // 拿到outcome 返回结果
    Object x = outcome;
    // 如果任务状态是NORMAL,任务正常结束,返回结果
    if (s == NORMAL)
        return (V)x;
    // 如果任务状态大于等于取消
    if (s >= CANCELLED)
        // 直接抛出异常
        throw new CancellationException();
    // 到这就是异常结束
    throw new ExecutionException((Throwable)x);
}

3.6 FutureTask的finishCompletion方法

只要任务结束了,无论是正常返回,异常返回,还是任务被取消都会执行这个方法

而这个方法其实就是唤醒那些执行get方法等待任务结果的线程

// 任务结束后触发
private void finishCompletion() {
    // 在任务结束后,需要唤醒
    for (WaitNode q; (q = waiters) != null;) {
        // 第一步直接以CAS的方式将WaitNode置为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 拿到了Node中的线程
                Thread t = q.thread;
                // 如果线程不为null
                if (t != null) {
                    // 第一步先置为null
                    q.thread = null;
                    // 直接唤醒这个线程
                    LockSupport.unpark(t);
                }
                // 拿到当前Node的next
                WaitNode next = q.next;
                // next为null,代表已经将全部节点唤醒了吗,跳出循环
                if (next == null)
                    break;
                // 将next置位null
                q.next = null; 
                // q的引用指向next
                q = next;
            }
            break;
        }
    }

    // 任务结束后,可以基于这个扩展方法,记录一些信息
    done();

    // 任务执行完,把callable具体任务置为null
    callable = null;  
}


网站公告

今日签到

点亮在社区的每一天
去签到