首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

CompletableFuture使用详解(全网看这一篇就行)

  • 25-03-02 14:22
  • 4781
  • 10507
blog.csdn.net

CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

一、创建异步任务

1. supplyAsync

supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

  1. // 带返回值异步请求,默认线程池
  2. public static CompletableFuture supplyAsync(Supplier supplier)
  3. // 带返回值的异步请求,可以自定义线程池
  4. public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
  3. System.out.println("do something....");
  4. return "result";
  5. });
  6. //等待任务执行完成
  7. System.out.println("结果->" + cf.get());
  8. }
  9. public static void main(String[] args) throws ExecutionException, InterruptedException {
  10. // 自定义线程池
  11. ExecutorService executorService = Executors.newSingleThreadExecutor();
  12. CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
  13. System.out.println("do something....");
  14. return "result";
  15. }, executorService);
  16. //等待子任务执行完成
  17. System.out.println("结果->" + cf.get());
  18. }

 测试结果:

 2. runAsync

runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

  1. // 不带返回值的异步请求,默认线程池
  2. public static CompletableFuture runAsync(Runnable runnable)
  3. // 不带返回值的异步请求,可以自定义线程池
  4. public static CompletableFuture runAsync(Runnable runnable, Executor executor)

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
  3. System.out.println("do something....");
  4. });
  5. //等待任务执行完成
  6. System.out.println("结果->" + cf.get());
  7. }
  8. public static void main(String[] args) throws ExecutionException, InterruptedException {
  9. // 自定义线程池
  10. ExecutorService executorService = Executors.newSingleThreadExecutor();
  11. CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
  12. System.out.println("do something....");
  13. }, executorService);
  14. //等待任务执行完成
  15. System.out.println("结果->" + cf.get());
  16. }

测试结果:

3.获取任务结果的方法

  1. // 如果完成则返回结果,否则就抛出具体的异常
  2. public T get() throws InterruptedException, ExecutionException
  3. // 最大时间等待返回结果,否则就抛出具体异常
  4. public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
  5. // 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
  6. public T join()
  7. // 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
  8. public T getNow(T valueIfAbsent)
  9. // 如果任务没有完成,返回的值设置为给定值
  10. public boolean complete(T value)
  11. // 如果任务没有完成,就抛出给定异常
  12. public boolean completeExceptionally(Throwable ex)

 二、异步回调处理

1.thenApply和thenApplyAsync

 thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. return 1;
  5. });
  6. CompletableFuture cf2 = cf1.thenApplyAsync((result) -> {
  7. System.out.println(Thread.currentThread() + " cf2 do something....");
  8. result += 2;
  9. return result;
  10. });
  11. //等待任务1执行完成
  12. System.out.println("cf1结果->" + cf1.get());
  13. //等待任务2执行完成
  14. System.out.println("cf2结果->" + cf2.get());
  15. }
  16. public static void main(String[] args) throws ExecutionException, InterruptedException {
  17. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  18. System.out.println(Thread.currentThread() + " cf1 do something....");
  19. return 1;
  20. });
  21. CompletableFuture cf2 = cf1.thenApply((result) -> {
  22. System.out.println(Thread.currentThread() + " cf2 do something....");
  23. result += 2;
  24. return result;
  25. });
  26. //等待任务1执行完成
  27. System.out.println("cf1结果->" + cf1.get());
  28. //等待任务2执行完成
  29. System.out.println("cf2结果->" + cf2.get());
  30. }

测试结果:

   

从上面代码和测试结果我们发现thenApply和thenApplyAsync区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

2.thenAccept和thenAcceptAsync

 thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。

测试代码

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. return 1;
  5. });
  6. CompletableFuture cf2 = cf1.thenAccept((result) -> {
  7. System.out.println(Thread.currentThread() + " cf2 do something....");
  8. });
  9. //等待任务1执行完成
  10. System.out.println("cf1结果->" + cf1.get());
  11. //等待任务2执行完成
  12. System.out.println("cf2结果->" + cf2.get());
  13. }
  14. public static void main(String[] args) throws ExecutionException, InterruptedException {
  15. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  16. System.out.println(Thread.currentThread() + " cf1 do something....");
  17. return 1;
  18. });
  19. CompletableFuture cf2 = cf1.thenAcceptAsync((result) -> {
  20. System.out.println(Thread.currentThread() + " cf2 do something....");
  21. });
  22. //等待任务1执行完成
  23. System.out.println("cf1结果->" + cf1.get());
  24. //等待任务2执行完成
  25. System.out.println("cf2结果->" + cf2.get());
  26. }

测试结果:

 从上面代码和测试结果我们发现thenAccep和thenAccepAsync区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

2.thenRun和thenRunAsync

 thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. return 1;
  5. });
  6. CompletableFuture cf2 = cf1.thenRun(() -> {
  7. System.out.println(Thread.currentThread() + " cf2 do something....");
  8. });
  9. //等待任务1执行完成
  10. System.out.println("cf1结果->" + cf1.get());
  11. //等待任务2执行完成
  12. System.out.println("cf2结果->" + cf2.get());
  13. }
  14. public static void main(String[] args) throws ExecutionException, InterruptedException {
  15. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  16. System.out.println(Thread.currentThread() + " cf1 do something....");
  17. return 1;
  18. });
  19. CompletableFuture cf2 = cf1.thenRunAsync(() -> {
  20. System.out.println(Thread.currentThread() + " cf2 do something....");
  21. });
  22. //等待任务1执行完成
  23. System.out.println("cf1结果->" + cf1.get());
  24. //等待任务2执行完成
  25. System.out.println("cf2结果->" + cf2.get());
  26. }

 测试结果:

从上面代码和测试结果我们发现thenRun和thenRunAsync区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

3.whenComplete和whenCompleteAsync

 whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. int a = 1/0;
  5. return 1;
  6. });
  7. CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
  8. System.out.println("上个任务结果:" + result);
  9. System.out.println("上个任务抛出异常:" + e);
  10. System.out.println(Thread.currentThread() + " cf2 do something....");
  11. });
  12. // //等待任务1执行完成
  13. // System.out.println("cf1结果->" + cf1.get());
  14. // //等待任务2执行完成
  15. System.out.println("cf2结果->" + cf2.get());
  16. }

测试结果:

 

 whenCompleteAsync和whenComplete区别也是whenCompleteAsync可能会另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

4.handle和handleAsync

 跟whenComplete基本一致,区别在于handle的回调方法有返回值。

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. // int a = 1/0;
  5. return 1;
  6. });
  7. CompletableFuture cf2 = cf1.handle((result, e) -> {
  8. System.out.println(Thread.currentThread() + " cf2 do something....");
  9. System.out.println("上个任务结果:" + result);
  10. System.out.println("上个任务抛出异常:" + e);
  11. return result+2;
  12. });
  13. //等待任务2执行完成
  14. System.out.println("cf2结果->" + cf2.get());
  15. }

测试结果 :

三、多任务组合处理 

1.thenCombine、thenAcceptBoth 和runAfterBoth

这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。

区别:thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println(Thread.currentThread() + " cf1 do something....");
  4. return 1;
  5. });
  6. CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
  7. System.out.println(Thread.currentThread() + " cf2 do something....");
  8. return 2;
  9. });
  10. CompletableFuture cf3 = cf1.thenCombine(cf2, (a, b) -> {
  11. System.out.println(Thread.currentThread() + " cf3 do something....");
  12. return a + b;
  13. });
  14. System.out.println("cf3结果->" + cf3.get());
  15. }
  16. public static void main(String[] args) throws ExecutionException, InterruptedException {
  17. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  18. System.out.println(Thread.currentThread() + " cf1 do something....");
  19. return 1;
  20. });
  21. CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
  22. System.out.println(Thread.currentThread() + " cf2 do something....");
  23. return 2;
  24. });
  25. CompletableFuture cf3 = cf1.thenAcceptBoth(cf2, (a, b) -> {
  26. System.out.println(Thread.currentThread() + " cf3 do something....");
  27. System.out.println(a + b);
  28. });
  29. System.out.println("cf3结果->" + cf3.get());
  30. }
  31. public static void main(String[] args) throws ExecutionException, InterruptedException {
  32. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  33. System.out.println(Thread.currentThread() + " cf1 do something....");
  34. return 1;
  35. });
  36. CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
  37. System.out.println(Thread.currentThread() + " cf2 do something....");
  38. return 2;
  39. });
  40. CompletableFuture cf3 = cf1.runAfterBoth(cf2, () -> {
  41. System.out.println(Thread.currentThread() + " cf3 do something....");
  42. });
  43. System.out.println("cf3结果->" + cf3.get());
  44. }

测试结果:

 

 2.applyToEither、acceptEither和runAfterEither

这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。

区别:applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;runAfterEither没有入参,也没有返回值。

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  3. try {
  4. System.out.println(Thread.currentThread() + " cf1 do something....");
  5. Thread.sleep(2000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. return "cf1 任务完成";
  10. });
  11. CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
  12. try {
  13. System.out.println(Thread.currentThread() + " cf2 do something....");
  14. Thread.sleep(5000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. return "cf2 任务完成";
  19. });
  20. CompletableFuture cf3 = cf1.applyToEither(cf2, (result) -> {
  21. System.out.println("接收到" + result);
  22. System.out.println(Thread.currentThread() + " cf3 do something....");
  23. return "cf3 任务完成";
  24. });
  25. System.out.println("cf3结果->" + cf3.get());
  26. }
  27. public static void main(String[] args) throws ExecutionException, InterruptedException {
  28. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  29. try {
  30. System.out.println(Thread.currentThread() + " cf1 do something....");
  31. Thread.sleep(2000);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. return "cf1 任务完成";
  36. });
  37. CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
  38. try {
  39. System.out.println(Thread.currentThread() + " cf2 do something....");
  40. Thread.sleep(5000);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. return "cf2 任务完成";
  45. });
  46. CompletableFuture cf3 = cf1.acceptEither(cf2, (result) -> {
  47. System.out.println("接收到" + result);
  48. System.out.println(Thread.currentThread() + " cf3 do something....");
  49. });
  50. System.out.println("cf3结果->" + cf3.get());
  51. }
  52. public static void main(String[] args) throws ExecutionException, InterruptedException {
  53. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  54. try {
  55. System.out.println(Thread.currentThread() + " cf1 do something....");
  56. Thread.sleep(2000);
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. System.out.println("cf1 任务完成");
  61. return "cf1 任务完成";
  62. });
  63. CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
  64. try {
  65. System.out.println(Thread.currentThread() + " cf2 do something....");
  66. Thread.sleep(5000);
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. }
  70. System.out.println("cf2 任务完成");
  71. return "cf2 任务完成";
  72. });
  73. CompletableFuture cf3 = cf1.runAfterEither(cf2, () -> {
  74. System.out.println(Thread.currentThread() + " cf3 do something....");
  75. System.out.println("cf3 任务完成");
  76. });
  77. System.out.println("cf3结果->" + cf3.get());
  78. }

测试结果: 

从上面可以看出cf1任务完成需要2秒,cf2任务完成需要5秒,使用applyToEither组合两个任务时,只要有其中一个任务完成时,就会执行cf3任务,显然cf1任务先完成了并且将自己任务的结果传值给了cf3任务,cf3任务中打印了接收到cf1任务完成,接着完成自己的任务,并返回cf3任务完成;acceptEither和runAfterEither类似,acceptEither会将cf1任务的结果作为cf3任务的入参,但cf3任务完成时并无返回值;runAfterEither不会将cf1任务的结果作为cf3任务的入参,它是没有任务入参,执行完自己的任务后也并无返回值。

3.allOf / anyOf 

allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。

测试代码:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  3. try {
  4. System.out.println(Thread.currentThread() + " cf1 do something....");
  5. Thread.sleep(2000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. System.out.println("cf1 任务完成");
  10. return "cf1 任务完成";
  11. });
  12. CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
  13. try {
  14. System.out.println(Thread.currentThread() + " cf2 do something....");
  15. int a = 1/0;
  16. Thread.sleep(5000);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. System.out.println("cf2 任务完成");
  21. return "cf2 任务完成";
  22. });
  23. CompletableFuture cf3 = CompletableFuture.supplyAsync(() -> {
  24. try {
  25. System.out.println(Thread.currentThread() + " cf2 do something....");
  26. Thread.sleep(3000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. System.out.println("cf3 任务完成");
  31. return "cf3 任务完成";
  32. });
  33. CompletableFuture cfAll = CompletableFuture.allOf(cf1, cf2, cf3);
  34. System.out.println("cfAll结果->" + cfAll.get());
  35. }
  36. public static void main(String[] args) throws ExecutionException, InterruptedException {
  37. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  38. try {
  39. System.out.println(Thread.currentThread() + " cf1 do something....");
  40. Thread.sleep(2000);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. System.out.println("cf1 任务完成");
  45. return "cf1 任务完成";
  46. });
  47. CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
  48. try {
  49. System.out.println(Thread.currentThread() + " cf2 do something....");
  50. Thread.sleep(5000);
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. System.out.println("cf2 任务完成");
  55. return "cf2 任务完成";
  56. });
  57. CompletableFuture cf3 = CompletableFuture.supplyAsync(() -> {
  58. try {
  59. System.out.println(Thread.currentThread() + " cf2 do something....");
  60. Thread.sleep(3000);
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64. System.out.println("cf3 任务完成");
  65. return "cf3 任务完成";
  66. });
  67. CompletableFuture cfAll = CompletableFuture.anyOf(cf1, cf2, cf3);
  68. System.out.println("cfAll结果->" + cfAll.get());
  69. }
  70. 测试结果:

     

    文章知识点与官方知识档案匹配,可进一步学习相关知识
    Java技能树Java异步任务Future与CompletableFuture140178 人正在系统学习中
    注:本文转载自blog.csdn.net的代码搬运工阿新的文章"https://blog.csdn.net/zsx_xiaoxin/article/details/123898171"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
    复制链接
    复制链接
    相关推荐
    发表评论
    登录后才能发表评论和回复 注册

    / 登录

    评论记录:

    未查询到任何数据!
    回复评论:

    分类栏目

    后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

    热门文章

    101
    推荐
    关于我们 隐私政策 免责声明 联系我们
    Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
    Scroll to Top