首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

20、MapReduce 工作流介绍Hadoop系列文章目录一、MapReduce 工作流介绍二、使用示例

  • 23-09-04 16:01
  • 2608
  • 9998
blog.csdn.net

Hadoop系列文章目录

1、hadoop3.1.4简单介绍及部署、简单验证
2、HDFS操作 - shell客户端
3、HDFS的使用(读写、上传、下载、遍历、查找文件、整个目录拷贝、只拷贝文件、列出文件夹下文件、删除文件及目录、获取文件及文件夹属性等)-java
4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)
5、HDFS API的RESTful风格–WebHDFS
6、HDFS的HttpFS-代理服务
7、大数据中常见的文件存储格式以及hadoop中支持的压缩算法
8、HDFS内存存储策略支持和“冷热温”存储
9、hadoop高可用HA集群部署及三种方式验证
10、HDFS小文件解决方案–Archive
11、hadoop环境下的Sequence File的读写与合并
12、HDFS Trash垃圾桶回收介绍与示例
13、HDFS Snapshot快照
14、HDFS 透明加密KMS
15、MapReduce介绍及wordcount
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN
17、MapReduce的分区Partition介绍
18、MapReduce的计数器与通过MapReduce读取/写入数据库示例
19、Join操作map side join 和 reduce side join
20、MapReduce 工作流介绍
21、MapReduce读写SequenceFile、MapFile、ORCFile和ParquetFile文件
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件
23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化


文章目录

  • Hadoop系列文章目录
  • 一、MapReduce 工作流介绍
  • 二、使用示例
    • 1、实现
    • 2、验证


本文介绍MapReduce 工作流。
本文前提:hadoop环境可用。

一、MapReduce 工作流介绍

多个MR作业,先后依次执行来计算得出最终结果。这类作业类似于DAG的任务,各个作业之间有依赖关系,比如说,这一个作业的输入,依赖上一个作业的输出等等。
一般实际的业务场景中,可能使用定时调度工具进行调度,但本示例仅仅说明mapreduce自身也可以做到。
在这里插入图片描述

  • JobControl类:工作流job控制器,一次可以提交、管理多个job。JobControl类实现了线程Runnable接口。需要实例化一个线程来让它启动。
  • ControlledJob类:可以将普通作业包装成受控作业。并且支持设置依赖关系。Hadoop会根据依赖的关系,先后执行job任务,每个任务的运行都是独立的。

二、使用示例

MapReduce的join操作
将上述的Reduce side join 的例子连续起来运行,即第一步未排序输出,第二步针对上一步的输出进行排序。

1、实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.hadoop.mr.join.reducerside.ReduceSideSortDriver;
import org.hadoop.mr.join.reducerside.ReduceSideSortMapper;
import org.hadoop.mr.join.reducerside.ReduceSideSortReducer;
import org.hadoop.mr.join.reducerside.ReducerSideJoinDriver;
import org.hadoop.mr.join.reducerside.ReducerSideJoinMapper;
import org.hadoop.mr.join.reducerside.ReducerSideJoinReducer;

public class MRFlowDriver {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in/join";
	static String tempOut = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/unsortjoin";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/joinsort";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}

		// 第一个作业的配置
		Job unSortjob = getJob(conf, "Reduce Side Join DependingJob Testing ------ unSortjob", ReducerSideJoinDriver.class,
				ReducerSideJoinMapper.class, Text.class, Text.class, ReducerSideJoinReducer.class, Text.class,
				NullWritable.class, 1, in, tempOut);
	
		// 将普通作业包装成受控作业
		ControlledJob unSortControlledJob = new ControlledJob(conf);
		unSortControlledJob.setJob(unSortjob);

		// 第二个作业的配置
		Job sortedjob = getJob(conf, "Reduce Side Join DependingJob Testing ------ sortedjob", ReduceSideSortDriver.class,
				ReduceSideSortMapper.class, Text.class, Text.class, ReduceSideSortReducer.class, Text.class,
				NullWritable.class, 1, tempOut, out);

		ControlledJob sortedControlledJob = new ControlledJob(conf);
		sortedControlledJob.setJob(sortedjob);

		// 设置job的依赖关系
		sortedControlledJob.addDependingJob(unSortControlledJob);

		// 主控制容器
		JobControl jobControl = new JobControl("jobControl");
		// 添加到总的JobControl里,进行控制
		jobControl.addJob(unSortControlledJob);
		jobControl.addJob(sortedControlledJob);

		// 在线程启动
		Thread t = new Thread(jobControl);
		t.start();
		while (true) {
			if (jobControl.allFinished()) {
				System.out.println("jobControl" + jobControl.getSuccessfulJobList());
				jobControl.stop();
				break;
			}

		}

	}

	/**
	 * 
	 * @param conf
	 * @param jobName
	 * @param cls
	 * @param clsMapper
	 * @param clsMapOutKey
	 * @param clsMapOutValue
	 * @param clsReducer
	 * @param clsReducerOutKey
	 * @param clsReducerOutValue
	 * @param tasks
	 * @return
	 * @throws Exception
	 */
	static Job getJob(Configuration conf, String jobName, Class<?> cls, Class<? extends Mapper> clsMapper,
			Class<?> clsMapOutKey, Class<?> clsMapOutValue, Class<? extends Reducer> clsReducer,
			Class<?> clsReducerOutKey, Class<?> clsReducerOutValue, int tasks, String in, String out) throws Exception {
		Job job = Job.getInstance(conf, jobName);
		// 设置作业驱动类
		job.setJarByClass(cls);
		// 设置mapper相关信息
		job.setMapperClass(clsMapper);
		job.setMapOutputKeyClass(clsMapOutKey);
		job.setMapOutputValueClass(clsMapOutValue);

		// 设置reducer相关信息
		job.setReducerClass(clsReducer);
		job.setOutputKeyClass(clsReducerOutKey);
		job.setOutputValueClass(clsReducerOutValue);

		job.setNumReduceTasks(tasks);

		// 设置输入的文件的路径
		FileInputFormat.setInputPaths(job, new Path(in));
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));

		return job;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117

2、验证

运行日志

jobControl[job name:	Reduce Side Join DependingJob Testing ------ unSortjob
job id:	jobControl0
job state:	SUCCESS
job mapred id:	job_local1023947416_0001
job message:	just initialized
job has no depending job:	
, job name:	Reduce Side Join DependingJob Testing ------ sortedjob
job id:	jobControl1
job state:	SUCCESS
job mapred id:	job_local1967863010_0002
job message:	just initialized
job has 1 dependeng jobs:
	 depending job 0:	Reduce Side Join DependingJob Testing ------ unSortjob
]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

实际的功能与本示例中对应的链接示例结果一致,不再赘述。
至此,MapReduce的工作流示例介绍结束。

注:本文转载自blog.csdn.net的一瓢一瓢的饮 alanchan的文章"https://blog.csdn.net/chenwewi520feng/article/details/130455696"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top