目录
1.异步回调:thenApply和thenApplyAsync
2.异步回调:thenCompose和thenComposeAsync
3.异步回调:thenAccept和thenAcceptAsync
干货分享,感谢您的阅读!
随着现代软件系统的复杂性和用户需求的多样化,异步编程成为了提升系统性能和响应速度的重要手段。在Java领域,CompletableFuture作为Java 8引入的新特性,提供了强大的异步编程能力,极大地简化了多线程和并发任务的处理。本文将探讨CompletableFuture的基本功能和使用方法,介绍如何利用其提升程序的并发性能和代码的可维护性。
其原理相关内容见:CompletableFuture回调机制的设计与实现_张彦峰ZYF的博客-CSDN博客
一、CompletableFuture基本功能安利
CompletableFuture是JDK8中的新特性,主要用于对JDK5中加入的Future的补充,实现了CompletionStage和Future接口。其基本功能主要如下图,作为一项利器在我们的平时开发中频繁使用。
一般使用上我们主要集中在任务创建、异步回调、多任务组合、结果处理和结果获取,本文主要针对各部分主要功能进行基本的安利使用,同时附上一些源码的文章供分析。
二、CompletableFuture使用介绍
假设我们有一个敏感词系统,可以针对各种信息进行基本的敏感词验证,其核心主要包括三部分功能:文本清洗、敏感词验证、敏感词干预生效。
- 第一步,文本清洗针对验证的文本将直接去除其中的特殊符号、表情包、隐藏符号、中文简体繁体转换等内容,只保留文本中含有的中英文和数字信息。
- 第二步,清洗后,将清洗文本与实际相关的词库进行验证来查看是否命中相关敏感词,并给出词库命中的敏感词信息。
- 第三步,如果存在词库命中的敏感词,同时考虑各词是否有加白以及生效规则(只对某区域生效等),从而确定最终的命中信息。
以上是业务基背景,相关可以采用责任链管道模式实现可见责任链模式(以及变种管道模式)的应用案例_张彦峰ZYF的博客-CSDN博客_责任链模式应用场景责任链在实际开发中的应用还是比较多的,特别是在营销订购系统、审核流转换处理、任务流程处理系统等系统中,其实我们在开发中往往主要应用的主要无非是以下三个场景(起码以我的平时开发的角度来看):一是无需太关心责任链中各处理流的顺序的简单使用;二是需要关注处理顺序,按责任链条延续处理,每个处理节点均可对请求进行节点的处理, 或将其传递给链上的下个处理节点;三是在处理中和纯的责任链模式在链上只会有一个处理器用于处理业务数据存在差异,需要进行管道模式采用多个处理器都会处理业务数据。针对以上场景进行业务举例和代码书写http://iyenn.com/rec/1691449.html?spm=1001.2014.3001.5501其中含有基本业务流程图可方便理解。
(一)任务创建使用
1.supplyAsync创建带有返回值的异步任务
- public static CompletableFuture supplyAsync(Supplier supplier)
- public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
我们限定创建用于清洗用户文本有返回值的异步任务,采用以下两种方式进行展示其基本创建:
- 使用默认线程池(ForkJoinPool.commonPool())异步任务创建-有返回值的异步任务supplyAsync
- 自定义线程池异步任务创建-有返回值的异步任务supplyAsync
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 异步任务创建:有返回值的异步任务supplyAsync
- * 使用默认线程池 + 自定义线程池
- */
- @Test
- public void testSupplyAsync() throws ExecutionException, InterruptedException {
- log.info("异步任务创建-有返回值的异步任务supplyAsync:使用默认线程池(ForkJoinPool.commonPool())");
- CompletableFuture
contentCleanTaskByDefault = CompletableFuture.supplyAsync(() -> { - log.info("异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】");
- ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
- .contentAttr(ContentAttr.builder().build()).build());
-
- return contentCleanResContext.getCleanContent();
- });
- log.info("异步任务获取用户文本清洗结果:【{}】", contentCleanTaskByDefault.get());
-
- log.info("异步任务创建-有返回值的异步任务supplyAsync:自定义线程池");
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
contentCleanTaskByDefine = CompletableFuture.supplyAsync(() -> { - log.info("异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】");
- ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
- return contentCleanResContext.getCleanContent();
- }, threadPoolExecutor);
- log.info("异步任务获取用户文本清洗结果:【{}】", contentCleanTaskByDefine.get());
- }
- }
测试结果展示:
重点关注打印日志“异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】”以及各自结果“异步任务获取用户文本清洗结果:【中国张彦峰外卖】”,管道相关处理日志暂时忽略。
- 2023-01-22 10:47:48,349 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 64] - 异步任务创建-有返回值的异步任务supplyAsync:使用默认线程池(ForkJoinPool.commonPool())
- 2023-01-22 10:47:48,354 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 66] - 异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】
- 2023-01-22 10:47:48,356 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=null, source=null, sourceId=null, bizType=null))
- 2023-01-22 10:47:48,398 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 34 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 10:47:48,434 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 34 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 10:47:48,466 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 31 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 10:47:48,877 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=null, source=null, sourceId=null, bizType=null))
- 2023-01-22 10:47:48,878 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 75] - 异步任务获取用户文本清洗结果:【中国张彦峰外卖】
- 2023-01-22 10:47:48,878 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 77] - 异步任务创建-有返回值的异步任务supplyAsync:自定义线程池
- 2023-01-22 10:47:48,879 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 88] - 异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】
- 2023-01-22 10:47:48,880 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 10:47:48,882 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 10:47:48,882 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 96] - 异步任务获取用户文本清洗结果:【中国张彦峰外卖】
-
2.runAsync创建没有返回值的异步任务
- public static CompletableFuture
runAsync(Runnable runnable) - public static CompletableFuture
runAsync(Runnable runnable,Executor executor)
我们限定创建用于清洗用户文本无返回值的异步任务,采用以下两种方式进行展示其基本创建:
- 使用默认线程池(ForkJoinPool.commonPool())异步任务创建-无返回值的异步任务supplyAsync
- 自定义线程池异步任务创建-无返回值的异步任务supplyAsync
因为无返回值,所以一般在处理中,异步任务对用户文本清洗并刷新数据到指定缓存,并"异步通知进行消息转发,触达各消费端进行后续清理
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
- /**
- * 异步任务创建:没有返回值的异步任务runAsync
- * 使用默认线程池 + 自定义线程池
- */
- @Test
- public void testRunAsync() throws ExecutionException, InterruptedException {
- log.info("异步任务创建-有返回值的异步任务runAsync:使用默认线程池(ForkJoinPool.commonPool())");
- CompletableFuture
contentCleanTaskByDefault = CompletableFuture.runAsync(() -> { - log.info("异步任务对用户文本清洗并刷新数据");
- log.info("异步通知进行消息转发,触达各消费端进行后续清理");
- });
-
- log.info("异步任务创建-有返回值的异步任务runAsync:自定义线程池");
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
contentCleanTaskByDefine = CompletableFuture.runAsync(() -> { - log.info("异步任务对用户文本清洗并刷新数据");
- log.info("异步通知进行消息转发,触达各消费端进行后续清理");
- }, threadPoolExecutor);
- }
- }
测试结果展示:
- 2023-01-22 10:56:35,672 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 105] - 异步任务创建-有返回值的异步任务runAsync:使用默认线程池(ForkJoinPool.commonPool())
- 2023-01-22 10:56:35,676 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 111] - 异步任务创建-有返回值的异步任务runAsync:自定义线程池
- 2023-01-22 10:56:35,676 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 107] - 异步任务对用户文本清洗并刷新数据
- 2023-01-22 10:56:35,677 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 108] - 异步通知进行消息转发,触达各消费端进行后续清理
- 2023-01-22 10:56:35,679 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 122] - 异步任务对用户文本清洗并刷新数据
- 2023-01-22 10:56:35,680 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 123] - 异步通知进行消息转发,触达各消费端进行后续清理
重点关注打印日志“异步任务获取用户文本清洗结果:用户文本【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰】”以及各自结果“异步通知进行消息转发,触达各消费端进行后续清理”,管道相关处理日志暂时忽略。
(二)异步回调使用
1.异步回调:thenApply和thenApplyAsync
- public CompletableFuture thenApply(Functionsuper T,? extends U> fn)
- public CompletableFuture thenApplyAsync(Functionsuper T,? extends U> fn)
- public CompletableFuture thenApplyAsync(Functionsuper T,? extends U> fn,Executor executor)
我们限定两个任务来完成用户文本的清洗和词库命中情况,采用以下两种方式进行展示,注意这两个方法是有返回值的:
- 异步回调-thenApply使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 异步回调-thenApplyAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况,采用自定义线程池
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 异步回调:thenApply和thenApplyAsync
- * thenApply接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的Future对象。
- *
- * 使用thenApply方法时子任务与父任务使用的是同一个线程
- * thenApplyAsync在子任务中是另起一个线程执行任务,可以自定义线程池
- */
- @Test
- public void testThenApply() throws ExecutionException, InterruptedException {
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
- CompletableFuture
sensitveHitRes1 = contentCleanRes.thenApply((contentCleanResInfo) -> - sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo));
- log.info("异步回调-thenApply使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
- log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanRes.get().getCleanContent());
- log.info("任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【{}】",
- sensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
-
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
sensitveHitRes2 = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()), threadPoolExecutor)
- .thenApplyAsync((contentCleanResInfo) ->
- sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo), threadPoolExecutor);
- log.info("异步回调-thenApplyAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
- log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanRes.get().getCleanContent());
- log.info("任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【{}】",
- sensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- }
- }
测试结果展示:
重点关注打印日志“任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】”以及“任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 外卖, 腾讯, 酒精, 南京]】”,管道相关处理日志暂时忽略。
- 2023-01-22 11:02:55,076 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 144] - 异步回调-thenApply使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 2023-01-22 11:02:55,079 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:02:55,122 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 38 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:02:55,156 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 31 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:02:55,188 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 32 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:02:55,565 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:02:55,566 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:02:55,566 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 145] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
- 2023-01-22 11:02:55,580 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:02:55,581 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 146] - 任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】
- 2023-01-22 11:02:55,583 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:02:55,583 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 166] - 异步回调-thenApplyAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 2023-01-22 11:02:55,583 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 167] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
- 2023-01-22 11:02:55,585 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:02:55,586 [thread-processor-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:02:55,586 [thread-processor-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:02:55,587 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 168] - 任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】
2.异步回调:thenCompose和thenComposeAsync
- public CompletableFuture thenCompose(Functionsuper T, ? extends CompletionStage> fn)
- public CompletableFuture thenComposeAsync(Functionsuper T, ? extends CompletionStage> fn)
- public CompletableFuture thenComposeAsync(Functionsuper T, ? extends CompletionStage> fn, Executor executor)
我们限定两个任务来完成用户文本的清洗和词库命中情况,采用以下两种方式进行展示,注意这两个方法是有返回值的:
- 异步回调-thenCompose使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 异步回调-thenComposeAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 异步回调:thenCompose和thenComposeAsync
- * thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果。
- *
- * thenApply转换的是泛型中的类型,返回的是同一个CompletableFuture
- * thenCompose将内部的CompletableFuture调用展开来并使用上一个CompletableFutre调用的结果在下一步的CompletableFuture调用中进行运算,是生成一个新的CompletableFuture。
- */
- @Test
- public void testThenCompose() throws ExecutionException, InterruptedException {
- CompletableFuture
sensitveHitRes1 = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build())).thenCompose(new Function
>() { - @Override
- public CompletionStage
apply(ContentCleanResContext contentCleanResInfo) { - return CompletableFuture.supplyAsync(new Supplier
() { - @Override
- public SensitveHitContext get() {
- return sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
- }
- });
- }
- });
- log.info("异步回调-thenCompose使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
- log.info("【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【{}】",
- sensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
-
- CompletableFuture
sensitveHitRes2 = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()))
- .thenComposeAsync(new Function
>() { - @Override
- public CompletionStage
apply(ContentCleanResContext contentCleanResInfo) { - return CompletableFuture.supplyAsync(new Supplier
() { - @Override
- public SensitveHitContext get() {
- return sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
- }
- });
- }
- });
- log.info("异步回调-thenComposeAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
- log.info("【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【{}】",
- sensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- }
- }
测试结果展示:
重点关注打印日志“【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【[张彦峰, 外卖, 腾讯, 酒精, 南京]】”,管道相关处理日志暂时忽略。
- 2023-01-22 11:06:38,047 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 197] - 异步回调-thenCompose使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 2023-01-22 11:06:38,050 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:06:38,138 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 68 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:06:38,196 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 54 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:06:38,254 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 56 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:06:38,774 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:06:38,776 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:06:38,787 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:06:38,789 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 198] - 【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【[张彦峰, 腾讯, 酒精, 南京]】
- 2023-01-22 11:06:38,790 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:06:38,792 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 218] - 异步回调-thenComposeAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 2023-01-22 11:06:38,794 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:06:38,797 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:06:38,798 [ForkJoinPool.commonPool-worker-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:06:38,800 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 219] - 【中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精】命中敏感词情况结果展示【[张彦峰, 腾讯, 酒精, 南京]】
3.异步回调:thenAccept和thenAcceptAsync
- public CompletableFuture
thenAccept(Consumersuper T> action) - public CompletableFuture
thenAcceptAsync(Consumersuper T> action) - public CompletableFuture
thenAcceptAsync(Consumersuper T> action, Executor executor)
我们限定两个任务来完成用户文本的清洗和词库命中情况,采用以下两种方式进行展示,注意这两个方法是没有返回值的:
- 异步回调-thenAccept使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 异步回调-thenAcceptAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况,使用自定义线程池
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 异步回调:thenAccept和thenAcceptAsync
- * 函数式接口Consumer,这个接口只有输入,没有返回值。
- *
- * thenAccep方法时子任务与父任务使用的是同一个线程
- * henAccepAsync在子任务中可能是另起一个线程执行任务
- *
- * @throws ExecutionException
- * @throws InterruptedException
- */
- @Test
- public void testThenAccept() throws ExecutionException, InterruptedException {
- log.info("异步回调-thenAccept使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
- log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanRes.get().getCleanContent());
-
- CompletableFuture
sensitveHitRes = contentCleanRes.thenAccept((contentCleanResInfo) -> { - SensitveHitContext sensitveHitContext = sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
- log.info("任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【{}】",
- sensitveHitContext.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- });
-
- log.info("异步回调-thenAcceptAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况");
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
sensitveHitRes2 = CompletableFuture.supplyAsync(() -> { - ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
- log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanResContext.getCleanContent());
- return contentCleanResContext;
-
- }, threadPoolExecutor)
- .thenAcceptAsync((contentCleanResInfo) -> {
- SensitveHitContext sensitveHitContext = sensitivePipelineExecutor.getSensitveHitRes(contentCleanResInfo);
- log.info("任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【{}】",
- sensitveHitContext.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- }, threadPoolExecutor);
- }
- }
测试结果展示:
重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】”和“任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】”,管道相关处理日志暂时忽略。
- 2023-01-22 11:10:42,946 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 235] - 异步回调-thenAccept使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 2023-01-22 11:10:42,953 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:10:43,001 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 41 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:10:43,038 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 36 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:10:43,073 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 34 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:10:43,500 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:10:43,501 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 242] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
- 2023-01-22 11:10:43,502 [main] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:10:43,515 [main] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:10:43,516 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 246] - 任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】
- 2023-01-22 11:10:43,516 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 250] - 异步回调-thenAcceptAsync使用,任务一将用户文本进行清洗,任务二回调清洗结果查看数据数据命中敏感词情况
- 2023-01-22 11:10:43,518 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:10:43,519 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:10:43,520 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 266] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
- 2023-01-22 11:10:43,520 [thread-processor-2] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:10:43,521 [thread-processor-2] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗结果构建上下文】, context=ContentCleanResContext(isCleanDone=true, content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5), reason=null)
- 2023-01-22 11:10:43,521 [thread-processor-2] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 272] - 任务二:回调清洗结果查看数据数据命中敏感词情况,命中结果展示【[张彦峰, 腾讯, 酒精, 南京]】
4.异步回调:thenRun和thenRunAsync
- public CompletableFuture
thenRun(Runnable action) - public CompletableFuture
thenRunAsync(Runnable action) - public CompletableFuture
thenRunAsync(Runnable action, Executor executor)
我们限定两个任务来完成用户文本的清洗和词库命中情况,采用以下两种方式进行展示,注意这两个方法是没有入参也没有返回值的:
- 异步回调-thenRun使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作
- 异步回调-thenRunAsync使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 异步回调:thenRun和thenRunAsync
- * thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。
- * thenRun会在上一阶段 CompletableFuture计算完成的时候执行一个Runnable,而Runnable并不使用该CompletableFuture计算的结果。
- *
- * thenRun方法时子任务与父任务使用的是同一个线程
- * thenRunAsync在子任务中可能是另起一个线程执行任务
- *
- * @throws ExecutionException
- * @throws InterruptedException
- */
- @Test
- public void testThenRun() throws ExecutionException, InterruptedException {
- log.info("异步回调-thenRun使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作");
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
- log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanRes.get().getCleanContent());
- CompletableFuture
notifyRes1 = contentCleanRes.thenRun(() -> { - log.info("任务二:通知相关消费者告知已清洗完毕,可执行后续操作!");
- });
-
- log.info("异步回调-thenRunAsync使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作");
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
notifyRes2 = CompletableFuture.supplyAsync(() -> { - ContentCleanResContext contentCleanResContext = sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build());
- log.info("任务一:将用户文本进行清洗,清洗结果展示【{}】", contentCleanResContext.getCleanContent());
- return contentCleanResContext;
-
- }
- , threadPoolExecutor)
- .thenRunAsync(() -> {
- log.info("任务二:通知相关消费者告知已清洗完毕,可执行后续操作!");
- }, threadPoolExecutor);
- }
- }
测试结果展示:
重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】”和“任务二:通知相关消费者告知已清洗完毕,可执行后续操作!”,管道相关处理日志暂时忽略。
- 2023-01-22 11:15:06,816 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 290] - 异步回调-thenRun使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作
- 2023-01-22 11:15:06,829 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:15:06,885 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 50 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:15:06,953 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 66 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:15:07,035 [ForkJoinPool.commonPool-worker-9] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 80 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:15:07,780 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:15:07,780 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 297] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
- 2023-01-22 11:15:07,781 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 299] - 任务二:通知相关消费者告知已清洗完毕,可执行后续操作!
- 2023-01-22 11:15:07,781 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 302] - 异步回调-thenRunAsync使用,任务一将用户文本进行清洗,任务二进行异步通知清洗完成执行后续操作
- 2023-01-22 11:15:07,783 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:15:07,786 [thread-processor-1] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:15:07,787 [thread-processor-1] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 318] - 任务一:将用户文本进行清洗,清洗结果展示【中国张彦峰外卖腾讯南京酒精】
- 2023-01-22 11:15:07,788 [thread-processor-2] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 324] - 任务二:通知相关消费者告知已清洗完毕,可执行后续操作!
(三)多任务组合使用
1.多任务组合:thenCombine
- public CompletableFuture
thenCombine(CompletionStage other, BiFunctionsuper T,? super U,? extends V> fn) - public CompletableFuture
thenCombineAsync(CompletionStage other, BiFunctionsuper T,? super U,? extends V> fn) - public CompletableFuture
thenCombineAsync(CompletionStage other, BiFunctionsuper T,? super U,? extends V> fn, Executor executor)
我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖,该方法有具体返回值。
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 多任务组合:thenCombine
- * 合并两个线程任务的结果,并进一步处理,该方法有返回值。
- */
- @Test
- public void testThenCombine() throws ExecutionException, InterruptedException {
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
-
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
result = complianceSensitveHitRes.thenCombineAsync(regularSensitveHitRes, (res1, res2) -> { - if (CollectionUtils.isEmpty(res1.getHitWords()) || CollectionUtils.isEmpty(res2.getHitWords())) {
- /*只要有一个词库反馈不命中就认为商品可以售卖*/
- return "当前商品可以售卖";
- }
- List
hitWords = Lists.newArrayList(); - hitWords.addAll(res1.getHitWords());
- hitWords.addAll(res2.getHitWords());
- return "当前商品不可以售卖,商品信息中包含敏感词" + hitWords.stream().map(SensitiveWord::getSensitive).collect(Collectors.toList());
- });
- log.info("组合任务一和二结论:{}", result.get());
- }
- }
测试结果展示:
重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】”,以及整合结论“组合任务一和二结论:当前商品不可以售卖,商品信息中包含敏感词[张彦峰, 酒精, 南京]”,管道相关处理日志暂时忽略。
- 2023-01-22 11:30:49,847 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:30:49,893 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 45 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:30:49,931 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 37 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:30:49,967 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 35 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:30:50,401 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:30:50,404 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 357] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
- 2023-01-22 11:30:50,415 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 366] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】
- 2023-01-22 11:30:50,416 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 378] - 组合任务一和二结论:当前商品不可以售卖,商品信息中包含敏感词[张彦峰, 酒精, 南京]
2.多任务组合:thenAcceptBoth
- public CompletableFuture
thenAcceptBoth(CompletionStage other, BiConsumersuper T, ? super U> action) - public CompletableFuture
thenAcceptBothAsync(CompletionStage other, BiConsumersuper T, ? super U> action) - public CompletableFuture
thenAcceptBothAsync(CompletionStage other, BiConsumersuper T, ? super U> action, Executor executor)
我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖异步处理商品上架,否则对商品直接下架处理并告知下架理由为存在敏感词,该方法没有返回值,相关操作建议进行异步操作处理。
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 多任务组合:thenAcceptBoth
- * 无返回值
- * 当两个CompletionStage都正常完成计算的时候,就会执行提供的action消费两个异步的结果
- */
- @Test
- public void testThenAcceptBoth() throws ExecutionException, InterruptedException {
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
-
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
result = complianceSensitveHitRes.thenAcceptBoth(regularSensitveHitRes, (res1, res2) -> { - if (CollectionUtils.isEmpty(res1.getHitWords()) || CollectionUtils.isEmpty(res2.getHitWords())) {
- /*只要有一个词库反馈不命中就认为商品可以售卖*/
- log.info("组合任务一和二结论异步处理商品上架");
- return;
- }
- List
hitWords = Lists.newArrayList(); - hitWords.addAll(res1.getHitWords());
- hitWords.addAll(res2.getHitWords());
- log.info("组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【{}】", hitWords.stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- });
- }
- }
测试结果展示:
重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】”,以及整合结论“组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【[张彦峰, 酒精, 南京]】”,管道相关处理日志暂时忽略。
- 2023-01-22 11:35:13,204 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:35:13,253 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 49 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:35:13,307 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 51 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:35:13,362 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 53 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:35:14,232 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:35:14,236 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 411] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
- 2023-01-22 11:35:14,260 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 420] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】
- 2023-01-22 11:35:14,261 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 431] - 组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【[张彦峰, 酒精, 南京]】
3.多任务组合:runAfterBoth
- public CompletableFuture
runAfterBoth(CompletionStage other, Runnable action) - public CompletableFuture
runAfterBothAsync(CompletionStage other, Runnable action) - public CompletableFuture
runAfterBothAsync(CompletionStage other, Runnable action, Executor executor)
我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖异步处理商品上架,否则对商品直接下架处理并告知下架理由为存在敏感词,该方法没有入参也没有返回值,相关操作建议进行异步保存和异步操作处理。
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 多任务组合:runAfterBoth
- * 没有入参,也没有返回值
- * 两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
- */
- @Test
- public void testRunAfterBoth() throws ExecutionException, InterruptedException {
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
-
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】,结果暂存redis",
- complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】,结果暂存redis",
- regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
result = complianceSensitveHitRes.runAfterBoth(regularSensitveHitRes, () -> { - log.info("组合任务一和二从缓存中获取暂存数据,以下为模拟");
- if (CollectionUtils.isEmpty(SensitveHitContext.builder().build().getHitWords()) ||
- CollectionUtils.isEmpty(SensitveHitContext.builder().build().getHitWords())) {
- /*只要有一个词库反馈不命中就认为商品可以售卖*/
- log.info("组合任务一和二结论异步处理商品上架");
- return;
- }
- List
hitWords = Lists.newArrayList(); - hitWords.addAll(SensitveHitContext.builder().build().getHitWords());
- hitWords.addAll(SensitveHitContext.builder().build().getHitWords());
- log.info("组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【{}】", hitWords.stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- });
- }
- }
测试结果展示:
重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】,结果暂存redis”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[]】,结果暂存redis”,以及整合结论“组合任务一和二结论异步处理商品上架”,管道相关处理日志暂时忽略。
- 2023-01-22 11:41:12,483 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:41:12,520 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 37 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:41:12,551 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 30 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:41:12,584 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 32 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:41:13,210 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国张彦峰外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:41:13,218 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 465] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】,结果暂存redis
- 2023-01-22 11:41:13,241 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 474] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[]】,结果暂存redis
- 2023-01-22 11:41:13,243 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 477] - 组合任务一和二从缓存中获取暂存数据,以下为模拟
- 2023-01-22 11:41:13,244 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 481] - 组合任务一和二结论异步处理商品上架
4.多任务组合:applyToEither
- public CompletableFuture applyToEither(CompletionStage other, Functionsuper T, U> fn)
- public CompletableFuture applyToEitherAsync(CompletionStage other, Functionsuper T, U> fn)
- public CompletableFuture applyToEitherAsync(CompletionStage other, Functionsuper T, U> fn, Executor executor)
我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖, 该方式下我们只关心最先返回的命中结果就进行后续操作,一般要求业务两边的校验处理结果是保持一致的,本处只做模拟使用,注意该方式有返回值。
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 多任务组合:applyToEither
- * 该方法有返回值
- * 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。
- */
- @Test
- public void testApplyToEither() throws ExecutionException, InterruptedException {
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
-
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}",
- regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
result = complianceSensitveHitRes.applyToEither(regularSensitveHitRes, res -> { - if (CollectionUtils.isEmpty(res.getHitWords())) {
- /*只要有一个词库反馈不命中就认为商品可以售卖*/
- return "当前商品可以售卖";
- }
- return "当前商品不可以售卖,商品信息中包含敏感词" + res.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList());
- });
- log.info("组合任务一和二处理结论:{}", result.get());
- }
- }
测试结果展示:
重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]”,以及整合结论“组合任务一和二处理结论:当前商品不可以售卖,商品信息中包含敏感词[张彦峰]”,管道相关处理日志暂时忽略。
- 2023-01-22 11:46:18,735 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:46:18,803 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 68 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:46:18,865 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 58 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:46:18,919 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 52 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:46:19,668 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:46:19,671 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 521] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
- 2023-01-22 11:46:19,682 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 530] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]
- 2023-01-22 11:46:19,682 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 539] - 组合任务一和二处理结论:当前商品不可以售卖,商品信息中包含敏感词[张彦峰]
5.多任务组合:acceptEither
- public CompletableFuture
acceptEither(CompletionStage other, Consumersuper T> action) - public CompletableFuture
acceptEitherAsync(CompletionStage other, Consumersuper T> action) - public CompletableFuture
acceptEitherAsync(CompletionStage other, Consumersuper T> action, Executor executor)
我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖, 该方式下我们只关心最先返回的命中结果就进行后续操作,一般要求业务两边的校验处理结果是保持一致的,本处只做模拟使用,注意该方式没有返回值,处理结果需要异步操作。
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 多任务组合:acceptEither
- * 将已经完成任务的执行结果作为方法入参,但是无返回值
- * 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。
- */
- @Test
- public void testAcceptEither() throws ExecutionException, InterruptedException {
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
-
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}",
- regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
result = complianceSensitveHitRes.acceptEither(regularSensitveHitRes, (res) -> { - if (CollectionUtils.isEmpty(res.getHitWords())) {
- /*只要有一个词库反馈不命中就认为商品可以售卖*/
- log.info("组合任务一和二结论异步处理商品上架");
- return;
- }
- log.info("组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【{}】", res.getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- });
- }
- }
测试结果展示:
重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]”,以及整合结论“组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【[张彦峰]】”,管道相关处理日志暂时忽略。
- 2023-01-22 11:50:15,466 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:50:15,510 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 44 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:50:15,548 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 37 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:50:15,593 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 44 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:50:15,977 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:50:15,980 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 572] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
- 2023-01-22 11:50:15,990 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 581] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]
- 2023-01-22 11:50:15,991 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 589] - 组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【[张彦峰]】
6.多任务组合:runAfterEither
- public CompletableFuture
runAfterEither(CompletionStage other, Runnable action) - public CompletableFuture
runAfterEitherAsync(CompletionStage other, Runnable action) - public CompletableFuture
runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor)
我们限定用户文本的清洗完成清洗,然后异步查看企业合规管控处理查看数据数据命中敏感词情况和正则校验处理查看数据数据命中敏感词情况,只要有一个词库反馈不命中就认为商品可以售卖, 该方式下我们只关心最先返回的命中结果就进行后续操作,一般要求业务两边的校验处理结果是保持一致的,本处只做模拟使用,注意该方式没有入参没有返回值,处理结果需要异步操作。
使用举例代码:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
-
- /**
- * 多任务组合:runAfterEither
- * 没有入参,也没有返回值
- * 两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
- */
- @Test
- public void testRunAfterEither() throws ExecutionException, InterruptedException {
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
- .cleanContent("中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
-
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- CompletableFuture
complianceSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】,将结果存入redis中",
- complianceSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】,将结果存入redis中",
- regularSensitveHitRes.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
future = complianceSensitveHitRes.runAfterEither(regularSensitveHitRes, () -> { - log.info("组合任务一和二从缓存中获取暂存数据,以下为模拟:直接按指定内容获取当前缓存中的结果");
- if (CollectionUtils.isEmpty(SensitveHitContext.builder().build().getHitWords())) {
- /*只要有一个词库反馈不命中就认为商品可以售卖*/
- log.info("组合任务一和二结论,从缓存中获取结果不存在敏感词直接异步处理商品上架");
- return;
- }
- log.info("组合任务一和二结论异步处理商品下架,并告知下架理由为存在敏感词【{}】", "張彥峰");
- });
- }
- }
测试结果展示:
重点关注打印日志“content=中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[]】,将结果存入redis中”和“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[]】,将结果存入redis中”,以及整合结论“组合任务一和二结论,从缓存中获取结果不存在敏感词直接异步处理商品上架”,管道相关处理日志暂时忽略。
- 2023-01-22 11:54:33,692 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:54:33,772 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 80 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 11:54:33,820 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 47 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 11:54:33,858 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 37 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 11:54:34,226 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰, cleanContent=中国外卖, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 11:54:34,228 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 623] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[]】,将结果存入redis中
- 2023-01-22 11:54:34,237 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 632] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[]】,将结果存入redis中
- 2023-01-22 11:54:34,237 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 635] - 组合任务一和二从缓存中获取暂存数据,以下为模拟:直接按指定内容获取当前缓存中的结果
- 2023-01-22 11:54:34,237 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 638] - 组合任务一和二结论,从缓存中获取结果不存在敏感词直接异步处理商品上架
7.多任务组合:allOf
public static CompletableFuture allOf(CompletableFuture... cfs)
我们限定用户文本的清洗完成清洗,然后异步查看词库情况,按一下两种情况进行分析
- 情况一:当给定的多个异步任务都正常完成后,返回一个新的 CompletableFuture,给定 CompletableFuture 的结果不会反映在返回的 CompletableFuture 中,但可以通过单独检查给定任务来获得结果
- 情况二:当任何一个异步任务异常完成,则返回的CompletableFuture 也会异常完成,并且将该异步任务的异常作为其原因
- 情况三:当存在多个异常完成时,则返回排在前面的异步任务的异常信息。
具体代码如下,各自验证结果分开验证:
- /**
- * @author yanfengzhang
- * @description CompletableFuture使用
- * @date 2022/12/29 23:45
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = ZYFApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- @Slf4j
- public class CompletableFutureTest {
-
- @Autowired
- private SensitivePipelineExecutor sensitivePipelineExecutor;
-
- /**
- * 多任务组合:allOf
- *
- * 实现多 CompletableFuture 的同时返回
- * CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
- */
- @Test
- public void testAllOf() throws ExecutionException, InterruptedException {
- CompletableFuture
contentCleanRes = CompletableFuture.supplyAsync(() -> - sensitivePipelineExecutor.getContentCleanRes(
- ContentInfoContext.builder()
- .content("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .cleanContent("中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精")
- .contentAttr(ContentAttr.builder().bizType(BizType.E_COMMERCE.getType()).cityCode(110010).build()).build()));
-
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,
- 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
- private AtomicInteger threadCount = new AtomicInteger(1);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
- }
- });
- log.info("情况一:当给定的多个异步任务都正常完成后,返回一个新的 CompletableFuture," +
- "给定 CompletableFuture 的结果不会反映在返回的 CompletableFuture 中,但可以通过单独检查给定任务来获得结果");
- CompletableFuture
complianceSensitveHitRes1 = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- complianceSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes1 = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.REGULAR).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- regularSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
thesaurusSensitveHitRes1 = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.THESAURUS).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【{}】",
- thesaurusSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
result1 = CompletableFuture.allOf(complianceSensitveHitRes1, regularSensitveHitRes1, thesaurusSensitveHitRes1); - log.info("多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【{}】,正则敏感词库【{}】,业务自身词库【{}】",
- complianceSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
- regularSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
- thesaurusSensitveHitRes1.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
-
- log.info("情况二:当任何一个异步任务异常完成,则返回的CompletableFuture 也会异常完成,并且将该异步任务的异常作为其原因。");
- CompletableFuture
complianceSensitveHitRes2 = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.COMPLIANCE).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- complianceSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes2 = CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("正则词库管控处理查看数据数据命中敏感词情况异常暂停");
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- regularSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
thesaurusSensitveHitRes2 = CompletableFuture.supplyAsync(() -> { - try {
- return (SensitveHitContext) PipelineRouteConfig.getInstance(SensitiveCons.Validate.THESAURUS).handle(contentCleanRes.get());
- } catch (Exception e) {
- return SensitveHitContext.builder().hitWords(Lists.newArrayList()).build();
- }
- }, threadPoolExecutor);
- log.info("任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【{}】",
- thesaurusSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
result2 = CompletableFuture.allOf(complianceSensitveHitRes2, regularSensitveHitRes2, thesaurusSensitveHitRes2); - log.info("多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【{}】,正则敏感词库【{}】,业务自身词库【{}】",
- complianceSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
- regularSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
- thesaurusSensitveHitRes2.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
-
- log.info("情况三:当存在多个异常完成时,则返回排在前面的异步任务的异常信息。");
- CompletableFuture
complianceSensitveHitRes3 = CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("企业合规管控处理查看数据数据命中敏感词情况异常暂停");
- }, threadPoolExecutor);
- log.info("任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- complianceSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
regularSensitveHitRes3 = CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("正则合规管控处理查看数据数据命中敏感词情况异常暂停");
- }, threadPoolExecutor);
- log.info("任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【{}】",
- regularSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
thesaurusSensitveHitRes3 = CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("业务自身词库处理查看数据数据命中敏感词情况异常暂停");
- }, threadPoolExecutor);
- log.info("任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【{}】",
- thesaurusSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- CompletableFuture
result3 = CompletableFuture.allOf(complianceSensitveHitRes3, regularSensitveHitRes3, thesaurusSensitveHitRes3); - log.info("多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【{}】,正则敏感词库【{}】,业务自身词库【{}】",
- complianceSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
- regularSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()),
- thesaurusSensitveHitRes3.get().getHitWords().stream().map(SensitiveWord::getSensitive).collect(Collectors.toList()));
- }
- }
测试结果展示:
情况一验证结果:重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”、“任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]”和“任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【[腾讯]】”,以及整合结论“多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【[张彦峰]】,正则敏感词库【[酒精, 南京]】,业务自身词库【[腾讯]】”,管道相关处理日志暂时忽略。
- 2023-01-22 12:04:07,169 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 669] - 情况一:当给定的多个异步任务都正常完成后,返回一个新的 CompletableFuture,给定 CompletableFuture 的结果不会反映在返回的 CompletableFuture 中,但可以通过单独检查给定任务来获得结果
- 2023-01-22 12:04:07,171 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 12:04:07,200 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 29 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 12:04:07,228 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 27 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 12:04:07,255 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 26 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 12:04:07,610 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 12:04:07,613 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 678] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
- 2023-01-22 12:04:07,623 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 687] - 任务二:正则校验处理查看数据数据命中敏感词情况,命中结果展示【[酒精, 南京]】
- 2023-01-22 12:04:07,624 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 696] - 任务三:根据相关业务配置进行相关词库校验匹配查看数据数据命中敏感词情况,命中结果展示【[腾讯]】
- 2023-01-22 12:04:07,625 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 699] - 多任务组合:allOf执行完毕,相关词库敏感词命中情况展示:企业合规管控词库【[张彦峰]】,正则敏感词库【[酒精, 南京]】,业务自身词库【[腾讯]】
情况二验证结果:重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】”、“java.util.concurrent.ExecutionException: java.lang.RuntimeException: 正则词库管控处理查看数据数据命中敏感词情况异常暂停”,管道相关处理日志暂时忽略。
- 2023-01-22 12:05:22,159 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 704] - 情况二:当任何一个异步任务异常完成,则返回的CompletableFuture 也会异常完成,并且将该异步任务的异常作为其原因。
- 2023-01-22 12:05:22,162 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 12:05:22,198 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 36 ms to scan 1 urls, producing 4 keys and 18 values
- 2023-01-22 12:05:22,232 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 32 ms to scan 1 urls, producing 4 keys and 13 values
- 2023-01-22 12:05:22,259 [thread-processor-1] INFO org.reflections.Reflections [Reflections.java : 232] - Reflections took 26 ms to scan 1 urls, producing 3 keys and 9 values
- 2023-01-22 12:05:22,634 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonTailHandler [CommonTailHandler.java : 22] - 管道执行完毕:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中国张彦峰外卖腾讯南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
- 2023-01-22 12:05:22,637 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 712] - 任务一:企业合规管控处理查看数据数据命中敏感词情况,命中结果展示【[张彦峰]】
-
- java.util.concurrent.ExecutionException: java.lang.RuntimeException: 正则词库管控处理查看数据数据命中敏感词情况异常暂停
-
- at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
- at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
- at org.zyf.javabasic.java8.CompletableFutureTest.testAllOf(CompletableFutureTest.java:718)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
- at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
- at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
- at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
- at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
- at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
- at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
- at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
- at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
- at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
- at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
- at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
- at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
- at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
- at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
- at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
- at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
- at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
- at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
- at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
- at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
- at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
- at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
- at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
- at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
- at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
- at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
- at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
- Caused by: java.lang.RuntimeException: 正则词库管控处理查看数据数据命中敏感词情况异常暂停
- at org.zyf.javabasic.java8.CompletableFutureTest.lambda$testAllOf$44(CompletableFutureTest.java:715)
- at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
- at java.lang.Thread.run(Thread.java:748)
情况三验证结果:重点关注打印日志“content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精”、“java.util.concurrent.ExecutionException: java.lang.RuntimeException: 企业合规管控处理查看数据数据命中敏感词情况异常暂停”,管道相关处理日志暂时忽略。
- 2023-01-22 12:06:23,329 [main] INFO o.z.j.java8.CompletableFutureTest [CompletableFutureTest.java : 734] - 情况三:当存在多个异常完成时,则返回排在前面的异步任务的异常信息。
- 2023-01-22 12:06:23,332 [ForkJoinPool.commonPool-worker-9] INFO o.z.j.d.r.p.common.CommonHeadHandler [CommonHeadHandler.java : 24] - 管道开始执行:管道名称为【数据清洗(用户文本)构建上下文】, context=ContentInfoContext(content=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, cleanContent=中國張彥峰㎵㎶㎷㎸㎹㎺外賣⏳⌚⏰,腾讯,南京酒精, contentAttr=ContentAttr(belong=null, cityCode=110010, source=null, sourceId=null, bizType=5))
-
- java.util.concurrent.ExecutionException: java.lang.RuntimeException: 企业合规管控处理查看数据数据命中敏感词情况异常暂停
-
- at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
- at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
- at org.zyf.javabasic.java8.CompletableFutureTest.testAllOf(CompletableFutureTest.java:739)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
- at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
- at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
- at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
- at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
- at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
- at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
- at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
- at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
- at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
- at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
- at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
- at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
- at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
- at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
- at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
- at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
- at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
- at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
- at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
- at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
- at org.junit.runners.Suite.runChild(Suite.java:128)
- at org.junit.runners.Suite.runChild(Suite.java:27)
- at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
- at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
- at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
- at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
- at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
- at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
- at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
- at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
- at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
- at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
- at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
- at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
- at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
- Caused by: java.lang.RuntimeException: 企业合规管控处理查看数据数据命中敏感词情况异常暂停
- at org.zyf.javabasic.java8.CompletableFutureTest.lambda$testAllOf$43(CompletableFutureTest.java:736)
- at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
- at java.lang.Thread.run(Thread.java:748)
-
8.多任务组合:anyOf
public static CompletableFuture