首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

6.snail-job的MapReduce任务

  • 24-12-16 16:07
  • 3838
  • 23112
juejin.cn

前言

​ 上一节《5.snail-job的Map任务》中已经对Map任务有了大致的了解。并且上节中已经预告了本节介绍的任务类型-MapReduce。在Map任务的基础上再加上汇总的方法,就是MapReduce任务了。按照小学语文老师交给我们的写作技巧:总分总的关系来看。Map任务仅仅是总分关系,而MapReduce就是总分总关系了。

​ 可能你对上面的描述还有所疑惑,不过咱们通过上节的例子,和本节例子的结合对比,就能有所共鸣了。

本节目标

​ 上一节的例子是:200个数字,切成4个片,每个片中有50个数,对每个切片进行汇总计算。本节在这个基础上最后再多一步计算最终汇总值。从本例中了解的知识点如下:

  • 客户端采用继承类方式实现MapReduce任务
  • 客户端采用注解方式实现MapReduce任务
  • 服务器端配置分片数的作用

客户端代码1

这里针对的是reduce分片数设置 = 1的情况,后面会解释这个分片数的作用。

开发环境

  • JDK版本:openjdk-21.0.2
  • snail-job版本:1.2.0

Maven依赖

xml
代码解读
复制代码
<dependencies> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-webartifactId> dependency> <dependency> <groupId>com.aizudagroupId> <artifactId>snail-job-client-starterartifactId> <version>1.2.0version> dependency> <dependency> <groupId>com.aizudagroupId> <artifactId>snail-job-client-retry-coreartifactId> <version>1.2.0version> dependency> <dependency> <groupId>com.aizudagroupId> <artifactId>snail-job-client-job-coreartifactId> <version>1.2.0version> dependency> dependencies>

继承类方式

java
代码解读
复制代码
@Component public class TestMapReduce1 extends AbstractMapReduceExecutor { @Override public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { return switch (mapArgs.getTaskName()) { case SystemConstants.ROOT_MAP -> { // 生成1~200数值并分片 int partitionSize = 50; List> partition = IntStream.rangeClosed(1, 200) .boxed() .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize)) .values() .stream() .toList(); SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port")); yield mapHandler.doMap(partition, "doCalc"); } case "doCalc" -> { List sourceList = (List) mapArgs.getMapResult(); // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal int partitionTotal = sourceList.stream().mapToInt(i -> i).sum(); // 打印日志到服务器 ThreadUtil.sleep(3, TimeUnit.SECONDS); SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal); yield ExecuteResult.success(partitionTotal); } default -> ExecuteResult.failure(); }; } @Override protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) { // 数据进行累加计算 int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum(); SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal); return ExecuteResult.success(reduceTotal); } @Override protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) { return null; } }

解释说明:

  • 通过继承AbstractMapReduceExecutor类实现MapReduce任务
  • 其中的doJobMapExecute和上一节的作用一样,都是通过任务名称,来区分是ROOT_MAP任务还是分片后的处理。
  • doReduceExecute方法,如果配置的reduce分片数是1的话,那么会随机调用1个客户端进行reduce【汇总】操作。
  • doMergeReduceExecute方法是在配置reduce分片数>1的时候才会去执行。也是随机调用一个存活的客户端去执行。本例采用的reduce分片数=1,故这里没有任何代码。详细配置见服务端配置1。

注解方式

java
代码解读
复制代码
@Component @JobExecutor(name = "testMapReduceAnnotation1") public class TestMapReduceAnnotation1 { @MapExecutor public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { int partitionSize = 50; List> partition = IntStream.rangeClosed(1, 200) .boxed() .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize)) .values() .stream() .toList(); SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port")); return mapHandler.doMap(partition, "doCalc"); } @MapExecutor(taskName = "doCalc") public ExecuteResult doCalc(MapArgs mapArgs) { List sourceList = (List) mapArgs.getMapResult(); // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal int partitionTotal = sourceList.stream().mapToInt(i -> i).sum(); // 打印日志到服务器 ThreadUtil.sleep(3, TimeUnit.SECONDS); SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal); return ExecuteResult.success(partitionTotal); } @ReduceExecutor public ExecuteResult reduceExecute(ReduceArgs reduceArgs) { int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum(); SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal); return ExecuteResult.success(reduceTotal); } }

解释说明:

  • 通过@JobExecutor注解标识该类是一个定时任务
  • 通过@MapExecutor注解来来处理分片、分片后的任务处理
  • 通过@ReduceExecutor注解来处理汇总任务
  • 由于配置的reduce分片数是1,所以这里用到把上面@ReduceExecutor汇总任务执行结果再次进行合并操作的注解

示意图

image-20241213160126069

服务端配置1

继承类方式

配置项配置内容
任务名称MapReduce任务1-继承类
状态禁用
任务类型MapReduce
自定义执行器com.mayuanfei.test.TestMapReduce1
reduce分片数1
并行数1

说明:

  • 状态:状态设置为禁用,是想通过手动执行来触发
  • 任务类型:要选本节介绍的任务类型MapReduce
  • 自定义执行器:继承类的方式要写全路径【复习】
  • 并行数:指客户端每台机器的线程数【复习】
  • reduce分片数:它设置的数量决定的doReduceExecute由几台客户端来执行。当它设置的数>1时,还会随机抽取一台客户端执行doMergeReduceExecute方法

注解方式

配置项配置内容
任务名称MapReduce任务1-注解
状态禁用
任务类型MapReduce
自定义执行器testMapReduceAnnotation1
reduce分片数1
并行数1

说明:

  • 自定义执行器 : 与注解@JobExecutor(name = "testMapReduceAnnotation1")名称一致【复习】

客户端代码1测试

这里不管是继承类方式,还是注解方式测试的结果都是一样的。

测试前提

web端口snail-job的客户端端口
91001900
92002900

可以参考《3.snail-job广播任务》的本机两个客户端启动章节的介绍。idea中配置如下:

shell
代码解读
复制代码
ver.port=9100 -Dsnail-job.port=1900 -Dserver.port=9200 -Dsnail-job.port=2900

测试MapReduce任务

  • 9100Web端口

    image-20241216083116432

  • 9200Web端口

    image-20241216083330658

服务端管理页面

image-20241216083704850

客户端代码2

这里针对reduce分片数设置>1的情况,会有reduce分片数设置的数量的客户端执行reduce方法。最后通过合并汇总方法来执行最后的运算。

继承类方式

java
代码解读
复制代码
@Component public class TestMapReduce2 extends AbstractMapReduceExecutor { @Override public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { return switch (mapArgs.getTaskName()) { case SystemConstants.ROOT_MAP -> { // 生成1~200数值并分片 int partitionSize = 50; List> partition = IntStream.rangeClosed(1, 200) .boxed() .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize)) .values() .stream() .toList(); SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port")); yield mapHandler.doMap(partition, "doCalc"); } case "doCalc" -> { List sourceList = (List) mapArgs.getMapResult(); // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal int partitionTotal = sourceList.stream().mapToInt(i -> i).sum(); // 打印日志到服务器 ThreadUtil.sleep(3, TimeUnit.SECONDS); SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal); yield ExecuteResult.success(partitionTotal); } default -> ExecuteResult.failure(); }; } @Override protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) { // 数据进行累加计算 int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum(); SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal); return ExecuteResult.success(reduceTotal); } @Override protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) { // 把reduce的结果进行累加计算 int mergeReduceTotal = mergeReduceArgs.getReduces().stream().mapToInt(i -> Integer.parseInt((String) i)).sum(); SnailJobLog.REMOTE.info("端口:{},mergeReduceTotal:{}", SpringUtil.getProperty("server.port"), mergeReduceTotal); return ExecuteResult.success(mergeReduceTotal); } }

说明:

  • 只有reduce分片数>1时,doMergeReduceExecute方法才会被触发执行
  • 并且仅一个客户端节点执行doMergeReduceExecute方法

注解方式

java
代码解读
复制代码
@Component @JobExecutor(name = "testMapReduceAnnotation2") public class TestMapReduceAnnotation2 { @MapExecutor public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { int partitionSize = 50; List> partition = IntStream.rangeClosed(1, 200) .boxed() .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize)) .values() .stream() .toList(); SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port")); return mapHandler.doMap(partition, "doCalc"); } @MapExecutor(taskName = "doCalc") public ExecuteResult doCalc(MapArgs mapArgs) { List sourceList = (List) mapArgs.getMapResult(); // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal int partitionTotal = sourceList.stream().mapToInt(i -> i).sum(); // 打印日志到服务器 ThreadUtil.sleep(3, TimeUnit.SECONDS); SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal); return ExecuteResult.success(partitionTotal); } @ReduceExecutor public ExecuteResult reduceExecute(ReduceArgs reduceArgs) { int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum(); SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal); return ExecuteResult.success(reduceTotal); } @MergeReduceExecutor public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) { // 把reduce的结果进行累加计算 int mergeReduceTotal = mergeReduceArgs.getReduces().stream().mapToInt(i -> Integer.parseInt((String) i)).sum(); SnailJobLog.REMOTE.info("端口:{},mergeReduceTotal:{}", SpringUtil.getProperty("server.port"), mergeReduceTotal); return ExecuteResult.success(mergeReduceTotal); } }

说明:

和继承类方式完全一致,只不过这里采用注解方式实现。同理@MergeReduceExecutor只有reduce分片数>1时才有意义。

示意图

image-20241213160631540

服务端配置2

继承类方式

配置项配置内容
任务名称MapReduce任务1-继承类
状态禁用
任务类型MapReduce
自定义执行器com.mayuanfei.test.TestMapReduce2
reduce分片数2
并行数1

说明:

这里reduce分片数=2

注解方式

配置项配置内容
任务名称MapReduce任务2-注解
状态禁用
任务类型MapReduce
自定义执行器testMapReduceAnnotation2
reduce分片数2
并行数1

说明:

这里reduce分片数=2

客户端代码2测试

测试前提

web端口snail-job的客户端端口
91001900
92002900

可以参考《3.snail-job广播任务》的本机两个客户端启动章节的介绍。idea中配置如下:

shell
代码解读
复制代码
-Dserver.port=9100 -Dsnail-job.port=1900 -Dserver.port=9200 -Dsnail-job.port=2900

测试MapReduce任务

  • 9100Web端口

    image-20241216084216524

  • 9200Web端口

    image-20241216084454429

服务端管理页面

image-20241216084817923

总结

  • MapReduce任务就是一个总分总的过程。
    • @MapExecutor【总】
    • @MapExecutor(taskName="doCalc") 【分】
    • @ReduceExecutor【总】
  • MapReduce任务用注解方式似乎更容易理解。
  • reduce分片数的作用:
    • 等于1时:@ReduceExecutor会随机客户端执行,@MergeReduceExecutor无效。
    • 大于1时:@ReduceExecutor按照指定配置数量随机找出后执行,@MergeReduceExecutor会随机找一台客户端执行
  • 分片数量不建议过多(大于200时会提示分片过多,最多不能超过500个分片)
注:本文转载自juejin.cn的老马9527的文章"https://juejin.cn/post/7448551286506913802"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

103
后端
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top