JAVA多线程FutureTask作用

发布于:2023-02-05 ⋅ 阅读:(672) ⋅ 点赞:(0)


1.什么是FutureTask


          FutureTask是一个可取消的异步运算的任务,FutureTask里面可以可以传入Callable和Runable实现类作为参数,可以对异步运算任务的结果进行等待获取,判断是否已经完成,取消任务等操作。理解FutureTask之前先要知道Future接口,

          Future接口属于包java.util.concurrent, 接口提供了判断任务是否完成、终端任务、获取任务执行结果三种能力,源码如下:

package java.util.concurrent;
public interface Future<V> {
    
    //用来取消任务
    boolean cancel(boolean mayInterruptIfRunning);

     //返回任务是否取消成功
    boolean isCancelled();
    
    // 返回任务是否执行完成
    boolean isDone();

    //用来获取执行结果,获取的时候产生阻塞,直到任务执行完成
    V get() throws InterruptedException, ExecutionException;

    //用来获取任务执行结果,获取的时候产生阻塞,指定时间内任务没有执行完则TimeoutException
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

}

        Future只是一个接口,我们经常使用的是java.util.concurrent.FutureTask, 它是Future接口的实现类  (FutureTask首先实现了RunnableFuture接口,RunnableFuture接口集成Future.并在Future接口的基础上增加了run()行为)

        因为接口Future#get()是阻塞方法,所以我们在使用FutureTask#get()获取线程处理结果的时,先会阻塞等待任务处理完成, 处理完成后才能拿到结果

public class FutureTask<V> implements RunnableFuture<V> {
    //...
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

        一个Future对象可以调用Callable和Runable的对象进行包装,由于FutureTask也是Runnable接口的实现类,所以FutureTask也可以放入线程池中,比如查看Spring源码中的ThreadPoolTaskExecutor 对象

public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
		implements SchedulingTaskExecutor, Executor, BeanNameAware, InitializingBean, DisposableBean {
    
	public void execute(Runnable task) {
		Executor executor = getThreadPoolExecutor();
		try {
			executor.execute(task);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}
    

	public Future<?> submit(Runnable task) {
		FutureTask<Object> future = new FutureTask<Object>(task, null);
		execute(future);   //future 为参数 
		return future;
	}

	public <T> Future<T> submit(Callable<T> task) {
		FutureTask<T> future = new FutureTask<T>(task);
		execute(future);  //future 为参数 
		return future;
	}



}

        在项目中FutrueTask 广泛使用, 比如有一个任务处理起来比较慢,但是这个任务可以拆分成多个小任务分别处理,然后将结果合并起来返回, 

        举个例子:计算1*10+2*10+3*10+4*10+5*10求和,可以一次计算出来,也可以拆分成多个线程计算N*10, 每个线程处理后,在最后求和。如下代码所示

package com.thread.future;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class FutureTaskTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(5);
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
        for (int i = 1; i < 6; i++) {
            FutureTask<Integer> task = new FutureTask<Integer>(new Calculate(i));
            exec.execute(task);
            taskList.add(task);
        }

        Integer sum = 0;
        for (FutureTask<Integer> task : taskList) {
            //阻塞等待,当线程所有任务执行后才能达到每个线程处理结果
            Integer num = task.get();
            //输出处理结果
            System.out.println("" + new Date() + " " + sum + "+" + num + "=" + (sum + num));
            sum = sum + num;
        }
        exec.shutdown();
    }


}

class Calculate implements Callable {
    private int a;

    public Calculate(Integer a) {
        this.a = a;
    }

    public Integer call() throws Exception {
        System.out.println("" + new Date() + " " + Thread.currentThread().getName() + "deal :" + a);
        Thread.sleep(2000);
        return a * 10;  //这里只模拟简单运算
    }
}

执行结果

Fri Jul 29 19:17:03 CST 2022 pool-1-thread-5deal :5
Fri Jul 29 19:17:03 CST 2022 pool-1-thread-2deal :2
Fri Jul 29 19:17:03 CST 2022 pool-1-thread-1deal :1
Fri Jul 29 19:17:03 CST 2022 pool-1-thread-4deal :4
Fri Jul 29 19:17:03 CST 2022 pool-1-thread-3deal :3
Fri Jul 29 19:17:05 CST 2022 0+10=10
Fri Jul 29 19:17:05 CST 2022 10+20=30
Fri Jul 29 19:17:05 CST 2022 30+30=60
Fri Jul 29 19:17:05 CST 2022 60+40=100
Fri Jul 29 19:17:05 CST 2022 100+50=150

2. FutureTask源码分析


FutureTask中 state用volatile修饰的,如果在多线程并发的情况下,某一个线程改变了任务的状态,其他线程都能够立马知道,保证了state字段的可见性

 public class FutureTask<V> implements RunnableFuture<V> {
    // 表示当前任务的状态,volatile修饰
    private volatile int state;

    // 表示当前任务的状态是新创建的,尚未执行
    private static final int NEW          = 0;
    // 表示当前任务即将结束,还未完全结束,值还未写,一种临界状态
    private static final int COMPLETING   = 1;
    // 表示当前任务正常结束
    private static final int NORMAL       = 2;
    // 表示当前任务执行过程中出现了异常,内部封装的callable.call()向上抛出异常了
    private static final int EXCEPTIONAL  = 3;
    // 表示当前任务被取消
    private static final int CANCELLED    = 4;
    // 表示当前任务中断中
    private static final int INTERRUPTING = 5;
    // 表示当前任务已中断
    private static final int INTERRUPTED  = 6;


}

构造器:

FuturnTask提供了两个构造器,FutureTask里面可以可以传入Callable和Runable实现类作为参数

    /
    //上文说FutureTask里面可以可以传入Callable和Runable实现类作为参数  
    
    
    //一旦运行,执行给定的Callable
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        // 设置状态为新创建
        this.state = NEW;      
    }
    
    //一旦运行,执行给定的Runalbe,并在完成后通过get返回给调用者
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        // 设置状态为新创建
        this.state = NEW;      
    }

run方法:

public void run() {
        // 当前任务状态不为new或者runner旧值不为null,说明已经启动过了,直接返回,这里也说明了run()里面的具体逻辑只会
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        // 只有当任务状态为new并且runner旧值为null才会执行到这里
        try {
            // 传入的callable任务
            Callable<V> c = callable;

            // 当任务不为null并且当前任务状态为新建时才会往下执行
            if (c != null && state == NEW) {
                V result; // 储存任务的返回结果
                boolean ran;// 储存执行是否成功
                try {
                    // 调用callable.run()并返回结果
                    result = c.call();
                    ran = true;   // 正常执行设置ran为true
                } catch (Throwable ex) {
                    
                    result = null; // 异常时设置结果为null
                    ran = false;

                    //并且更新任务状态为EXCEPTIONAL(执行过程中出现了异常)并且唤醒阻塞的线程
                    setException(ex);
                }
  
                if (ran) //执行成功
                    // 内部设置outcome为callable执行的结果,并且更新任务的状态为NORMAL(任务正常执行)并且唤醒阻塞的线程
                    set(result);
            }
        } finally {
            // 将当前任务的线程设置为null
            runner = null;
            // 当前任务的状态
            int s = state;
            // 如果state>=INTERRUPTING,说明当前任务处于中断中或已中断状态
            if (s >= INTERRUPTING)
                // 如果当前任务处于中,则执行这个方法线程会不断让出cpu直到任务处于已中断状态
                handlePossibleCancellationInterrupt(s);
        }
    }

上一篇:【JAVA多线程】线程变量ThreadLocal和InheritableThreadLocal

本文含有隐藏内容,请 开通VIP 后查看