首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

19、Join操作map side join 和 reduce side joinHadoop系列文章目录一、join二、reduce side join三、map side join

  • 23-09-04 15:57
  • 2781
  • 12271
blog.csdn.net

Hadoop系列文章目录

1、hadoop3.1.4简单介绍及部署、简单验证
2、HDFS操作 - shell客户端
3、HDFS的使用(读写、上传、下载、遍历、查找文件、整个目录拷贝、只拷贝文件、列出文件夹下文件、删除文件及目录、获取文件及文件夹属性等)-java
4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)
5、HDFS API的RESTful风格–WebHDFS
6、HDFS的HttpFS-代理服务
7、大数据中常见的文件存储格式以及hadoop中支持的压缩算法
8、HDFS内存存储策略支持和“冷热温”存储
9、hadoop高可用HA集群部署及三种方式验证
10、HDFS小文件解决方案–Archive
11、hadoop环境下的Sequence File的读写与合并
12、HDFS Trash垃圾桶回收介绍与示例
13、HDFS Snapshot快照
14、HDFS 透明加密KMS
15、MapReduce介绍及wordcount
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN
17、MapReduce的分区Partition介绍
18、MapReduce的计数器与通过MapReduce读取/写入数据库示例
19、Join操作map side join 和 reduce side join
20、MapReduce 工作流介绍
21、MapReduce读写SequenceFile、MapFile、ORCFile和ParquetFile文件
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件
23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化


文章目录

  • Hadoop系列文章目录
  • 一、join
    • 1、商品信息
    • 2、订单信息
  • 二、reduce side join
    • 1、主要步骤
    • 2、不足
    • 3、实现说明
    • 4、实现(未排序实现)
      • 1)、mapper
      • 2)、reducer
      • 3)、driver
      • 4)、验证
    • 5、实现(排序)
      • 1)、实现
      • 2)、验证
  • 三、map side join
    • 1、优势
    • 2、实现说明
    • 3、实现
      • 1)、mapper
      • 2)、reducer
      • 3)、driver
      • 4)、验证


本文介绍mapreduce的join操作。
本文前提是hadoop可以正常使用。
本文分为3个部分介绍,即join的介绍、map side join和reduce side join。

一、join

在使用MapReduce框架进行数据处理的过程中,也会涉及到从多个数据集读取数据,进行join关联的操作,此时需要根据MapReduce的编程规范进行业务的实现。
整个MapReduce的join分为两类:Map Side Join、Reduce Side Join。
具体详见下文。
以下示例中的数据结构如下:
有两个数据文件:goods(商品信息)、order_goods(订单信息)。
要求使用MapReduce统计出每笔订单中对应的具体的商品名称信息。
比如: 107860—>AMAZFIT黑色硅胶腕带

1、商品信息

商品ID |编号 |名称
100101|155083444927602|四川果冻橙6个约180g/个
100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g

2、订单信息

订单ID|商品ID|成交价格
11152|108464|76
11152|109995|1899

二、reduce side join

在reduce阶段执行join关联操作。通过shuffle过程就可以将相关的数据分到相同的分组中,基于此可进行join。
在这里插入图片描述

1、主要步骤

  • mapper分别读取不同的数据集
  • mapper的输出中,通常以join的字段作为输出的key
  • 不同数据集的数据经过shuffle,key一样的会被分到同一分组处理
  • 在reduce中根据业务需求把数据进行关联整合汇总,最终输出

2、不足

  • reduce端join最大的问题是整个join的工作是在reduce阶段完成的,但是通常情况下MapReduce中reduce的并行度是极小的(默认是1个),这就使得所有的数据都挤压到reduce阶段处理,压力颇大
  • 在数据从mapper到reducer的过程中,shuffle阶段十分繁琐,数据集大时成本极高

3、实现说明

  • 根据不同的输入文件名称在mapper中进行相应的处理,本示例是在遍历出的数据加上G#和O#,分别标识是商品信息的数据和订单信息的数据
  • 选择相关联字段作为Map输出的key,在reducer时可以确保相同key的数据可以分在同一个组内。将2组有共同属性的数据作为key(关联字段),即商品ID
  • 在reduce实现中,将具有相同key(商品ID)的数据分goods和order区分存储在不同的数据结构中,本示例中order数据放在List中,商品信息放在HashMap中。在输出遍历时,循环List,同时根据商品ID获取其名称和编号即可
  • 如果对输出的结果有排序要求,则需要将上一步的输出结果按照需要排序的字段作为key。本示例中是以订单ID作为排序ID

4、实现(未排序实现)

1)、mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ReducerSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
	// goods.txt
	// order.txt
	String sourceFileName = "";
	Text outValue = new Text();
	Text outKey = new Text();

	protected void setup(Context context) throws IOException, InterruptedException {
		// 获取当前处理的切片所属的文件名字
		FileSplit inputSplit = (FileSplit) context.getInputSplit();
		sourceFileName = inputSplit.getPath().getName();
		System.out.println("当前正在处理的文件是:" + sourceFileName);
	}

//	商品信息
//	商品ID |编号           |名称
//	100101|155083444927602|四川果冻橙6个约180g/个
//	100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
//	订单信息
//	订单ID |商品ID|成交价格
//	 11152|108464|76
//	 11152|109995|1899
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String[] line = value.toString().split("\|");	
		
		if ("goods.txt".equals(sourceFileName)) {// 商品信息
			Counter goodsCounter = context.getCounter("goods_records_counters", "goods Records");
			goodsCounter.increment(1);
			
			outKey.set(line[0]);
			outValue.set("G#" + value.toString());
		} else if ("order.txt".equals(sourceFileName)) {// 订单信息
			Counter orderCounter = context.getCounter("order_records_counters", "order Records");
			orderCounter.increment(1);
			
			outKey.set(line[1]);
			outValue.set("O#" + value.toString());
		}
		context.write(outKey, outValue);
		
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

2)、reducer

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReducerSideJoinReducer extends Reducer<Text, Text, Text, NullWritable> {

	List<String> orderList = new ArrayList<String>();
	Map<String, String> goodsMap = new HashMap<String, String>();
	StringBuilder outValue = new StringBuilder();
	Text outKey = new Text();

	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//		商品信息
//		商品ID |编号                      |名称
//		100101|155083444927602|四川果冻橙6个约180g/个
//		100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
//		订单信息
//		订单ID|商品ID|成交价格
//		 11152|108464|76
//		 11152|109995|1899

		// 数据格式:
		// 商品信息
		// G#100101|155083444927602|四川果冻橙6个约180g/个
		// 订单信息
		// O#11152|108464|76

		for (Text value : values) {
			String temp = value.toString();
			if (temp.startsWith("G#")) {
				goodsMap.put(key.toString(), temp.split("G#")[1]);
			} else if (temp.startsWith("O#")) {
				orderList.add(temp.split("O#")[1]);
			}
		}

//		O#11152|108464|76
		// 正序排序,因为是按照商品ID进行分组过来的,所以此处排序不起作用
//		Collections.sort(orderList, new Comparator() {
//			@Override
//			public int compare(String orderId, String orderIdNew) {
//
//				int distance = Integer.parseInt(orderId.split("\|")[0]) - Integer.parseInt(orderIdNew.split("\|")[0]);
//				System.out.println(orderId + "   orderIdNew=" + orderIdNew + "   " + distance);
//				if (distance > 0) {
//					return 1;
//				} else if (distance < 0) {
//					return -1;
//				}
//				return 0;
//			}
//		});

		// 输出数据格式
		// 订单编号|商品ID|商品编号|商品名称|成交价格
		for (int i = 0; i < orderList.size(); i++) {
			String order[] = orderList.get(i).split("\|");
//			if (order[0].equals("1") || order[0].equals("2") || order[0].equals("3")) { //少量数据测试结果正确性
			outValue.append(order[0]).append("|").append(goodsMap.get(key.toString())).append("|").append(order[2]);
			outKey.set(outValue.toString());
			context.write(outKey, NullWritable.get());
//			}
		}
		outValue.setLength(0);
		goodsMap.clear();
		orderList.clear();
	}

/**
* 验证reducer是否正确
**/
	public static void main(String[] args) {

		List<String> orderList = new ArrayList<String>();
		orderList.add("O#1|101524|891");
		orderList.add("O#1|111835|10080");
		orderList.add("O#1|107848|1734");
		orderList.add("O#2|113561|11192");
		orderList.add("O#11152|108464|76");

//		String temp = "O#11152|108464|76";
//		System.out.println(temp.split("\|")[0].split("O#")[1]);
		Collections.sort(orderList, new Comparator<String>() {

			@Override
			public int compare(String orderId, String orderIdNew) {

				int distance = Integer.parseInt(orderId.split("\|")[0].split("O#")[1])
						- Integer.parseInt(orderIdNew.split("\|")[0].split("O#")[1]);
				if (distance > 0) {
					return 1;
				} else if (distance < 0) {
					return -1;
				}
				return 0;
			}
		});
		System.out.print(orderList);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107

3)、driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReducerSideJoinDriver extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in/join";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/join";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new ReducerSideJoinDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		// 创建作业实例
		Job job = Job.getInstance(conf, "Reduce Side Join Testing  ");
		// 设置作业驱动类
		job.setJarByClass(ReducerSideJoinDriver.class);

		// 设置mapper相关信息LongWritable, Text, Text, Text
		job.setMapperClass(ReducerSideJoinMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		// 设置reducer相关信息 Text, Text, Text, NullWritable
		job.setReducerClass(ReducerSideJoinReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 设置输入的文件的路径
		FileInputFormat.setInputPaths(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

4)、验证

以下是验证数据输出结果
在这里插入图片描述

5、实现(排序)

按照订单号正序排序
本示例是在未排序的基础上做的,即以未排序的结果为输入

1)、实现

mapper、reducer、driver放在一起

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceSideSort extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/join";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/joinsort";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new ReduceSideSort(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		// 创建作业实例
		Job job = Job.getInstance(conf, "Reduce Side Join Sort Testing  ");
		// 设置作业驱动类
		job.setJarByClass(ReduceSideSort.class);

		// 设置mapper相关信息LongWritable, Text, Text, Text
		job.setMapperClass(ReduceSideSortMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		// 设置reducer相关信息 Text, Text, Text, NullWritable
		job.setReducerClass(ReduceSideSortReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 设置输入的文件的路径
		FileInputFormat.setInputPaths(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;

	}

	static class ReduceSideSortMapper extends Mapper<LongWritable, Text, Text, Text> {
// 未排序输出:订单编号、商品id、商品编号、商品名称、实际支付价格
//		1|101524|100006391000|指爱花盒|891
//		6|101626|100006879264|爱科技N200NC|7995
//		522|101658|100007019896|花花公子休闲鞋|536

// 按照订单号正序排序,排序输出:订单编号、商品id、商品编号、商品名称、实际支付价格
		Text outKey = new Text();
		Text outValue = new Text();

		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] line = value.toString().split("\|");
			outKey.set(line[0]);
			outValue.set(line[0] + "|" + line[1] + "|" + line[2] + "|" + line[3] + "|" + line[4]);
			context.write(outKey, outValue);
		}
	}

	static class ReduceSideSortReducer extends Reducer<Text, Text, Text, NullWritable> {
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text value : values) {
				context.write(value, NullWritable.get());
			}
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84

2)、验证

从结果看,已经按照订单号正序排序了,此处是利用mapreducer的默认按照key的排序规则
在这里插入图片描述

三、map side join

map side join,就是在map阶段执行join关联操作,并且程序通常没reduce阶段,避免了shuffle时候的繁琐。实现Map端join的关键是使用MapReduce的分布式缓存。
在这里插入图片描述

1、优势

  • 整个join的过程没有shuffle,没有reducer,减少shuffle时候的数据传输成本。并且mapper的并行度可以根据输入数据量自动调整,充分发挥分布式计算的优势

2、实现说明

  • 分析处理的数据集,使用分布式缓存技术将小的数据集进行分布式缓存
  • MapReduce框架在执行的时候会自动将缓存的数据分发到各个maptask运行的机器上
  • 在mapper初始化的时候从分布式缓存中读取小数据集数据,然后和自己读取的大数据集进行join关联,输出最终的结果

3、实现

该处实现与reduce side join实现的功能是一样的,数据结构也是一样的,故不再赘述。

1)、mapper

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
	Map<String, String> goodsMap = new HashMap<String, String>();
	Text outKey = new Text();

	protected void setup(Context context) throws IOException, InterruptedException {
		// 读取缓存文件 千万别写成/goods.txt否则会提示找不到该文件
		BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("goods.txt")));
		String line = null;
//		商品信息
//		商品ID |编号           |名称
//		100101|155083444927602|四川果冻橙6个约180g/个
//		100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
		while ((line = br.readLine()) != null) {
			String[] fields = line.split("\|");
			goodsMap.put(fields[0], fields[1] + "|" + fields[2]);
		}
	}

//	商品信息
//	商品ID |编号                      |名称
//	100101|155083444927602|四川果冻橙6个约180g/个
//	100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
//	订单信息
//	订单ID|商品ID|成交价格
//	 11152|108464|76
//	 11152|109995|1899
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String[] line = value.toString().split("\|");
		outKey.set(line[0] + "|" + line[1] + "|" + goodsMap.get(line[1]) + "|" + line[2]);
		context.write(outKey, NullWritable.get());
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

2)、reducer

无

3)、driver

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DistributedCacheDriver extends Configured implements Tool {
//执行命令:
//yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new DistributedCacheDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		// 创建作业实例
		Job job = Job.getInstance(conf, "Map Side Join Testing  DistributedCacheDriver");
		// 设置作业驱动类
		job.setJarByClass(DistributedCacheDriver.class);

		// 设置mapper相关信息LongWritable, Text, Text, NullWritable
		job.setMapperClass(DistributedCacheMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setNumReduceTasks(0);

		// 添加分布式缓存文件
		job.addCacheFile(new URI(args[2]));

		// 设置输入的文件的路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(args[1]))) {
			fs.delete(new Path(args[1]), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

4)、验证

该示例需要在hadoop环境中执行。

创建好相关的文件夹以及上传文件

  • 待处理文件(输入文件路径):/distributedcache/in
  • 缓存文件:/distributedcache/cachefiles/goods.txt
  • 输出文件路径:/distributedcache/out
# 执行命令:
yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt

#运行日志
[alanchan@server4 testMR]$ yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
2022-09-22 03:00:27,021 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/alanchan/.staging/job_1663661108338_0025
2022-09-22 03:00:32,826 INFO input.FileInputFormat: Total input files to process : 1
2022-09-22 03:00:33,008 INFO mapreduce.JobSubmitter: number of splits:1
2022-09-22 03:00:33,179 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1663661108338_0025
2022-09-22 03:00:33,180 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-09-22 03:00:33,325 INFO conf.Configuration: resource-types.xml not found
2022-09-22 03:00:33,325 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2022-09-22 03:00:33,374 INFO impl.YarnClientImpl: Submitted application application_1663661108338_0025
2022-09-22 03:00:33,408 INFO mapreduce.Job: The url to track the job: http://server1:8088/proxy/application_1663661108338_0025/
2022-09-22 03:00:33,409 INFO mapreduce.Job: Running job: job_1663661108338_0025
2022-09-22 03:00:40,480 INFO mapreduce.Job: Job job_1663661108338_0025 running in uber mode : false
2022-09-22 03:00:40,481 INFO mapreduce.Job:  map 0% reduce 0%
2022-09-22 03:00:46,523 INFO mapreduce.Job:  map 100% reduce 0%
2022-09-22 03:00:47,531 INFO mapreduce.Job: Job job_1663661108338_0025 completed successfully
2022-09-22 03:00:47,607 INFO mapreduce.Job: Counters: 32
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=226398
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1272
                HDFS: Number of bytes written=10882
                HDFS: Number of read operations=7
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=8080
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=4040
                Total vcore-milliseconds taken by all map tasks=4040
                Total megabyte-milliseconds taken by all map tasks=41369600
        Map-Reduce Framework
                Map input records=76
                Map output records=76
                Input split bytes=117
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=35
                CPU time spent (ms)=650
                Physical memory (bytes) snapshot=205320192
                Virtual memory (bytes) snapshot=7332417536
                Total committed heap usage (bytes)=193462272
                Peak Map Physical memory (bytes)=205320192
                Peak Map Virtual memory (bytes)=7332417536
        File Input Format Counters 
                Bytes Read=1155
        File Output Format Counters 
                Bytes Written=10882
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

在这里插入图片描述
至此介绍完mapreduce的join两种实现方式,其中map端的join用到了分布式缓存。

注:本文转载自blog.csdn.net的一瓢一瓢的饮 alanchan的文章"https://blog.csdn.net/chenwewi520feng/article/details/130455477"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top