Hadoop series: table of contents
1、A brief introduction to Hadoop 3.1.4, its deployment, and basic verification
2、HDFS operations with the shell client
3、Using HDFS from Java (reading/writing, upload, download, traversal, finding files, copying a whole directory, copying files only, listing files in a directory, deleting files and directories, getting file and directory attributes, etc.)
4、The HDFS Java utility class HDFSUtil with JUnit tests (common HDFS operations and HA configuration)
5、The RESTful API of HDFS: WebHDFS
6、HttpFS, the HDFS proxy service
7、Common big-data file storage formats and the compression codecs supported by Hadoop
8、HDFS memory storage policy support and hot/warm/cold storage
9、Hadoop high-availability (HA) cluster deployment with three ways to verify it
10、Archive: a solution to the HDFS small-files problem
11、Reading, writing, and merging Sequence Files in a Hadoop environment
12、The HDFS Trash mechanism, with examples
13、HDFS snapshots
14、HDFS transparent encryption with KMS
15、An introduction to MapReduce and WordCount
16、Basic MapReduce usage examples: custom serialization, sorting, partitioning, grouping, and top-N
17、An introduction to MapReduce partitions (Partition)
18、MapReduce counters and reading from / writing to a database with MapReduce
19、Join operations: map side join and reduce side join
20、An introduction to MapReduce workflows
21、Reading and writing SequenceFile, MapFile, ORCFile, and ParquetFile with MapReduce
22、Writing and reading files with Gzip, Snappy, and LZO compression in MapReduce
23、Memory and CPU allocation, scheduling, and tuning for MapReduce on YARN clusters
This article covers MapReduce join operations.
It assumes a working Hadoop environment.
It has three parts: an introduction to joins, the reduce side join, and the map side join.
一、join
Processing data with the MapReduce framework often involves reading from several data sets and joining them, and the join then has to be implemented within MapReduce's programming model.
MapReduce joins fall into two categories: map side join and reduce side join.
Both are described in detail below.
The data used in the examples below is structured as follows.
There are two data files: goods (product data) and order_goods (order data).
The task is to use MapReduce to attach the corresponding product name to every order record.
For example: 107860 —> AMAZFIT黑色硅胶腕带
1、Product data
goodsId | serial number | name
100101|155083444927602|四川果冻橙6个约180g/个
100102|155083493976803|鲜丰水果秭归脐橙中华红橙9斤家庭装单果130g—220g4500g
2、Order data
orderId | goodsId | price paid
11152|108464|76
11152|109995|1899
二、reduce side join
The join is performed in the reduce phase. The shuffle already groups records that share a key, and the join is built on top of that grouping.
1、Main steps
- Mappers read the different data sets separately.
- Each mapper emits the join field as its output key.
- During the shuffle, records from the different data sets that share a key land in the same reduce group.
- The reducer joins and aggregates the grouped records according to the business logic and writes the final result.
2、Drawbacks
- The whole join is done in the reduce phase, whose parallelism is usually very low (1 reducer by default), so all of the data is squeezed through the reduce stage and the pressure there is high (a tuning sketch follows this list).
- Moving the data from the mappers to the reducers goes through the shuffle, which is expensive when the data sets are large.
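If a reduce-side join cannot be avoided, its main bottleneck can at least be eased by raising the reduce parallelism explicitly, so that the hash partitioner spreads the join keys over several reducers. A minimal sketch of such a driver tweak; the helper class and the reducer count of 8 are purely illustrative:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ReduceParallelismExample {
	// Hypothetical helper: spread the join keys (goodsId) across several reducers
	// instead of relying on the single default reducer.
	static void tuneReduceSide(Job job) {
		job.setNumReduceTasks(8); // illustrative value; size it to the cluster and the data volume
		job.setPartitionerClass(HashPartitioner.class); // the default partitioner, set explicitly here
	}
}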
3、Implementation notes
- The mapper handles each record according to the name of the file it was read from. In this example every record is prefixed with G# or O# to mark it as a goods record or an order record.
- The join field is chosen as the map output key so that records with the same key end up in the same reduce group. The attribute shared by the two data sets, the goodsId, is that key.
- In the reducer, the records of one key (goodsId) are split by type: order records go into a List and the goods record into a HashMap. The output loop iterates over the List and looks up the serial number and name by goodsId.
- If the output has to be sorted, a second job takes the previous output and uses the sort field as its key. In this example the orderId is the sort key.
4、Implementation (without sorting)
1)、mapper
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ReducerSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
	// Name of the file the current split belongs to: goods.txt or order.txt
	String sourceFileName = "";
	Text outValue = new Text();
	Text outKey = new Text();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Get the name of the file that the current input split belongs to
		FileSplit inputSplit = (FileSplit) context.getInputSplit();
		sourceFileName = inputSplit.getPath().getName();
		System.out.println("Currently processing file: " + sourceFileName);
	}

	// Goods data: goodsId|serial number|name
	// 100101|155083444927602|四川果冻橙6个约180g/个
	// Order data: orderId|goodsId|price paid
	// 11152|108464|76
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// "|" is a regex metacharacter, so it has to be escaped in split()
		String[] line = value.toString().split("\\|");
		if ("goods.txt".equals(sourceFileName)) { // goods data
			Counter goodsCounter = context.getCounter("goods_records_counters", "goods Records");
			goodsCounter.increment(1);
			outKey.set(line[0]); // join key: goodsId
			outValue.set("G#" + value.toString()); // tag goods records with G#
		} else if ("order.txt".equals(sourceFileName)) { // order data
			Counter orderCounter = context.getCounter("order_records_counters", "order Records");
			orderCounter.increment(1);
			outKey.set(line[1]); // join key: goodsId
			outValue.set("O#" + value.toString()); // tag order records with O#
		} else {
			return; // ignore records from any other file
		}
		context.write(outKey, outValue);
	}
}
2)、reducer
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReducerSideJoinReducer extends Reducer<Text, Text, Text, NullWritable> {
	List<String> orderList = new ArrayList<String>();
	Map<String, String> goodsMap = new HashMap<String, String>();
	StringBuilder outValue = new StringBuilder();
	Text outKey = new Text();

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		// Incoming value formats (the key is the goodsId):
		// goods record: G#100101|155083444927602|四川果冻橙6个约180g/个
		// order record: O#11152|108464|76
		for (Text value : values) {
			String temp = value.toString();
			if (temp.startsWith("G#")) {
				goodsMap.put(key.toString(), temp.substring(2)); // strip the G# tag
			} else if (temp.startsWith("O#")) {
				orderList.add(temp.substring(2)); // strip the O# tag
			}
		}

		// Sorting orderList here would not give globally sorted output: each reduce()
		// call only sees the orders of a single goodsId. Sorting by orderId is done in
		// a separate job, see "5、Implementation (with sorting)" below.

		// Output format: orderId|goodsId|serial number|name|price paid
		for (int i = 0; i < orderList.size(); i++) {
			String[] order = orderList.get(i).split("\\|");
			outValue.setLength(0); // reset for every record, otherwise the output lines get concatenated
			outValue.append(order[0]).append("|").append(goodsMap.get(key.toString())).append("|").append(order[2]);
			outKey.set(outValue.toString());
			context.write(outKey, NullWritable.get());
		}
		goodsMap.clear();
		orderList.clear();
	}

	/**
	 * Standalone check of the comparator that sorts order records by orderId.
	 **/
	public static void main(String[] args) {
		List<String> orderList = new ArrayList<String>();
		orderList.add("O#1|101524|891");
		orderList.add("O#1|111835|10080");
		orderList.add("O#1|107848|1734");
		orderList.add("O#2|113561|11192");
		orderList.add("O#11152|108464|76");
		Collections.sort(orderList, new Comparator<String>() {
			@Override
			public int compare(String orderId, String orderIdNew) {
				int distance = Integer.parseInt(orderId.split("\\|")[0].substring(2))
						- Integer.parseInt(orderIdNew.split("\\|")[0].substring(2));
				if (distance > 0) {
					return 1;
				} else if (distance < 0) {
					return -1;
				}
				return 0;
			}
		});
		System.out.println(orderList);
	}
}
3)、driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReducerSideJoinDriver extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in/join";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/join";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new ReducerSideJoinDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		// Create the job instance
		Job job = Job.getInstance(conf, "Reduce Side Join Testing");
		// Set the driver class
		job.setJarByClass(ReducerSideJoinDriver.class);
		// Mapper types: LongWritable, Text -> Text, Text
		job.setMapperClass(ReducerSideJoinMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		// Reducer types: Text, Text -> Text, NullWritable
		job.setReducerClass(ReducerSideJoinReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		// Input path
		FileInputFormat.setInputPaths(job, new Path(in));
		// Delete the output path if it already exists
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));
		return job.waitForCompletion(true) ? 0 : 1;
	}
}
4)、Verification
The job output confirms the join result; a few sample output lines appear as comments in the ReduceSideSortMapper of the next section.
5、Implementation (with sorting)
Sort the records by orderId in ascending order.
This example builds on the previous one: the unsorted join output is its input.
1)、Implementation
The mapper, reducer, and driver are combined in one class.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceSideSort extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/join";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/joinsort";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new ReduceSideSort(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		// Create the job instance
		Job job = Job.getInstance(conf, "Reduce Side Join Sort Testing");
		// Set the driver class
		job.setJarByClass(ReduceSideSort.class);
		// Mapper types: LongWritable, Text -> Text, Text
		job.setMapperClass(ReduceSideSortMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		// Reducer types: Text, Text -> Text, NullWritable
		job.setReducerClass(ReduceSideSortReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		// Input path (the output of the unsorted join job)
		FileInputFormat.setInputPaths(job, new Path(in));
		// Delete the output path if it already exists
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));
		return job.waitForCompletion(true) ? 0 : 1;
	}

	static class ReduceSideSortMapper extends Mapper<LongWritable, Text, Text, Text> {
		// Unsorted input: orderId|goodsId|serial number|name|price paid
		// 1|101524|100006391000|指爱花盒|891
		// 6|101626|100006879264|爱科技N200NC|7995
		// 522|101658|100007019896|花花公子休闲鞋|536
		// The orderId is used as the map output key so that the shuffle sorts by it.
		Text outKey = new Text();
		Text outValue = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] line = value.toString().split("\\|");
			outKey.set(line[0]);
			outValue.set(line[0] + "|" + line[1] + "|" + line[2] + "|" + line[3] + "|" + line[4]);
			context.write(outKey, outValue);
		}
	}

	static class ReduceSideSortReducer extends Reducer<Text, Text, Text, NullWritable> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// The keys arrive already sorted; just emit the full records.
			for (Text value : values) {
				context.write(value, NullWritable.get());
			}
		}
	}
}
2)、Verification
The result comes out ordered by orderId; no extra sorting code is needed because MapReduce sorts the map output keys by default during the shuffle.
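Note that the map output key above is Text, so the default ordering is lexicographic on the string form of the order ID (for example, "11152" sorts before "2"), which differs from numeric order once IDs have different digit counts. If a strictly numeric ordering is required, one option is to emit the order ID as a numeric key instead. A minimal sketch of such a variant (hypothetical class name, assuming the same five-field input produced by the unsorted join):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical alternative to ReduceSideSortMapper: the orderId is emitted as a
// LongWritable, so the shuffle sorts it numerically instead of lexicographically.
public class NumericSortMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
	LongWritable outKey = new LongWritable();
	Text outValue = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String[] line = value.toString().split("\\|");
		outKey.set(Long.parseLong(line[0])); // orderId as a number
		outValue.set(value); // keep the full record as the value
		context.write(outKey, outValue);
	}
}

The reducer and driver would then have to use LongWritable as the map output key class (job.setMapOutputKeyClass(LongWritable.class)).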
三、map side join
A map side join performs the join in the map phase; the job usually has no reduce phase at all, which avoids the cost of the shuffle. The key to a map-side join is MapReduce's distributed cache.
1、Advantages
- There is no shuffle and no reducer, so the data-transfer cost of the shuffle disappears. The number of map tasks adapts automatically to the input size, which makes good use of distributed computation.
2、Implementation notes
- Analyse the data sets and put the small one into the distributed cache.
- At run time the MapReduce framework automatically ships the cached file to every node on which a map task runs.
- The mapper loads the small data set from the distributed cache in its setup phase, joins it against the large data set it reads in the map phase, and writes the final result.
3、Implementation
This implementation produces the same result as the reduce-side join above and uses the same data, so the requirements are not repeated.
1)、mapper
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
	Map<String, String> goodsMap = new HashMap<String, String>();
	Text outKey = new Text();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Read the cached file by its plain name ("goods.txt", not "/goods.txt"):
		// the distributed cache links it into the task's working directory,
		// so an absolute path would not be found.
		BufferedReader br = new BufferedReader(
				new InputStreamReader(new FileInputStream("goods.txt"), "UTF-8"));
		String line = null;
		// Goods data: goodsId|serial number|name
		// 100101|155083444927602|四川果冻橙6个约180g/个
		while ((line = br.readLine()) != null) {
			String[] fields = line.split("\\|");
			goodsMap.put(fields[0], fields[1] + "|" + fields[2]);
		}
		br.close();
	}

	// Order data: orderId|goodsId|price paid
	// 11152|108464|76
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String[] line = value.toString().split("\\|");
		// Output format: orderId|goodsId|serial number|name|price paid
		outKey.set(line[0] + "|" + line[1] + "|" + goodsMap.get(line[1]) + "|" + line[2]);
		context.write(outKey, NullWritable.get());
	}
}
2)、reducer
None: this is a map-only job (the driver sets the number of reduce tasks to 0).
3)、driver
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DistributedCacheDriver extends Configured implements Tool {
	// Run with:
	// yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new DistributedCacheDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		// Create the job instance
		Job job = Job.getInstance(conf, "Map Side Join Testing DistributedCacheDriver");
		// Set the driver class
		job.setJarByClass(DistributedCacheDriver.class);
		// Mapper types: LongWritable, Text -> Text, NullWritable
		job.setMapperClass(DistributedCacheMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		// Map-only job: no reducers
		job.setNumReduceTasks(0);
		// Add the small data set to the distributed cache
		job.addCacheFile(new URI(args[2]));
		// Input path
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		// Delete the output path if it already exists
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(args[1]))) {
			fs.delete(new Path(args[1]), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		return job.waitForCompletion(true) ? 0 : 1;
	}
}
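A side note on the cache file name: the distributed cache exposes the cached file in each task's working directory under its base name (goods.txt here), which is why the mapper opens it with a plain relative path. If a different local name is preferred, addCacheFile also accepts a URI fragment that becomes the link name. A small sketch under that assumption (the helper class and the #goods alias are purely illustrative):

import java.net.URI;

import org.apache.hadoop.mapreduce.Job;

public class CacheAliasExample {
	// Hypothetical helper: expose the cached goods file under the local alias "goods".
	// The mapper's setup() would then open new FileInputStream("goods") instead of "goods.txt".
	static void addGoodsCache(Job job) throws Exception {
		job.addCacheFile(new URI("/distributedcache/cachefiles/goods.txt#goods"));
	}
}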
4)、Verification
This example has to run on a Hadoop cluster.
Create the directories and upload the files first:
- Input path (files to process): /distributedcache/in
- Cache file: /distributedcache/cachefiles/goods.txt
- Output path: /distributedcache/out
# Run command:
yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
# Run log
[alanchan@server4 testMR]$ yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.distributedcache.DistributedCacheDriver /distributedcache/in /distributedcache/out /distributedcache/cachefiles/goods.txt
2022-09-22 03:00:27,021 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/alanchan/.staging/job_1663661108338_0025
2022-09-22 03:00:32,826 INFO input.FileInputFormat: Total input files to process : 1
2022-09-22 03:00:33,008 INFO mapreduce.JobSubmitter: number of splits:1
2022-09-22 03:00:33,179 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1663661108338_0025
2022-09-22 03:00:33,180 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-09-22 03:00:33,325 INFO conf.Configuration: resource-types.xml not found
2022-09-22 03:00:33,325 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2022-09-22 03:00:33,374 INFO impl.YarnClientImpl: Submitted application application_1663661108338_0025
2022-09-22 03:00:33,408 INFO mapreduce.Job: The url to track the job: http://server1:8088/proxy/application_1663661108338_0025/
2022-09-22 03:00:33,409 INFO mapreduce.Job: Running job: job_1663661108338_0025
2022-09-22 03:00:40,480 INFO mapreduce.Job: Job job_1663661108338_0025 running in uber mode : false
2022-09-22 03:00:40,481 INFO mapreduce.Job: map 0% reduce 0%
2022-09-22 03:00:46,523 INFO mapreduce.Job: map 100% reduce 0%
2022-09-22 03:00:47,531 INFO mapreduce.Job: Job job_1663661108338_0025 completed successfully
2022-09-22 03:00:47,607 INFO mapreduce.Job: Counters: 32
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=226398
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1272
HDFS: Number of bytes written=10882
HDFS: Number of read operations=7
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8080
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=4040
Total vcore-milliseconds taken by all map tasks=4040
Total megabyte-milliseconds taken by all map tasks=41369600
Map-Reduce Framework
Map input records=76
Map output records=76
Input split bytes=117
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=35
CPU time spent (ms)=650
Physical memory (bytes) snapshot=205320192
Virtual memory (bytes) snapshot=7332417536
Total committed heap usage (bytes)=193462272
Peak Map Physical memory (bytes)=205320192
Peak Map Virtual memory (bytes)=7332417536
File Input Format Counters
Bytes Read=1155
File Output Format Counters
Bytes Written=10882
This concludes the two ways of implementing a join in MapReduce; the map-side join relies on the distributed cache.