首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

CompletableFuture源码解析

  • 25-03-02 13:44
  • 2713
  • 10850
blog.csdn.net

写在前面

刚开始是想着直接写 CompletableFuture 的使用,后来又想到 CompletableFuture 的基本使用,大家多多少少都会用。于是就研究了一下 CompletableFuture 源码,给大家带来一点不一样的学习体验。所以就有了第二个栏目和第四个栏目

由于 CompletableFuture 默认的线程池是 ForkJoinPool,在讲 CompletableFuture 之前觉得有必要先简单介绍一下 ForkJoinPool。

ForkJoinPool 工作原理

ForkJoin 框架,另一种风格的线程池(相比于 ThreadPoolExecutor),采用分治算法,以及工作窃取策略,极大地提高了并行性。对于那种大任务分割小任务的(分治)又或者并行计算场景尤其有用。

几个角色

ForkJoinPool

线程池,ForkJoinPool 类中有一个commonPool,默认并发数为逻辑处理器个数 - 1;

ForkJoinPool commonPool = ForkJoinPool.commonPool();//makeCommonPool

//看源码发现 
int parallelism = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
// 如果有自定义,那么取自定义的作为 parallelism
// 如果没有 逻辑处理器个数-1 
parallelism = Runtime.getRuntime().availableProcessors() - 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

当然了,如果你不想使用 commonPool,你完全可以直接new 一个

ForkJoinPool fjp = new ForkJoinPool(4);// 最大并发数4
  • 1

其实我当时有个疑问,明明可以直接 new 一个 ForkJoinPool,还可以很方便的指定parallelism(我写的ForkJoinTest就是通过这种方式),为什么 ForkJoinPool 中还定义一个static的commonPool?

commonPool一般用于 Java8 中的并发流计算中或者 CompletableFuture 没有指定线程池时使用的一个commonPool(下面会讲到)。

ForkJoinTask

ForkJoinPool 与 ForkJoinTask 之间的关系,可以类比 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。它是一个抽象类。主要有两个实现RecursiveAction(有返回值) 和 RecursiveTask(无返回值)

ForkJoinWorkerThread

运行 ForkJoinTask 任务的工作线程(worker),最大并发数不会超过parallelism

WorkQueue

任务队列,每个worker对应一个queue,这是和 ThreadPoolExecutor 最大不同的地方之一

WorkQueue[]

ForkJoinPool 中的任务分为两种,一种是本地提交的任务Submission task,通过execute()、submit()、invoke()等方法提交的任务;比如 CompletableFuture 中不提供线程池时,提交的都是Submission task

另外一种是工作线程fork出的子任务Worker task.

两种任务都会存放在WorkQueue数组中,Submission task存放在WorkQueue数组的偶数索引位置,Worker task存放在奇数索引位置。工作线程只会分配在奇数索引的工作队列。

请添加图片描述

工作窃取机制

工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。在 ForkJoinPool 中,工作任务的队列都采用双端队列容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现 FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上 LIFO,而窃取其他线程的任务的时候,从队列头部取获取

请添加图片描述

工作线程worker1、worker2以及worker3都从WorkQueue的尾部popping获取task,而任务也从尾部Pushing,当worker3队列中没有任务的时候,就会从其他线程的队列中窃取stealing,这样就使得 worker3 不会由于没有任务而空闲。这就是工作窃取算法的基本原理。 可以想象,要是不使用工作窃取算法,那么我们在不断 fork 的过程中,可能某些 worker 就会一直处于 join 的等待中。

因为这种机制,它能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。

疑问:

  1. 为什么说 ForkJoinPool 并发访问一个 IO 计算可能会拖垮整个系统?

    我觉得这主要说的是 ForkJoinPool 中的 commonPool,commonPool 是整个·系统共享的。比如你在 Stream 并行流中并发访问一个 IO 计算,某一时刻会导致commonPool 中大部分线程甚至所有线程都阻碍在这里。这可能就会造成其他程序使用到 commonPool 的程序线程饥饿,比如 CompletableFuture 中没有指定线程池时。

  2. 为什么说 cpu 密集型的任务使用 ForkJoinPool 性能更好?

    我看网上说主要有工作窃取机制。工作窃取的目的不就是充分利用 cpu,那普通线程不也可以吗,把核心线程调成和 ForkJoinPool 默认线程。

我总结主要包括以下几个原因(听听就好):

  • ForkJoinPool 的工作线程都有属于自己的一个队列,类似于 ConcrurentHashMap,在 Java8 中,ConcrurentHashMap 对每个桶的锁隔离,每个桶的写入更新只影响该桶的锁,对其他桶没有影响。同理,ForkJoinPool 也是采取这种思想。ThreadPoolExecutor 所有线程共享一个队列,而 ForkJoinPool 每个 worker 都有与之绑定的 queue,大部分情况下没有竞争问题。尽管是窃取机制,但此时是重队列的另一端 pop。在一定程度上获取任务的速度远远比一般线程池要快。
  • 在并发计算中,一个任务分成的子任务。普通线程池没法保证子任务优先执行,导致子任务的父级任务一直阻塞。而 ForkJoinPool 可以,被 fork 出来的子任务会优先执行,充分利用 cpu 资源。
  • ThreadPoolExecutor 提交任务时,如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中,没有及时消费,没有充分你利用cpu资源

CompletableFuture 使用总结

关于 CompletableFuture(异步计算)的使用,说实话,我来万师傅之前在实际项目中没有应用过。来到这里,发现我们项目中很多地方都有使用到 CompletableFuture,比如user-site-api中订单列表的并发请求远程服务,user-order-open-api 服务中异步处理一些任务,又或者user-export-api中异步导出,并发处理数据等等

CompletableFuture 中的方法非常多,当时刚开始学习的时候,真的一脸懵。重载方法的使用场景如何选择,再加上对方法参数类型,比如 BiFunction,Consumer 等函数式接口也不是很熟悉。

所以…怎么使用还是不说了,简单介绍下completableFuture的命名规则,我觉得理解这些,也就会用了…

按功能分类

  • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示将异步在线程池中执行。
  • 异步执行方法默认一个参数的话任务是在 ForkJoinPool.commonPool() 线程池中执行的(带看源码),带executor 参数的使用 executor线程池异步执行

按逻辑和组织方式来分的话(completableFuture 中大约有50个来方法)

  • 一种是 then 的逻辑,即前一个计算完成的时候调度后一个计算
  • 一种是 both 的逻辑,即等待两个计算都完成之后执行下一个计算,只要能组合一个和另一个,我们就可以无限复用这个 +1 的逻辑组合任意多的计算。(如果任务多,可以考虑可用allOf)
  • 另一种是 either 的逻辑,即等待两个计算的其中一个完成之后执行下一个计算。注意这样的计算可以说是非确定性的。因为被组合的两个计算中先触发下一个计算执行的那个会被作为前一个计算,而这两个前置的计算到底哪一个先完成是不可预知的(anyOf)

从依赖关系和出入参数类型区别,基本分为三类

  • apply 字样的方式意味着组合方式是 Function,即接受前一个计算的结果,应用函数之后返回一个新的结果
  • accept 字样的方式意味着组合方式是 Consumer,即接受前一个计算的结果,执行消费后不返回有意义的值
  • run 字样的方式意味着组合方式是 Runnable,即忽略前一个计算的结果,仅等待它完成后执行动作

其中出入参数主要有 Java8 Function,Consumer 或 Runnable三中函数型接口,每一种都决定了是怎么样一种依赖关系
请添加图片描述

CompletableFuture 源码解析

在讲源码之前,首先我先通过一个简单的例子,然后通过通俗的语言,来跟大家简单说一下,我对 CompletableFuture 实现的一个理解。

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "supply task1 first!")
    .thenApply(r -> "thenApply task2!");
System.out.println(completableFuture.get() + " finish!");
  • 1
  • 2
  • 3

实际上,也就是下面这样一个形式(方便讲解)

CompletableFuture<String> task1CompletableFuture = 
    CompletableFuture.supplyAsync(() -> "supply task1 first!"); //task1

CompletableFuture<String> task2CompletableFuture = 
    task1CompletableFuture.thenApply(r -> "thenApply task2!"); //task2
System.out.println(task2CompletableFuture.get() + " finish!");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

当时疑问就是:他是怎么实现在执行完task1之后再去执行task2的呢。

我的理解很简单,就是把task2封装成一个对象,假设我们把这个对象叫做 completion,然后注册到 task1CompletableFuture对象中,当task1CompletableFuture中的task1执行完成,判断task1CompletableFuture中的completion是否为空,不为空就执行中的对象completion中的task2,执行完成task2再唤醒主线程,获取结果。

基础设施

CompletableFuture 实现了 Future 接口和 CompletionStage 接口,CompletionStage接口提供了很多异步回调的函数。

创建 CompletableFuture

有两种方法可以创建 CompletableFuture:

  • 静态方法,比如supplyAsync。属于零输入,执行时机是马上执行。
  • 成员方法,比如 CompletableFuture 对象 thenApply。属于有输入,执行时机是调用对象的完成时机。
CompletableFuture 成员
    volatile Object result;       // 计算结果 或者已经包装的 AltResult
    volatile Completion stack;    // 依赖操作的堆栈顶部
  • 1
  • 2

CompletableFuture是在用户使用过程中唯一能直接接触到的对象。

  • result 存放执行结果,正常结果或者抛出的异常都要存放,所以是Object。任务执行完毕后,result会变成非null。
  • stack 是一个链栈,存放与this对象直接关联的Completion对象(也就是上文说到的completion对象)。Completion对象是用来驱动某一个CompletableFuture对象(比如上文 task2CompletableFuture),所谓的驱动,就是使得这个CompletableFuture对象的result成员变为非null。
Completion 内部类

Completion 对象是用户接触不到的,它用来驱动 CompletableFuture 对象。

abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask 
  • 1
  • 它继承了ForkJoinTask,但也仅仅是为了套上 ForkJoinTask 的壳子,因为 CompletableFuture 默认的线程池是上面说到的ForkJoinPool.commonPool()。
  • 但它也实现了 Runnable,这使得它也能被一个普通线程正常执行。
  • Completion 有很多继承的子类(UniCompletion、BiCompletion 等等)它们分别实现了tryFire方法(可以理解是触发执行任务)。
	/* ------------- Base Completion classes and operations -------------- */
    abstract static class Completion extends ForkJoinTask<Void>
  • 1
  • 2
    /* ------------- Two-input Completions -------------- */
	//两个输入
    abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> 
  • 1
  • 2
  • 3
	/* ------------- One-input Completions -------------- */
	//一个输入
    abstract static class UniCompletion<T,V> extends Completion
  • 1
  • 2
  • 3
	//比如这次要讲的这个例子,就是用了UniApply
	//代表一个参数的Function的Completion
    static final class UniApply<T,V> extends UniCompletion<T,V>
  • 1
  • 2
  • 3
AltResult内部类
    static final class AltResult { // See above
        final Throwable ex;        // null only for NIL
        AltResult(Throwable x) { this.ex = x; }
    }

    static final AltResult NIL = new AltResult(null);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

前面提到,任务执行完毕后,result会变成非 null。但如果执行结果就是 null 该怎么办。所以用这个对象来包装一下null。

Signaller内部类
static final class Signaller extends Completion
        implements ForkJoinPool.ManagedBlocker {
        long nanos;                    // wait time if timed
        final long deadline;           // non-zero if timed
        volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        volatile Thread thread;
        ...
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

配合 get 或者 join 使用的,实现对 想获取执行结果的线程 的阻塞和唤醒的功能。

从supplyAsync + thenApply(thenApplyAsync)理解

接下来,还是以上面的例子,和大家一起看一下源码。

CompletableFuture 实现了 CompletionStage,代表一个执行阶段,我们可以在执行阶段之后添加后续任务,当前一个执行阶段完毕时,马上触发后续任务。

    public static void test() {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            String supplyAsyncResult = " "+Thread.currentThread().getName()+" Hello world! ";
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(supplyAsyncResult);
            return supplyAsyncResult;
        }).thenApply(r -> {  //添加后续任务
            String thenApplyResult = Thread.currentThread().getName()+r + " thenApply! ";
            System.out.println(thenApplyResult);
            return thenApplyResult;
        });

        try {
            System.out.println(completableFuture.get() + " finish!");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

首先注意到这是一种链式编程,supplyAsync返回的是一个 CompletableFuture 对象(代表一个执行阶段),然后在这个 CompletableFuture 对象上再执行thenApply,又返回了一个新的 CompletableFuture 对象(代表下一个执行阶段)。而且发现,两个 task 都是在另外的线程里执行的,这完全实现了异步处理的效果。

为了方便称呼,我们叫第一个 task 为 前一个 stage,第二个 task 为 当前 stage。

本文也会把 CompletableFuture 对象称为一个 stage。

supplyAsync
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

可见这个 CompletableFuture 对象是new出来以后就直接返回的,但是刚 new 的 CompletableFuture 对象的 result 成员是为 null,因为 task 还没有执行完。而 task 的执行交给了e.execute(new AsyncSupply(d, f))。

    static final class AsyncSupply<T> extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }

        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;  //为了防止内存泄漏,方便GC.同时dep为null也是一种代表当前Completion对象的关联stage已完成的标志
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());  //执行task
                    } catch (Throwable ex) {       //执行task期间抛出了异常
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

很显然,为了能够e.execute,AsyncSupply 也必须是一个 Runnable 对象。执行e.execute(new AsyncSupply(d, f)),run 函数就会被另一个线程执行。当 task 被异步执行完毕后,会调用 completeValue 或 completeThrowable 来为 result 成员赋值。
请添加图片描述

上图体现了supplyAsync()的过程,对于调用者来说,只能接触到 stage 对象,并且调用者根本不知道 stage 对象何时能产生运行结果。对于实现来说,把 task 包装成一个 AsyncSupply 对象,另起线程执行 task,执行完毕后为 stage 对象赋值运行结果。

注意,stage 完成的标志,就是它的result成员非 null。

thenApply(thenApplyAsync)

在 supplyAsync 直接返回了个 CompletableFuture 对象后,主线程在这个对象上调用 thenApply 或 thenApplyAsync 将后续 stage 接续到前一个 stage 的后面。

    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

thenApply不会传入 Executor,因为它优先让当前线程(例子中是 main 线程)来执行后续 stage 的 task。具体的说:

  • 当发现前一个 stage 已经执行完毕时,直接让当前线程来执行后续 stage 的 task。
  • 当发现前一还没执行完毕时,则把当前 stage 包装成一个 UniApply 对象,放到前一个 stage 的栈中。执行前一个 stage 的线程,执行完毕后,接着执行后续 stage 的 task。
  • 总之,要么是一个异步线程走到底,要么让当前线程来执行后续 stage(因为异步线程已经结束,而且你又没有给 Executor,那只好让当前线程来执行咯)。

thenApplyAsync会传入一个 Executor,因为它总是让 Executor 线程池里面的线程 来执行后续 stage 的 task。具体的说:

  • 当发现前一个 stage 已经执行完毕时,直接让 Executor 来执行。
  • 当发现前一个 stage 还没执行完毕时,则等到执行前一个 stage 的线程执行完毕后,再让 Executor 来执行。
  • 总之,无论哪种情况,执行后一个 stage 的线程肯定不是当前添加后续 stage 的线程(例子中是 main 线程)了。
private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d =  new CompletableFuture<V>();
    //如果e不为null,说明当前stage是无论如何都需要被异步执行的。所以短路后面的d.uniApply。
    //如果e为null,说明当前stage是可以允许被同步执行的。所以需要尝试一下d.uniApply。
    if (e != null || !d.uniApply(this, f, null)) {
    	//进入此分支有两种情况:
    	//1. 要么e不为null,前一个stage不一定执行完毕。就算前一个stage已经执行完毕,还可以用e来执行当前stage
    	//2. 要么e为null,但前一个stage还没执行完毕。所以只能入栈等待
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        //(考虑e为null)入栈后需要避免,入栈后刚好前一个stage已经执行完毕的情况。这种特殊情况,如果不执行c.tryFire(SYNC),当前stage永远不会完成。
        //(考虑e不为null)入栈后需要避免,入栈前 前一个stage已经执行完毕的情况。
        //下面这句,有可能发现前一个stage已经执行完毕,然后马上执行当前stage
        c.tryFire(SYNC);
    }
    return d;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 从CompletableFuture d = new CompletableFuture()和 return d 来看,还是和之前一样,new 出来一个CompletableFuture对象后就尽快返回。
  • 如果 Executor e 为 null(当前 stage 是可以允许被同步执行的),并且此时前一个 stage 已经结束了,这种情况应该让当前线程来同步执行当前 stage。但我们其实不知道前一个 stage 是否结束,所以通过d.uniApply(this, f, null)检测前一个 stage 是否已经结束。如果d.uniApply(this, f, null)返回 true,说明发现了前一个 stage 已经结束,并且当前线程执行完毕当前 stage,所以这种情况就会直接 return d。
    • d.uniApply(this, f, null)的第三个实参为 null,这代表与当前 stage 相关联的 Completion 对象还没有入栈(还没push©),即不可能有别的线程与当前线程来竞争执行当前 stage。这样d.uniApply(this, f, null)里面的逻辑就变简单了,要么发现前一个 stage 还没执行完,直接返回 false;要么发现前一个 stage 执行完毕,那么执行当前 stage 后,返回 true。

进入分支有两种情况:

  • 如果 e 不为 null:
    • 如果前一个 stage 已经执行完毕:当前线程在c.tryFire(SYNC)中把接管的当前 stage 转交给 e 执行。
    • 如果前一个 stage 还没执行完毕:当前线程会直接返回,等到执行前一个 stage 的线程来把当前 stage 转交给 e 执行。
  • 如果 e 为 null:
    • 并且前一个 stage 还没执行完毕。
  • 上面几种情况,最终都会入栈,不管 e 是否为 null,都有必要再尝试一下c.tryFire(SYNC),避免此时前一个 stage 已经完成的情况。
  • c.tryFire(SYNC)中也会执行类似d.uniApply(this, f, null),而且你会发现两种调用环境,uniApply 成员函数的 this 对象是一样的(当前 stage),第一个实参是一样的(前一个 stage),第二个实参也是同一个函数式接口对象,只有第三个实参不一样。
UniApply内部类#tryFire

在讲 tryFire 之前,我们先看看 tryFire 有几处调用:

  • uniApplyStage 中的同步调用,c.tryFire(SYNC)。
  • 执行前一个stage的线程,在 run 的d.postComplete()中,会调用tryFire(NESTED)。
  • 上面两处,tryFire 的 this 对象都是我们分析过程提到的当前 stage。并且,这说明 tryFire 可能会有多线程的竞争问题,来看看tryFire 是怎么解决的。
    • 多线程竞争,比如当前线程入栈后,执行前一个stage的线程刚完事,正要触发后续 stage(run的d.postComplete()中)。
	//src代表前一个stage, dep代表当前stage。  UniApply对象将两个stage组合在一起了。
    static final class UniApply<T,V> extends UniCompletion<T,V> {
        Function<? super T,? extends V> fn;
        UniApply(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src,
                 Function<? super T,? extends V> fn) {
            super(executor, dep, src); this.fn = fn;
        }
        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
            //1. 如果dep为null,说明当前stage已经被执行过了
            //2. 如果uniApply返回false,说明当前线程无法执行当前stage。返回false有可能是因为
            //     1. 前一个stage没执行完呢
            //     2. 前一个stage执行完了,但当前stage已经被别的线程执行了。如果提供了线程池,那么肯定属于被别的线程执行了。   
            if ((d = dep) == null ||
                !d.uniApply(a = src, fn, mode > 0 ? null : this))
                return null;
            //执行到这里,说明dep不为null,而且uniApply返回true,说明当前线程执行了当前stage
            dep = null; src = null; fn = null;
            return d.postFire(a, mode);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

看来这个竞争关系体现到了d.uniApply(a = src, fn, mode > 0 ? null : this),分析上面两种情况,发现mode > 0 ? null : this必然不成立,而this指的是 UniApply 对象(在CompletableFuture#uniApplyStage中创建的)。现在好了,上面两种情况第三个实参都是同一个 UniApply 对象(竞争处理的关键,之后讲),即两种情况对CompletableFuture#uniApply调用情况一模一样,竞争在这里面处理。

注意,(d = dep) == null已经起到了一定的防止竞争的作用,让线程提前返回。但也有可能起不到作用,因为两个线程刚好都执行到了d.uniApply(a = src, fn, mode > 0 ? null : this)。

CompletableFuture#uniApply

这个函数的逻辑之前讲过简单的一版,即当第三个实参为 null 时的情况。所以这里,重点关注第三个实参不为 null 的情况。

	//this永远是当前stage,a参数永远是前一个stage
    final <S> boolean uniApply(CompletableFuture<S> a,
                               Function<? super S,? extends T> f,
                               UniApply<S,T> c) {
        Object r; Throwable x;
        //前后两个条件只是优雅的避免空指针异常,实际不可能发生。
        //如果 前一个stage的result为null,说明前一个stage还没执行完毕
        if (a == null || (r = a.result) == null || f == null)
            return false;
        //执行到这里,说明前一个stage执行完毕

		//如果this即当前stage的result不为null,说当前stage还没执行。
        tryComplete: if (result == null) {  //一定程度防止了竞争
        	//如果前一个stage的执行结果为null或者抛出异常
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                	//如果前一个stage抛出异常,那么直接让当前stage的执行结果也为这个异常,都不用执行Function了
                    completeThrowable(x, r);
                    break tryComplete;
                }
                //如果前一个stage的执行结果为null
                r = null;//那么让r变成null
            }
            try {
            	//1. c为null,这说明c还没有入栈,没有线程竞争。直接执行当前stage即f.apply(s)
            	//2. c不为null,这说明c已经入栈了,有线程竞争执行当前stage。
                if (c != null && !c.claim())
                	//claim返回了false,说明当前线程不允许执行当前stage,直接返回
                    return false;
                    //claim返回了true,说明当前线程允许接下来执行当前stage
                @SuppressWarnings("unchecked") S s = (S) r;
                    completeValue(f.apply(s));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        //如果this即当前stage的result不为null,说当前stage已经执行完毕,那么直接返回true
        return true;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • if ((x = ((AltResult)r).ex) != null)判断中,如果发现前一个 stage 执行完是因为抛出了异常,那么当前 stage 也不能正常执行了,直接把这个异常赋值给当前 stage 的 result 成员。
    • break tryComplete后,uniApply函数会马上返回 true,然后回到tryFire函数,紧接着马上执行dep = null(这代表当前 stage 已经执行完毕)。这样可以使得某种特殊时序下,执行前一个 stage 的线程不会通过tryFire函数中的(d = dep) == null的检查,进而直接返回null不去执行d.postFire(a, mode)。总之,这是为了避免对同一个 CompletableFuture 对象调用它的成员函数 postComplete。
    • 但貌似上面说的这种防止线程竞争的手段不是完全有效的,因为 “紧接着马上执行 dep = null ” 不是原子性,两个线程的执行顺序也有可能穿插执行,有可能当前线程还没来得及 “执行 dep = null ”,执行前一个stage的线程就开始执行tryFire函数了。
  • if (c != null && !c.claim())是用来保护函数式接口只被执行一次的。claim 函数返回 true 代表函数式接口接下来可以被当前线程同步执行。
        final boolean claim() {
            Executor e = executor;
            //如果该Completion包含有Executor,那么此函数每次都会返回false。
            //  CAS保证函数式接口只被提交一次给Executor
            //如果该Completion没有Executor,那么此函数第一次返回true,之后每次返回false。
            //  CAS保证函数式接口只被同步执行一次
            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                if (e == null)
                    return true;
                executor = null; // disable
                e.execute(this);
            }
            return false;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 竞争关系,全部在claim()函数中处理掉。经过CAS的保护,这个函数式接口能够保证只被执行一次(可能是同步执行、或者提交给 Executor 异步执行)。
  • CompletableFuture#uniApply的返回值含义:
    • 返回 false,代表前一个 stage 还没完成。也可代表,当前 stage 的函数式接口已经被别的线程执行了。
    • 返回 true,代表前一个 stage 已经完成,并且当前 stage 的函数式接口被当前线程执行了。
    • 函数返回后,回到tryFire,将执行d.postFire(a, mode),因为执行完毕了当前 stage 的函数式接口,当前线程就得处理当前 stage 的后续任务。

梳理到这里,例子中的驱动关系就更加明白了:
请添加图片描述

谁执行了当前 stage,谁负责处理后续 stage

在tryFire函数中,如果d.uniApply(a = src, fn, mode > 0 ? null : this)返回了 true,说明当前线程执行了当前 stage 的函数式接口(这说明没有提供线程池,当前线程同步执行了 stage),自然接下来会去处理后续 stage(d.postFire(a, mode))。

在提供了线程池的情况下,且前一个stage没有抛出异常正常执行完的情况下,tryFire 函数中的d.uniApply(a = src, fn, mode > 0 ? null : this)必然会返回 false,因为它把 uniApply 对象提交给了线程池来执行。当前线程将不会去处理后续 stage了(d.postFire(a, mode))。

提交给线程池后,因为 uniApply 对象也是一个 Runnable 对象,它的 run 函数为:

        public final void run()                { tryFire(ASYNC); }
  • 1

线程池将提供一个线程来执行tryFire(ASYNC),之后d.uniApply(a = src, fn, mode > 0 ? null : this)将会直接执行当前 stage 的函数式接口,返回 true 后,再去处理后续 stage(d.postFire(a, mode))。

Async 任务的执行过程
  1. 执行前一个 stage 的线程来执行 UniApply 内部类对象的tryFire(NESTED)。
  2. 接着执行当前stage.uniApply()。
  3. 执行 UniApply 内部类对象的claim方法。
  4. 由于提供了线程池,claim会把任务提交给线程池(e.execute(this)),把保存的线程池清理掉(executor = null),然后返回 false。执行前一个 stage 的线程接着层层返回,最终tryFire(NESTED)返回 null。
  5. 线程池选择一个线程,开始对同一个 UniApply 内部类对象执行tryFire(ASYNC)。
  6. 接着执行当前stage.uniApply(a = src, fn, ASYNC > 0 ? null : this),第三个实参肯定为 null。ASYN C为 1.
  7. if (c != null && !c.claim())不会执行,因为第三个参数为 null。
  8. 这个线程以“同步”的方式来执行了任务。但它对于执行前一个 stage 的线程来说是异步的。
CompletableFuture#postFire

在前面的分析可知,UniApply#tryFire成功执行了这个 UniApply 对象的当前 stage 后,将会调用当前 stage 的postFire。

    final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    	//前一个stage的后续任务还没做完
        if (a != null && a.stack != null) {
            //1. mode为NESTED。说明就是postComplete调用过来的,那么只清理一下栈中无效节点即可。
            //2. mode为SYNC或ASYNC,但前一个stage还没执行完。不知道何时发生,因为调用postFire的前提就是前一个stage已经执行完
            if (mode < 0 || a.result == null)
                a.cleanStack();
            //3. mode为SYNC或ASYNC,但前一个stage已经执行完了。特殊时序可能发生的,那么帮忙完成前一个stage的的后续任务
            else
                a.postComplete();
        }
        //当前stage的后续任务还没做完
        if (result != null && stack != null) {
            if (mode < 0)//mode为NESTED。说明就是postComplete调用过来的.
                return this;
            else         //mode为SYNC或ASYNC,那么调用postComplete
                postComplete();
        }
        return null;
        
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
简单说下 allOf 原理

allOf 使用了一种递归分治的方法,虽然任务可能有很多个,但最终它们都能拆分为 1 个或者 2 个。从最底层两两合并,弄成一个树形结构,最终这个树形结构的根节点就是我们想要的 CompletableFuture,它们代表了所有任务都完成的时机。

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
  • 1
  • 2
  • 3

从CompletableFuture的返回类型可知,allOf方法返回的 CompletableFuture 对象只是代表一种时机,这个时机就是 cfs 里面的 CompletableFuture 都已经执行完毕。当我们在allOf方法返回的 CompletableFuture 对象上添加后续任务时,能保证后续任务是这个特殊时机后才会触发的。

总结

  • CompletableFuture 通过异步回调来处理执行结果,解决了 FutureTask 对执行结果获取的痛点。
  • 用户只能接触到 CompletableFuture 对象,靠 Completion 对象来驱动 CompletableFuture 对象。注意,Completion 是实现了 Runnable 的。
  • 个人认为,CompletableFuture 这种工具类我们会用就 ok,并且简单理解他的原理就好。说实话,我觉得研究 ThreadPoolExecutor 或者 ForkJoinPool 源码比深入 CompletableFuture 更有意义!
注:本文转载自blog.csdn.net的pngyul的文章"https://blog.csdn.net/PNGYUL/article/details/119838961"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top