首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

CompletableFuture回调机制的设计与实现

  • 25-03-02 12:01
  • 4749
  • 8696
blog.csdn.net

目录

一、Future原理总述与局限性分析

(一)Future实现原理回顾

(二)Future局限性分析与推荐(CompletableFuture)

二、CompletableFuture原理总述与回调机制总结

(一)CompletableFuture核心原理简述

(二)CompletableFuture回调机制的设计与实现简述

三、CompletableFuture回调机制设计与算法实现

(一)类图分析

(二)整体流程分析

(三)算法与实现

任务通知-postComplete

任务入栈-pushStack

可见性优化-lazySetNext

注册与完成回调任务

监听多个Future的执行结果-allOf与anyOf

四、开发中的指导建议

参考文献、书籍及链接


干货分享,感谢您的阅读!

备注:如果是想了解其使用介绍详见:CompletableFuture使用安利与源码分析

一、Future原理总述与局限性分析

(一)Future实现原理回顾

Java中的Future是一种异步编程的技术,它允许我们在另一个线程中执行任务,并在主线程中等待任务完成后获取结果。Future的实现原理可以通过Java中的两个接口来理解:Future和FutureTask。

Future接口是Java中用于表示异步操作结果的一个接口,它定义了获取异步操作结果的方法get(),并且可以通过isDone()方法查询操作是否已经完成。

在Java 5中引入了FutureTask类,它是一个实现了Future和Runnable接口的类,它可以将一个任务(Runnable或Callable)封装成一个异步操作,通过FutureTask的get()方法可以获取任务执行的结果。

在FutureTask的实现中,主要包括以下几个步骤:

  1. 创建一个FutureTask对象,并传入一个任务(Runnable或Callable)。
  2. 在另一个线程中执行任务,并将任务执行结果保存在FutureTask中。
  3. 在主线程中调用FutureTask的get()方法,如果任务还没有完成,则阻塞当前线程,直到任务完成并返回结果。

FutureTask的get()方法是一个阻塞方法,如果任务还没有完成,则会一直阻塞当前线程,直到任务完成。这个阻塞的过程可以通过一个volatile类型的变量来实现。在任务执行完成后,会调用done()方法通知FutureTask任务已经完成,并且设置执行结果。done()方法会调用FutureTask的回调函数,完成后将执行结果设置到FutureTask中。

需要注意的是,FutureTask并不能保证任务的执行顺序和执行结果,因为任务的执行是由线程池来控制的。如果需要保证任务的执行顺序和结果,可以使用CompletionService和ExecutorCompletionService。

综上所述,Future的实现原理就是通过Future和FutureTask接口,将任务封装成一个异步操作,并在主线程中等待任务完成后获取执行结果。FutureTask是Future的一个具体实现,通过阻塞方法和回调函数来实现异步操作的结果获取。

(二)Future局限性分析与推荐(CompletableFuture)

虽然Future在Java中提供了一种简单的异步编程技术,但它也存在一些局限性,包括以下几个方面:

  1. 阻塞问题:Future的get()方法是一个阻塞方法,如果任务没有完成,会一直阻塞当前线程,这会导致整个应用程序的响应性下降。
  2. 无法取消任务:Future的cancel()方法可以用于取消任务的执行,但如果任务已经开始执行,则无法取消。此时只能等待任务执行完毕,这会导致一定的性能损失。
  3. 缺少异常处理:Future的get()方法会抛出异常,但是如果任务执行过程中抛出异常,Future无法处理异常,只能将异常抛给调用者处理。
  4. 缺少组合操作:Future只能处理单个异步操作,无法支持多个操作的组合,例如需要等待多个任务全部完成后再执行下一步操作。

综上所述,Future虽然提供了一种简单的异步编程技术,但它的局限性也是比较明显的。在实际应用中,我们需要根据具体的业务需求和性能要求,选择合适的异步编程技术。例如,可以使用CompletableFuture来解决Future的一些问题,它可以避免阻塞、支持异常处理和组合操作等功能。

二、CompletableFuture原理总述与回调机制总结

(一)CompletableFuture核心原理简述

CompletableFuture是Java 8中引入的一个强大的异步编程工具,它允许我们以非阻塞的方式处理异步操作,并通过回调函数来处理异步操作完成后的结果。

CompletableFuture的核心原理是基于Java的Future接口和内部的状态机实现的。它可以通过三个步骤来实现异步操作:

  1.  创建CompletableFuture对象:通过CompletableFuture的静态工厂方法,我们可以创建一个新的CompletableFuture对象,并指定该对象的异步操作。通常情况下,我们可以通过supplyAsync()或者runAsync()方法来创建CompletableFuture对象。
  2.  异步操作的执行:在CompletableFuture对象创建之后,异步操作就开始执行了。这个异步操作可以是一个计算任务或者一个IO操作。CompletableFuture会在另一个线程中执行这个异步操作,这样主线程就不会被阻塞。
  3.  对异步操作的处理:异步操作执行完成后,CompletableFuture会根据执行结果修改其内部的状态,并触发相应的回调函数。如果异步操作成功完成,则会触发CompletableFuture的完成回调函数;如果异步操作抛出异常,则会触发CompletableFuture的异常回调函数。

CompletableFuture的优势在于它支持链式调用和组合操作。通过CompletableFuture的then系列方法,我们可以创建多个CompletableFuture对象,并将它们串联起来形成一个链式的操作流。在这个操作流中,每个CompletableFuture对象都可以依赖于之前的CompletableFuture对象,以实现更加复杂的异步操作。

总的来说,CompletableFuture的原理是基于Java的Future接口和内部的状态机实现的,它可以以非阻塞的方式执行异步操作,并通过回调函数来处理异步操作完成后的结果。通过链式调用和组合操作,CompletableFuture可以方便地实现复杂的异步编程任务。

(二)CompletableFuture回调机制的设计与实现简述

在CompletableFuture中,回调是一种重要的机制,可以在异步任务完成时自动触发回调函数。

CompletableFuture的回调机制可以分为两种类型:完成回调和异常回调。完成回调会在异步任务成功完成时触发,而异常回调会在异步任务抛出异常时触发。CompletableFuture为这两种回调提供了不同的方法,可以灵活地设置回调函数。

CompletableFuture的回调机制是通过Java的函数式编程实现的。在CompletableFuture中,回调函数是一个函数接口,例如CompletableFuture的thenAccept方法需要一个Consumer类型的回调函数作为参数。在异步任务完成后,CompletableFuture会自动调用回调函数并传递异步任务的结果。

在实现上,CompletableFuture的回调机制主要依赖于Java的Future接口和CompletableFuture内部的状态机。当CompletableFuture被创建时,它的状态是未完成的。当异步任务完成后,CompletableFuture会修改内部的状态,将结果或异常保存在内部,然后触发相应的回调函数。

当用户调用CompletableFuture的then系列方法时,CompletableFuture会返回一个新的CompletableFuture对象,表示一个新的异步任务。当原始的CompletableFuture完成时,它会自动触发新的CompletableFuture的回调函数。这种链式回调的设计可以方便地实现多个异步任务之间的依赖关系。

总的来说,CompletableFuture的回调机制是通过Java的函数式编程和状态机实现的。通过灵活的回调函数设置和链式调用,CompletableFuture可以方便地实现异步编程。

三、CompletableFuture回调机制设计与算法实现

从CompletableFuture的使用方法可以看出,CompletableFuture主要是通过回调的方式实现异步编程,解决Future在使用过程中需要阻塞的问题。

(一)类图分析

其结构与观察者模式类似,CompletableFuture是发布者,使用链表保存观察者Completion。

  • CompletableFuture的postComplete方法是通知方法,用于在CompletableFuture完成时通知观察者,发送订阅的数据。
  • Completion的tryFire方法用于处理CompletableFuture发布的结果。

若用户直接创建、注册和管理观察者,使用起来不够简洁。因此CompletableFuture对外提供接受lambda表达式的API。观察者Completion及其子类都被定义为内部类,封装用户传入的lambda表达式对应的函数式接口。

通过类图可以看出Completion是一个链状结构。链表的通知顺序有先进先出(FIFO)和后进先出(LIFO)两种。队列(FIFO)由于出队与入队修改不同变量,在竞争较多的情况下支持更高的吞吐量。栈(LIFO)与队列不同,入栈和出栈修改同一个变量,但在出栈时先前入栈的对象的内存很可能还保留在CPU缓存中,线程局部性更好,适合在竞争较少的情况下使用。CompletableFuture使用过程中一般不会频繁竞争,使用后进先出的通知顺序性能更优。

(二)整体流程分析

下图中,stack是CompletableFuture的任务栈的栈顶,Completion是栈的节点,它的dep字段保存了需要完成的CompletableFuture的引用。

整体流程如下:

  1. 创建:用户使用接受lambda表达式的API创建CompletableFuture或直接创建CompletableFuture并手动完成。
  2. 注册:用户使用接受lambda表达式的API注册任务,监听已创建的CompletableFuture的执行结果,返回新的CompletableFuture。注册的任务被封装成观察者Completion,Completion保存了返回的CompletableFuture的引用。Completion在监听的CompletableFuture未完成的情况下入栈。
  3. 通知:CompletableFuture完成后发布结果,通知栈中的Completion执行,Completion出栈。
  4. 完成:Completion执行后,完成引用的CompletableFuture。被完成的CompletableFuture通知其观察者。

下图中,stack是CompletableFuture的任务栈的栈顶,Completion是栈的节点,它的dep字段保存了需要完成的CompletableFuture的引用。

链式调用的情况下,通知和完成的简单实现是使用递归。递归需要大量运行空间,递归层次越深需要的内存越多,调用栈空间不足的时候会抛StackOverflowError。将递归转换成循环可以减少内存开销,避免StackOverflowError。对递归的转换可以通过合并多个CompletableFuture的stack来实现。

(三)算法与实现

CompletableFuture使用了名为Treiber Stack的一种用cas操作解决并发冲突,实现非阻塞无锁并发栈的算法。

任务通知-postComplete

postComplete实现了任务通知和任务栈合并的逻辑。postComplete使用NESTED参数执行tryFire告诉Completion可以返回要通知的CompletableFuture。

  1. final void postComplete() {
  2. CompletableFuture f = this; Completion h; // f保存的是当前要通知或合并的CompletableFuture
  3. while ((h = f.stack) != null || // f的栈不为空 => (通知未完成 || 合并未完成) => 继续循环
  4. (f != this // f != this => 在做合并操作;f的栈为空 && 在做合并操作 => f已被合并到this => 看this中还有没有任务可以通知
  5. && (h = (f = this).stack) != null)) {// this的栈中还有任务可以通知,继续循环
  6. CompletableFuture d; Completion t;
  7. if (f.casStack(h, t = h.next)) { // 出栈,原子操作:栈顶元素stack为h则将stack指向h的下一个元素t,否则返回false,读取最新的栈顶元素重试
  8. if (t != null) { // h不是f的栈中最后一个任务(最后一个直接执行,不用入栈)
  9. if (f != this) { // f != this => 需要做合并操作
  10. pushStack(h); // 将h压入this的栈
  11. continue;
  12. }
  13. h.next = null; // detach,帮助垃圾回收
  14. }
  15. f = (d = h.tryFire(NESTED)) == null ? this : d;// NESTED:嵌套模式,若tryFire返回的不是null,表示h有要通知的CF,接下来把要通知的CF的栈中任务压入this的栈
  16. }
  17. }
  18. }

任务入栈-pushStack

栈中节点Completion的实现如下,Completion的next字段用于保存前一个入栈的节点的引用,是一个volatile变量。tryFire方法可以接受SYNC、ASYNC和NESTED这三种模式。

  1. /* ------------- Base Completion classes and operations -------------- */
  2. ​
  3. @SuppressWarnings("serial")
  4. abstract static class Completion extends ForkJoinTask
  5. implements Runnable, AsynchronousCompletionTask {
  6. volatile Completion next; // Treiber stack link
  7. ​
  8. /**
  9. * Performs completion action if triggered, returning a
  10. * dependent that may need propagation, if one exists.
  11. *
  12. * @param mode SYNC, ASYNC, or NESTED
  13. */
  14. abstract CompletableFuture tryFire(int mode);
  15. ​
  16. /** Returns true if possibly still triggerable. Used by cleanStack. */
  17. abstract boolean isLive();
  18. ​
  19. public final void run() { tryFire(ASYNC); }
  20. public final boolean exec() { tryFire(ASYNC); return true; }
  21. public final Void getRawResult() { return null; }
  22. public final void setRawResult(Void v) {}
  23. }

栈顶stack也是volatile变量,可使用cas操作进行原子更新。出栈操作的实现是在循环中尝试执行casStack修改栈顶,直到修改成功为止。

  1. // stack在CompletableFuture对象中的offset
  2. private static final long STACK;
  3. final boolean casStack(Completion cmp, Completion val) {
  4. // 若stack为cmp,则更新成val,return true;否则,不更新,return false
  5. return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
  6. }

入栈操作的实现是在循环中尝试修改next字段和stack字段,直到stack字段修改成功为止。

  1. /** Returns true if successfully pushed c onto stack. */
  2. final boolean tryPushStack(Completion c) {
  3. Completion h = stack;
  4. //c.next=h,延迟写,不保证执行完之后c.next的值立马就对其他线程可见,节省保证可见性的开销
  5. lazySetNext(c, h);
  6. //如果成功,则stack修改后的值和c.next的修改后的值都保证对其他线程可见
  7. return UNSAFE.compareAndSwapObject(this, STACK, h, c);
  8. }
  9. ​
  10. /** Unconditionally pushes c onto stack, retrying if necessary. */
  11. final void pushStack(Completion c) {
  12. do {} while (!tryPushStack(c));
  13. }

push操作与Treiber Stack典型实现不同的点在于更新next的值时做了可见性优化。

从上图可以看到一次入栈操作需要修改next和stack这两个volatile变量,若stack修改失败,是不需要保证next字段的可见性的。只有stack修改成功才需要保证c.next的可见性,因此可以通过延迟写(lasySet)来减少一次volatile写操作。

可见性优化-lazySetNext

修改next与修改stack是组合使用的,所以修改stack时保证next与stack的可见性即可。putOrderedObject去掉了volatile变量写操作的可见性,仅保留其有序性用来处理next变量可以提高写next变量的性能。

  1. static void lazySetNext(Completion c, Completion next) {
  2. //next是volatile变量,保证有序性和可见性。保证写操作可见性需要将写缓冲(write buffer)刷新到内存,成本较高
  3. //putOrderedObject只保证有序性语义。
  4. //下一次写volatile时,写缓冲被刷新到内存,next写入的值保证可见。
  5. //典型使用场景是将非阻塞数据结构中的节点置空,让节点更快地回收
  6. UNSAFE.putOrderedObject(c, NEXT, next);
  7. }

注册与完成回调任务

以thenApply和thenCombine为例,讨论CompletableFuture如何在不加锁的前提下保证任务得到执行且只执行一次。

监听单个Future的执行结果-以thenApply为例

约束:CompletableFuture未完成的情况下任务才能入栈。

原因:若CompletableFuture回调完成后任务才入栈,入栈的任务得不到通知。

问题:可能需要对result进行加锁。

如果不对result进行加锁,在并发的情况下可能出现下图中的执行顺序。判断result是否为空时CompletableFuture还未完成,执行Completion入栈操作,但CompletableFuture通知后入栈操作才完成,任务得不到通知。

加锁情况下的执行顺序如下图所示,可以解决上述问题:

无锁解决方案:Completion在入栈后再尝试执行一次,若执行成功则将任务从栈中清除。

监听两个Future的执行结果-以thenCombine为例

假设CompletableFuture a在A线程执行,CompletableFuture b在B线程执行,thenCombine注册任务的逻辑如下:

从下图可以看到,若两个Future的result同时不为null,Completion注册之后可能同时被A线程和B线程通知,执行两次。

因此需要加一个变量标记Completion是否执行过。

监听多个Future的执行结果-allOf与anyOf

allOf与anyOf都是创建监听多个CompletableFuture的CompletableFuture,区别在于

  • allOf的完成条件是监听的所有Future都完成,无计算结果。
  • anyOf的完成条件是监听的任意一个Future完成,计算结果为最早完成的Future的计算结果。

四、开发中的指导建议

在实际开发中,使用CompletableFuture需要注意以下几个方面:

  1. 异常处理:在CompletableFuture中,可以使用exceptionally()方法或handle()方法来处理任务执行过程中的异常,避免异常导致整个应用程序崩溃。异常处理的方式可以根据具体的业务需求来选择。
  2. 避免阻塞:CompletableFuture提供了一系列非阻塞的方法,例如thenApplyAsync()、thenAcceptAsync()、thenRunAsync()等,可以在任务执行的过程中不阻塞当前线程,从而提高整个应用程序的响应性能。
  3. 组合操作:CompletableFuture支持多个操作的组合,可以使用thenCompose()方法和thenCombine()方法将多个异步操作组合在一起,形成一个任务链,从而避免任务之间的阻塞和顺序问题。
  4. 慎用join()方法:在CompletableFuture中,join()方法是一个阻塞方法,如果任务没有完成,则会一直阻塞当前线程。因此,在使用join()方法时需要慎重考虑,避免造成整个应用程序的阻塞。
  5. 合理使用线程池:CompletableFuture默认使用ForkJoinPool线程池执行异步任务,如果任务过多或者任务执行时间过长,可能会造成线程池耗尽和任务等待的问题。因此,在使用CompletableFuture时需要根据具体的业务需求和性能要求,选择合适的线程池,并对线程池进行调优和管理。

综上所述,使用CompletableFuture需要根据具体的业务需求和性能要求,合理选择异步编程技术,并注意异常处理、避免阻塞、合理组合操作、慎用join()方法和合理使用线程池等方面。

参考文献、书籍及链接

1.《深入解析Java并发编程:CompletableFuture源码分析》https://www.jianshu.com/p/02a4d4c4be4d:这篇文章通过源码分析,详细介绍了CompletableFuture的实现原理,包括异步执行、回调函数、异常处理等方面。

2.《从Java并发包看并发编程(六):CompletableFuture原理分析》https://www.cnblogs.com/zhenyulu/p/9267681.html:这篇文章通过代码分析,讲解了CompletableFuture的实现原理,并讨论了CompletableFuture的优缺点和适用场景。

3.《Java 8 Completable Future: Performing Async Tasks》https://www.javacodegeeks.com/2018/05/java-8-completable-future-performing-async-tasks.html:这篇文章通过源码分析,详细介绍了CompletableFuture的实现原理,包括线程池、任务执行流程、回调函数等方面。

4.《Java并发编程学习笔记之CompletableFuture源码解析》https://www.cnblogs.com/ll409546297/p/12015692.html:这篇文章通过源码分析,介绍了CompletableFuture的实现原理,包括任务状态、异步执行、回调函数、异常处理等方面。

5.《深入分析CompletableFuture源码》https://blog.csdn.net/liuganggang123/article/details/81153429:这篇文章通过源码分析,详细介绍了CompletableFuture的实现原理,包括异步执行、回调函数、异常处理等方面,还讨论了CompletableFuture的一些问题和注意事项。

6.王鹏《CompletableFuture回调机制的设计与实现》

7.CompletableFuture原理解析 | 老司机撩Java

8.图解CompletableFuture源码_weixin_38592881的博客-CSDN博客

9.深入解读CompletableFuture源码与原理_CoderBruis的博客-CSDN博客_future.isdown()

10.CompletableFuture(可完成的异步执行的任务)源码解析(史上最全分析 借鉴深入理解并发原理)_completablefuture源码分析_沈情的博客-CSDN博客

11.CompletableFuture源码解析_pngyul的博客-CSDN博客_tryfire

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树Java异步任务Future与CompletableFuture148946 人正在系统学习中
注:本文转载自blog.csdn.net的张彦峰ZYF的文章"https://zyfcodes.blog.csdn.net/article/details/130466437"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top