天增的博客
首页
博客
  • 分布式解决方案
  • Java并发工具包
  • redis
  • LeetCode
  • 系统设计
  • JVM体系
Github (opens new window)
Rss (opens new window)
  • zh-CN
  • en-US
首页
博客
  • 分布式解决方案
  • Java并发工具包
  • redis
  • LeetCode
  • 系统设计
  • JVM体系
Github (opens new window)
Rss (opens new window)
  • zh-CN
  • en-US
  • Java并发工具包
  • 并发基础
    • 线程基础
      • Thread的状态
      • 进程与线程
      • 正确停止线程的方式
      • Thread的实现方式
      • waitnotifynotifyAll
      • 生产者消费者模型
    • 线程安全
      • 线程不安全
      • 线程安全
      • 需要注意线程安全问题的情况
  • 并发工具
    • 线程协作
      • Semaphore信号量
      • CountDownLatch详解
      • 使用CompletableFuture解决旅游平台问题
      • 使用CyclicBarrier解决团建问题
    • Future
      • Future主要功能
      • FutureTask源码分析
        • 核心属性
        • 构造函数
        • 核心方法:Run()
        • 核心方法:get()
        • 核心方法:awaitDone(boolean timed,long nanos)
        • 核心方法:Cancel()
    • ThreadLocal
      • ThreadLocal内存泄漏
      • ThreadLocal使用场景
    • 原子类
      • 原子类的作用概览
      • 原子类的性能分析
    • 阻塞队列
      • 常见的阻塞队列
      • 阻塞队列的常用方法
      • 什么是阻塞队列
    • 并发容器
      • HashMap
      • CopyOnWriteArrayList
      • ConcurrentHashMap详解
    • 线程池
      • 为什么多线程会带来性能问题
      • 线程池的优势
      • 创建线程池的参数
        • 如何设置线程数
      • 线程池线程复用原理
      • ForkJoin框架
    • 各种锁
      • 锁的种类和特点
        • 公平锁非公平锁
        • 自旋锁非自旋锁
        • 共享锁独占锁
        • 乐观锁和悲观锁
      • JVM锁优化
      • synchronized和Lock的对比
      • lock的常用方法
  • 底层原理
    • CAS原理
    • AQS框架
    • 伪共享
    • java内存模型
      • Java内存模型介绍
      • happens-before规则
  • topic
  • Java并发工具包
  • 并发工具
  • Future
  • FutureTask源码分析
2022-04-21
目录

FutureTask源码分析

# FutureTask源码分析

FutureTask为future提供了基础实现,而且也是我们用的最多的实现方式。

接下来,将会对FutureTask展开深入的分析。

# 核心属性

    /**
     * 任务的运行状态。
     * 可能的状态转换:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** 运行callable的线程,运行后清零 */
    private Callable<V> callable;
    /** 从get()中返回的结果或者异常 */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** 运行callable的线程 */
    private volatile Thread runner;
    /** 使用Treiber保存等待线程 */
    private volatile WaitNode waiters;

其中我们需要注意的就是state类型,是volatile类型的,任何一个线程修改了这个变量,那么其他所有的线程都会知道最新的值。

7种状态:

  • NEW: 表示是一个新的任务或者还没有被执行完的状态。初始状态
  • COMPLETING: 任务执行完,或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有被保存到outcome字段的时候,装填会从NEW->COMPLETING。这个状态时间很短,属于中间状态。
  • NORMAL:任务执行完,并且任务执行结果已经保存到outcome字段中,状态会从NEW -> COMPLETING -> NORMAL。最终态。
  • EXCEPTIONAL:任务执行发生异常,并且异常原因已经保存到outcome字段中,状态会从NEW -> COMPLETING -> EXCEPTIONAL。最终态。
  • CANCELLED:任务还没有开始执行或者已经开始执行但是还没有执行完,此时用户调用了cancel(false)方法,不中断任务执行的情况下取消线程执行。这个时候状态则是: NEW -> CANCELLED。最终态。
  • INTERRUPTING:任务还没有开始执行或者已经开始执行但是还没有执行完,此时用户调用了cancel(ture)方法,中断任务执行并且还没有执行中断操作之前。这个时候状态则是: NEW -> INTERRUPTING。这是一个中间状态。
  • INTERRUPTED:调用interrupt()中断任务执行线程会到这个状态。NEW -> INTERRUPTING -> INTERRUPTED。这是一个最终态。

值得注意的是,所有状态值大于COMPLETING的状态都是标识任务已经执行完成,无论是正常、异常、或者任务取消。

graph TD;
    NEW-->COMPLETING;
    COMPLETING-->NORMAL;
    COMPLETING-->EXCEPTIONAL;
    NEW-->CANCELLED;
    NEW-->INTERRUPTING-->INTERRUPTED;

# 构造函数

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

可以在构造函数中看到,初始化的状态是NEW,和我们上文中写的一致。

callable:用来保存底层调用。如果直接传入runable,会将Runnable对象包装成callable对象,如果任务执行成功就会返回传入的result。如果不需要返回值,可以传入一个null。

# 核心方法:Run()

public void run() {
  // 新建任务,CAS替换runner为当前线程。
  if (state != NEW ||
      !RUNNER.compareAndSet(this, null, Thread.currentThread()))
    return;
  try {
    Callable<V> c = callable;
    // 只有在初始状态的时候才进行执行
    if (c != null && state == NEW) {
      V result;
      boolean ran;
      try {
        // 执行传入的操作
        result = c.call();
        ran = true;
      } catch (Throwable ex) {
        result = null;
        ran = false;
        // 如果发生异常,设置异常信息
        // NEW -> COMPLETING -> EXCEPTIONAL
        setException(ex);
      }
      if (ran)
        // 如果正常执行完成,设置执行结果
        // NEW -> COMPLETING -> NORMAL
        set(result);
    }
  } finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)。
      handlePossibleCancellationInterrupt(s);
  }
}

说明:

  • 运行任务,如果任务状态为NEW状态,则利用CAS修改为当前线程,执行完毕调用set(result)设置执行结果。

    /**
     * 1. 通过CAS操作设置STATE的状态,NEW -> COMPLETING
     * 2. outcome复制为v
     * 3. 设置STATE为NOMAL,COMPLETING -> NOMAL
     * 4. 执行完毕,唤醒等待的县城
     */
    protected void set(V v) {
            if (STATE.compareAndSet(this, NEW, COMPLETING)) {
                outcome = v;
                STATE.setRelease(this, NORMAL); // final state
                finishCompletion();
            }
        }
    
  • 结果设置完毕调用finishCompletion()唤醒等待县城

    
    private void finishCompletion() {
            // assert state > COMPLETING;
      			// 确保当前状态是结果态,才能进行唤醒操作
            for (WaitNode q; (q = waiters) != null;) {
                if (WAITERS.weakCompareAndSet(this, q, null)) {
                    for (;;) { // 自旋便等待线程
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                          	// 唤醒等待线程
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                      	// 将非等待的线程设置为null,帮助GC
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
      			// 任务完成后调用函数,自定义扩展
            done();
            callable = null;        // to reduce footprint
        }
    
  • 如果run的运行期间被中断,需要调用handlePossibleCancellationInterrupt来处理中断逻辑,确保任何中断(例如cancel(true))只停留在当前run或runAndReset的任务中

  • /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        // 在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    
        // assert state == INTERRUPTED;
    
        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }
    

# 核心方法:get()

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
}

说明: FutureTask通过get()方法获取任务执行结果。任务处于未完成的状态 state <= COMPLETEING,就调用awaitDown等待任务完成,通过report获取执行结果或者抛出异常。

report方法比较简单,如果当前状态是正常的NORMAL,则直接返回结果。

如果方法状态>=CANCELLED,也就是处于CANCELLED、INTERRUPTING、INTERRUPTED这三种状态的时候,抛出CancellationException,否则(状态为EXCEPTIONAL)就抛出业务异常

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

# 核心方法:awaitDone(boolean timed,long nanos)

这个方法就是get方法阻塞的关键所在


private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {// 自旋
            int s = state;
            // 如果状态处于完成态,置空等待节点的线程,帮助GC
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
              	// 如果还在等待任务的执行结果,就yield(),暂时让出时间片,让其他线程执行
                Thread.yield();
            else if (Thread.interrupted()) {
                // 如果线程被中断,则调用removeWaiter,移除节点避免堆积垃圾
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
              	// 如果等待时间为0 ,则意味着自旋一次就退出循环
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
              	// CAS修改waiter
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) {
                // 统计超时时间
                final long parkNanos;
              	// 首次循环,则初始化时间
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                  	// 后面的每次循环,都将记录运行时间
                    long elapsed = System.nanoTime() - startTime;
                    // 如果,超时了,就移除等待节点,返回节点状态回去
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                // 如果,状态还处于未完成状态,阻塞当前线程
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                // 不记录时间的话,阻塞当前线程
                LockSupport.park(this);
        }
    }
  • 如果当前状态为结束状态(state>COMPLETING),则根据需要置空等待节点的线程,并返回 Future 状态;
  • 如果当前状态为正在完成(COMPLETING),说明此时 Future 还不能做出超时动作,为任务让出CPU执行时间片;
  • 如果state为NEW,先新建一个WaitNode,然后CAS修改当前waiters;
  • 如果等待超时,则调用removeWaiter移除等待节点,返回任务状态;如果设置了超时时间但是尚未超时,则park阻塞当前线程;
  • 其他情况直接阻塞当前线程。

# 核心方法:Cancel()

public boolean cancel(boolean mayInterruptIfRunning) {
 //如果当前状态为NEW,根据参数修改状态为INTERRUPTING或CANCELLED
  if (!(state == NEW && STATE.compareAndSet
        (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    return false;
  try {    // in case call to interrupt throws exception
    if (mayInterruptIfRunning) {
      //可以在运行时中断
      try {
        Thread t = runner;
        if (t != null)
          t.interrupt();
      } finally { // final state
        STATE.setRelease(this, INTERRUPTED);
      }
    }
  } finally {
    finishCompletion();//移除并唤醒所有等待线程
  }
  return true;
}

说明:尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。

  • 如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED。
  • 如果当前状态不为NEW,则根据参数mayInterruptIfRunning决定是否在任务运行中也可以中断。中断操作完成后,调用finishCompletion移除并唤醒所有等待线程。
最近更新
01
以 root 身份启动 transmission-daemon
12-13
02
Debian系统安装qbittorrent-nox
12-09
03
LXC Debain12安装zerotier并实现局域网自动nat转发
07-29
更多文章>
Theme by Vdoing | Copyright © 2015-2024 天增 | 苏ICP备16037388号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式