15. MapReduce introduction and wordcount

Hadoop series index

1. hadoop 3.1.4 introduction, deployment, and basic verification
2. HDFS operations - shell client
3. Using HDFS from Java (read/write, upload, download, traversal, file search, copying a whole directory, copying files only, listing files in a folder, deleting files and directories, getting file and folder attributes, etc.)
4. The HDFS Java helper class HDFSUtil with junit tests (common HDFS operations and HA configuration)
5. The RESTful style of the HDFS API - WebHDFS
6. HttpFS, the HDFS proxy service
7. Common file storage formats in big data and the compression codecs hadoop supports
8. HDFS memory storage policy support and hot/warm/cold storage
9. Deploying a hadoop high-availability (HA) cluster and verifying it three ways
10. The HDFS small-file solution - Archive
11. Reading, writing, and merging Sequence Files under hadoop
12. Introduction to the HDFS Trash with examples
13. HDFS Snapshots
14. HDFS transparent encryption with KMS
15. MapReduce introduction and wordcount
16. Basic MapReduce usage examples - custom serialization, sorting, partitioning, grouping, and topN
17. Introduction to MapReduce partitions (Partition)
18. MapReduce counters and reading/writing a database from MapReduce
19. Join operations: map side join and reduce side join
20. Introduction to MapReduce workflows
21. Reading and writing SequenceFile, MapFile, ORCFile, and ParquetFile with MapReduce
22. Writing and reading files with Gzip, Snappy, and Lzo compression in MapReduce
23. Computing and tuning memory/CPU allocation for mapreduce on yarn in a hadoop cluster

Table of contents

  • Hadoop series index
  • I. The MapReduce programming model
    • 1. MapReduce introduction
    • 2. MapReduce programming conventions
    • 3. Serialization
    • 4. Hadoop data types
    • 5. Example
  • II. Wordcount implementation
    • 1. pom.xml
    • 2. Mapper
    • 3. Reducer
    • 4. Driver
    • 5. Complete code (WordCount)
    • 6. Recommended Driver style
    • 7. Run results
      • 1) Run log
      • 2) Output
  • III. Runtime environments
    • 1. yarn mode
      • 1) Package with mvn
      • 2) Upload the packaged jar
      • 3) Run the MR program
      • 4) Check the results
    • 2. local mode

This article covers the MapReduce programming model, a wordcount implementation, and the environments MR jobs run in, in three parts matching those topics.
Prerequisites: a working hadoop cluster and a local development environment. If either is missing, see the earlier articles in this series.

I. The MapReduce programming model

1. MapReduce introduction

The core idea of MapReduce is distributed computing by divide and conquer: split first, then aggregate.

  • Splitting breaks one large problem into equivalent, smaller parts according to some strategy; the parts are solved one by one, each yielding a partial result
  • Aggregation then combines the partial results into the final result of the whole problem

Map is the "split" side: a large task is decomposed into small tasks that are processed in parallel. The precondition is that the small tasks can be computed independently, with almost no dependencies between them.
Reduce is the "aggregate" side: it globally summarizes the map-phase results. Counting the words of a huge file, for example, counts each split independently (map) and then sums the per-word partial counts (reduce).
A complete MapReduce program running in distributed mode involves three kinds of process instances:
MRAppMaster: schedules the whole program and coordinates its state
MapTask: drives the entire data flow of the map phase
ReduceTask: drives the entire data flow of the reduce phase

The MapReduce programming model allows exactly one Map phase and one Reduce phase per job. If the business logic is more complex, the only option is to run several MapReduce jobs back to back, as in the sketch below.
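
A minimal sketch of such sequential chaining, reusing this article's wordcount classes for step one and the framework's built-in identity Mapper/Reducer for step two; the ChainedJobs class name, the job names, and the paths are placeholders, not from the original article:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedJobs {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		// Step 1: the wordcount job defined later in this article (placeholder paths)
		Job step1 = Job.getInstance(conf, "step1-wordcount");
		step1.setJarByClass(ChainedJobs.class);
		step1.setMapperClass(WordCountMapper.class);
		step1.setReducerClass(WordCountReducer.class);
		step1.setMapOutputKeyClass(Text.class);
		step1.setMapOutputValueClass(LongWritable.class);
		step1.setOutputKeyClass(Text.class);
		step1.setOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(step1, new Path("/mr/in"));
		FileOutputFormat.setOutputPath(step1, new Path("/mr/tmp"));
		// Block until step 1 finishes; abort the pipeline on failure
		if (!step1.waitForCompletion(true)) System.exit(1);

		// Step 2: consumes step 1's output directory; no Mapper/Reducer set,
		// so the framework's identity classes run (just to demonstrate the chaining)
		Job step2 = Job.getInstance(conf, "step2-identity");
		step2.setJarByClass(ChainedJobs.class);
		FileInputFormat.addInputPath(step2, new Path("/mr/tmp"));
		FileOutputFormat.setOutputPath(step2, new Path("/mr/out"));
		System.exit(step2.waitForCompletion(true) ? 0 : 1);
	}
}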

2. MapReduce programming conventions

  • The code is organized in three parts: a Mapper, a Reducer, and a Driver (the client program that submits the job)
  • User-defined Mappers and Reducers must extend their respective base classes
  • The Mapper's business logic goes in its map() method
  • The Reducer's business logic goes in its reduce() method
  • The whole program is submitted by a Driver as a job object describing all the necessary information
  • Throughout a MapReduce program, data flows in the form of kv (key/value) pairs

When solving a real business problem you must work out what the input and output kv of every phase are. MapReduce has many built-in default behaviors, such as sorting and grouping, that all key off the data's k, so pinning down the kv types is extremely important. In wordcount, for instance, the map input kv is <LongWritable byte offset, Text line> and the map output kv is <Text word, LongWritable 1>.

3. Serialization

Because MR stores and computes across the network, every object that gets transferred must be serializable. Hadoop does not use Java's built-in java.io.Serializable; it implements its own serialization around the Writable interface, which declares two methods, write and readFields:

  • write is the serialization method; it writes the object's chosen fields out to a byte stream
  • readFields is the deserialization method; it reads the fields back from a byte stream to reconstruct the object

Writable provides no way to compare objects, so Hadoop merges it with Java's Comparable interface into a single interface, WritableComparable, which is how user-defined objects specify their comparison rules.
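
A minimal sketch of a user-defined WritableComparable key; the class name WordKey and its two fields are purely illustrative, not from the original article:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Illustrative custom key: serializable via write/readFields, sortable via compareTo
public class WordKey implements WritableComparable<WordKey> {
	private String word;
	private int length;

	public WordKey() {} // Hadoop needs a no-arg constructor for deserialization

	public WordKey(String word, int length) {
		this.word = word;
		this.length = length;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(word); // serialize fields in a fixed order
		out.writeInt(length);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		word = in.readUTF(); // read back in exactly the same order
		length = in.readInt();
	}

	@Override
	public int compareTo(WordKey other) {
		// The shuffle sorts map output by this rule when WordKey is the key
		int c = word.compareTo(other.word);
		return c != 0 ? c : Integer.compare(length, other.length);
	}
}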

4. Hadoop data types

Hadoop ships built-in data types (such as IntWritable, LongWritable, Text, and NullWritable) that all implement WritableComparable, so they are already serializable and can be used directly.
If the built-in types are not enough, you can define your own. A custom data type must implement Hadoop's serialization mechanism Writable; if the object is also passed around as a key, it must additionally be comparable, because the shuffle in the MapReduce framework has to be able to sort keys (in practice, implement WritableComparable as sketched above).

Example:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import lombok.Data;

/**
 * @author alanchan
 * Implements Hadoop's serialization interface Writable.
 * Objects read from / written to a database should also implement DBWritable.
 */
@Data
public class User implements Writable, DBWritable {
	private int id;
	private String userName;
	private String password;
	private String phone;
	private String email;
	private String createDay;

	@Override
	public void write(PreparedStatement ps) throws SQLException {
		ps.setInt(1, id);
		ps.setString(2, userName);
		ps.setString(3, password);
		ps.setString(4, phone);
		ps.setString(5, email);
		ps.setString(6, createDay);
	}

	@Override
	public void readFields(ResultSet rs) throws SQLException {
		this.id = rs.getInt(1);
		this.userName = rs.getString(2);
		this.password = rs.getString(3);
		this.phone = rs.getString(4);
		this.email = rs.getString(5);
		this.createDay = rs.getString(6);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(id);
		out.writeUTF(userName);
		out.writeUTF(password);
		out.writeUTF(phone);
		out.writeUTF(email);
		out.writeUTF(createDay);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		id = in.readInt();
		userName = in.readUTF();
		password = in.readUTF();
		phone = in.readUTF();
		email = in.readUTF();
		createDay = in.readUTF();
	}

	public String toString() {
		return id + "\t" + userName + "\t" + password + "\t" + phone + "\t" + email + "\t" + createDay;
	}
}
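
As a hedged sketch of why the class also implements DBWritable: with DBInputFormat the framework calls readFields(ResultSet) to materialize each row as a User. The JDBC driver, URL, credentials, and the dx_user table/column names below are placeholders, not from the original article:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

public class UserFromDbDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Placeholder JDBC settings; substitute a real driver class, URL, and credentials
		DBConfiguration.configureDB(conf, "com.mysql.cj.jdbc.Driver",
				"jdbc:mysql://localhost:3306/test", "root", "secret");

		Job job = Job.getInstance(conf, "read-users");
		job.setJarByClass(UserFromDbDriver.class);
		job.setInputFormatClass(DBInputFormat.class);
		// The column order here must match User.readFields(ResultSet): id, user_name, ...
		DBInputFormat.setInput(job, User.class,
				"select id, user_name, password, phone, email, create_day from dx_user",
				"select count(1) from dx_user");
		// ... set Mapper/Reducer and the output path as in the wordcount driver below ...
	}
}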

5. Example

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Skeletons only; the four generic parameters are <input key, input value, output key, output value>
public class UserMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // business logic for one input record goes here
    }
}

class UserReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // business logic for one key group goes here
    }
}

II. Wordcount implementation

The idea (a worked trace follows this list):

  • Map phase: split each input line and tag every word with a 1, so the output is <word, 1>.
  • Shuffle phase: the default sort, partition, and group bring all pairs with the same key together, so each word's pairs form one new group.
  • Reduce phase: process one shuffled group at a time; the group holds all of that word's key/value pairs, and summing the 1s gives the word's total count.
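
A self-contained, in-memory trace of the three phases in plain Java (no Hadoop), just to make the data flow concrete; the two input lines are made up for illustration:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountTrace {
	public static void main(String[] args) {
		String[] lines = {"hello hadoop", "hello mapreduce hadoop"}; // made-up input

		// "map": emit <word, 1> for every word of every line
		List<Map.Entry<String, Long>> mapOutput = new ArrayList<>();
		for (String line : lines)
			for (String word : line.split("\\s+"))
				mapOutput.add(Map.entry(word, 1L));

		// "shuffle": group the pairs by key, sorted (TreeMap mimics the default sort + group)
		Map<String, List<Long>> groups = new TreeMap<>();
		for (Map.Entry<String, Long> kv : mapOutput)
			groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());

		// "reduce": sum each group's values
		groups.forEach((word, ones) ->
				System.out.println(word + "\t" + ones.stream().mapToLong(Long::longValue).sum()));
		// prints: hadoop 2 / hello 2 / mapreduce 1
	}
}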

1. pom.xml

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>3.1.4</version>
		</dependency>

2. Mapper

class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
	// Reused output kv objects for the <word, 1> pairs
	private Text keyOut = new Text();
	private final static LongWritable valueOut = new LongWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Split the current line on whitespace
		String[] words = value.toString().split("\\s+");
		// Iterate over the words
		for (String word : words) {
			keyOut.set(word);
			// Emit the word tagged with 1
			context.write(keyOut, valueOut);
		}
	}
}

3. Reducer

class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

	private LongWritable result = new LongWritable();

	@Override
	protected void reduce(Text key, Iterable<LongWritable> values, Context context)
			throws IOException, InterruptedException {
		// Running total for this word
		long count = 0;
		// Iterate over the group and pull out every value
		for (LongWritable value : values) {
			// Summing all the 1s gives the word's total count
			count += value.get();
		}
		result.set(count);
		// Emit the final <word, total count> pair
		context.write(key, result);
	}
}
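
Because the reduce step is a plain associative, commutative sum, the same class could also serve as a combiner, pre-aggregating each map task's output locally before the shuffle. That is optional and was not enabled in this article's runs (the run log below shows Combine input records=0); it would be a single extra line in the driver:

	// Optional: run WordCountReducer as a combiner on each map task's local output before the shuffle
	job.setCombinerClass(WordCountReducer.class);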

4. Driver

public static void main(String[] args) throws Exception {
		// Configuration object
		Configuration conf = new Configuration();
		// Create the job instance
		Job job = Job.getInstance(conf, WC.class.getSimpleName());
		// Set the driver class
		job.setJarByClass(WC.class);
		// Set the job's mapper and reducer classes
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		// Key/value output types of the map phase
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		// Key/value output types of the reduce phase, i.e. the job's final output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		String in = "D:/workspace/bigdata-component/hadoop/test/in/1.txt";
		String out = "D:/workspace/bigdata-component/hadoop/test/out";
		// Input path of the job
		FileInputFormat.addInputPath(job, new Path(in));
		// Output path of the job
		FileOutputFormat.setOutputPath(job, new Path(out));
		// Delete the output path if it already exists (the job fails otherwise)
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		// Submit the job and wait for it to finish
		boolean resultFlag = job.waitForCompletion(true);
		// Exit with a status tied to the job result
		System.exit(resultFlag ? 0 : 1);
	}

5. Complete code (WordCount)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WC {

	public static void main(String[] args) throws Exception {
		// Configuration object
		Configuration conf = new Configuration();
		// Create the job instance
		Job job = Job.getInstance(conf, WC.class.getSimpleName());
		// Set the driver class
		job.setJarByClass(WC.class);
		// Set the job's mapper and reducer classes
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		// Key/value output types of the map phase
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		// Key/value output types of the reduce phase, i.e. the job's final output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		String in = "D:/workspace/bigdata-component/hadoop/test/in/1.txt";
		String out = "D:/workspace/bigdata-component/hadoop/test/out";
		// Input path of the job
		FileInputFormat.addInputPath(job, new Path(in));
		// Output path of the job
		FileOutputFormat.setOutputPath(job, new Path(out));
		// Delete the output path if it already exists (the job fails otherwise)
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		// Submit the job and wait for it to finish
		boolean resultFlag = job.waitForCompletion(true);
		// Exit with a status tied to the job result
		System.exit(resultFlag ? 0 : 1);
	}

}

class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
	// Reused output kv objects for the <word, 1> pairs
	private Text keyOut = new Text();
	private final static LongWritable valueOut = new LongWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Split the current line on whitespace
		String[] words = value.toString().split("\\s+");
		// Iterate over the words
		for (String word : words) {
			keyOut.set(word);
			// Emit the word tagged with 1
			context.write(keyOut, valueOut);
		}
	}
}

class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

	private LongWritable result = new LongWritable();

	@Override
	protected void reduce(Text key, Iterable<LongWritable> values, Context context)
			throws IOException, InterruptedException {
		// Running total for this word
		long count = 0;
		// Iterate over the group and pull out every value
		for (LongWritable value : values) {
			// Summing all the 1s gives the word's total count
			count += value.get();
		}
		result.set(count);
		// Emit the final <word, total count> pair
		context.write(key, result);
	}
}

6. Recommended Driver style

Drive the MR job through org.apache.hadoop.util.Tool and ToolRunner, which also handles standard Hadoop command-line option parsing for you.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountByTool extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in/1.txt";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out";

	public static void main(String[] args) throws Exception {
		// Configuration object
		Configuration conf = new Configuration();
		// Submit the program through the ToolRunner utility
		int status = ToolRunner.run(conf, new WordCountByTool(), args);
		// Exit with a status tied to the MapReduce job result
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		// Create the job instance
		Job job = Job.getInstance(getConf(), WordCountByTool.class.getSimpleName());
		// Set the driver class
		job.setJarByClass(WordCountByTool.class);
		// Set the job's mapper and reducer classes
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		// Key/value output types of the map phase
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		// Key/value output types of the reduce phase, i.e. the job's final output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// Input path of the job
		FileInputFormat.addInputPath(job, new Path(in));
		// Output path of the job
		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}
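
The cluster run in part III passes the input and output paths on the command line, so the hard-coded static paths above are swapped for args there. A sketch of the adjusted run(), assuming args[0] is the input path and args[1] the output path (only the path lines differ):

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), WordCountByTool.class.getSimpleName());
		job.setJarByClass(WordCountByTool.class);
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// args[0] = input path, args[1] = output path, e.g. /mr/1.txt /mr/out on HDFS
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		return job.waitForCompletion(true) ? 0 : 1;
	}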

7. Run results

1) Run log

2022-09-13 15:48:44,308 WARN impl.MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-jobtracker.properties,hadoop-metrics2.properties
2022-09-13 15:48:44,346 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-09-13 15:48:44,346 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2022-09-13 15:48:44,813 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2022-09-13 15:48:44,828 INFO input.FileInputFormat: Total input files to process : 1
2022-09-13 15:48:44,853 INFO mapreduce.JobSubmitter: number of splits:1
2022-09-13 15:48:44,900 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1846116712_0001
2022-09-13 15:48:44,901 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-09-13 15:48:44,965 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2022-09-13 15:48:44,965 INFO mapreduce.Job: Running job: job_local1846116712_0001
2022-09-13 15:48:44,966 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2022-09-13 15:48:44,970 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2022-09-13 15:48:44,970 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2022-09-13 15:48:44,970 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2022-09-13 15:48:44,988 INFO mapred.LocalJobRunner: Waiting for map tasks
2022-09-13 15:48:44,988 INFO mapred.LocalJobRunner: Starting task: attempt_local1846116712_0001_m_000000_0
2022-09-13 15:48:44,998 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2022-09-13 15:48:44,998 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2022-09-13 15:48:45,003 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
2022-09-13 15:48:45,025 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@2c5f560b
2022-09-13 15:48:45,028 INFO mapred.MapTask: Processing split: file:/D:/workspace/bigdata-component/hadoop/test/in/1.txt:0+712
2022-09-13 15:48:45,057 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2022-09-13 15:48:45,057 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
2022-09-13 15:48:45,057 INFO mapred.MapTask: soft limit at 83886080
2022-09-13 15:48:45,057 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
2022-09-13 15:48:45,057 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
2022-09-13 15:48:45,058 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2022-09-13 15:48:45,063 INFO mapred.LocalJobRunner: 
2022-09-13 15:48:45,064 INFO mapred.MapTask: Starting flush of map output
2022-09-13 15:48:45,064 INFO mapred.MapTask: Spilling map output
2022-09-13 15:48:45,064 INFO mapred.MapTask: bufstart = 0; bufend = 1511; bufvoid = 104857600
2022-09-13 15:48:45,064 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213992(104855968); length = 405/6553600
2022-09-13 15:48:45,073 INFO mapred.MapTask: Finished spill 0
2022-09-13 15:48:45,080 INFO mapred.Task: Task:attempt_local1846116712_0001_m_000000_0 is done. And is in the process of committing
2022-09-13 15:48:45,082 INFO mapred.LocalJobRunner: map
2022-09-13 15:48:45,082 INFO mapred.Task: Task 'attempt_local1846116712_0001_m_000000_0' done.
2022-09-13 15:48:45,086 INFO mapred.Task: Final Counters for attempt_local1846116712_0001_m_000000_0: Counters: 17
	File System Counters
		FILE: Number of bytes read=890
		FILE: Number of bytes written=515279
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=19
		Map output records=102
		Map output bytes=1511
		Map output materialized bytes=1721
		Input split bytes=122
		Combine input records=0
		Spilled Records=102
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=0
		Total committed heap usage (bytes)=255328256
	File Input Format Counters 
		Bytes Read=712
2022-09-13 15:48:45,086 INFO mapred.LocalJobRunner: Finishing task: attempt_local1846116712_0001_m_000000_0
2022-09-13 15:48:45,086 INFO mapred.LocalJobRunner: map task executor complete.
2022-09-13 15:48:45,087 INFO mapred.LocalJobRunner: Waiting for reduce tasks
2022-09-13 15:48:45,088 INFO mapred.LocalJobRunner: Starting task: attempt_local1846116712_0001_r_000000_0
2022-09-13 15:48:45,093 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2022-09-13 15:48:45,093 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2022-09-13 15:48:45,093 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
2022-09-13 15:48:45,120 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@630ffddc
2022-09-13 15:48:45,122 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@50acc978
2022-09-13 15:48:45,123 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2022-09-13 15:48:45,130 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=2639842560, maxSingleShuffleLimit=659960640, mergeThreshold=1742296192, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2022-09-13 15:48:45,131 INFO reduce.EventFetcher: attempt_local1846116712_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2022-09-13 15:48:45,145 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1846116712_0001_m_000000_0 decomp: 1717 len: 1721 to MEMORY
2022-09-13 15:48:45,147 INFO reduce.InMemoryMapOutput: Read 1717 bytes from map-output for attempt_local1846116712_0001_m_000000_0
2022-09-13 15:48:45,147 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1717, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1717
2022-09-13 15:48:45,148 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
2022-09-13 15:48:45,148 INFO mapred.LocalJobRunner: 1 / 1 copied.
2022-09-13 15:48:45,148 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2022-09-13 15:48:45,173 INFO mapred.Merger: Merging 1 sorted segments
2022-09-13 15:48:45,173 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1714 bytes
2022-09-13 15:48:45,174 INFO reduce.MergeManagerImpl: Merged 1 segments, 1717 bytes to disk to satisfy reduce memory limit
2022-09-13 15:48:45,174 INFO reduce.MergeManagerImpl: Merging 1 files, 1721 bytes from disk
2022-09-13 15:48:45,174 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
2022-09-13 15:48:45,175 INFO mapred.Merger: Merging 1 sorted segments
2022-09-13 15:48:45,175 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1714 bytes
2022-09-13 15:48:45,175 INFO mapred.LocalJobRunner: 1 / 1 copied.
2022-09-13 15:48:45,179 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2022-09-13 15:48:45,185 INFO mapred.Task: Task:attempt_local1846116712_0001_r_000000_0 is done. And is in the process of committing
2022-09-13 15:48:45,185 INFO mapred.LocalJobRunner: 1 / 1 copied.
2022-09-13 15:48:45,185 INFO mapred.Task: Task attempt_local1846116712_0001_r_000000_0 is allowed to commit now
2022-09-13 15:48:45,188 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1846116712_0001_r_000000_0' to file:/D:/workspace/bigdata-component/hadoop/test/out
2022-09-13 15:48:45,188 INFO mapred.LocalJobRunner: reduce > reduce
2022-09-13 15:48:45,188 INFO mapred.Task: Task 'attempt_local1846116712_0001_r_000000_0' done.
2022-09-13 15:48:45,188 INFO mapred.Task: Final Counters for attempt_local1846116712_0001_r_000000_0: Counters: 24
	File System Counters
		FILE: Number of bytes read=4364
		FILE: Number of bytes written=517316
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Combine input records=0
		Combine output records=0
		Reduce input groups=30
		Reduce shuffle bytes=1721
		Reduce input records=102
		Reduce output records=30
		Spilled Records=102
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=0
		Total committed heap usage (bytes)=255328256
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Output Format Counters 
		Bytes Written=316
2022-09-13 15:48:45,188 INFO mapred.LocalJobRunner: Finishing task: attempt_local1846116712_0001_r_000000_0
2022-09-13 15:48:45,189 INFO mapred.LocalJobRunner: reduce task executor complete.
2022-09-13 15:48:45,983 INFO mapreduce.Job: Job job_local1846116712_0001 running in uber mode : false
2022-09-13 15:48:45,985 INFO mapreduce.Job:  map 100% reduce 100%
2022-09-13 15:48:45,985 INFO mapreduce.Job: Job job_local1846116712_0001 completed successfully
2022-09-13 15:48:45,990 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=5254
		FILE: Number of bytes written=1032595
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=19
		Map output records=102
		Map output bytes=1511
		Map output materialized bytes=1721
		Input split bytes=122
		Combine input records=0
		Combine output records=0
		Reduce input groups=30
		Reduce shuffle bytes=1721
		Reduce input records=102
		Reduce output records=30
		Spilled Records=204
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=0
		Total committed heap usage (bytes)=510656512
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=712
	File Output Format Counters 
		Bytes Written=316

2) Output

  • Input file contents (screenshot in the original post)

  • Output directory (screenshot in the original post)

  • Output file contents (screenshot in the original post)

The output shows that the word count over the file completed correctly.

III. Runtime environments

This section covers the modes an MR job can run in: locally or on yarn. The mode is decided by the parameter mapreduce.framework.name:

  • yarn: YARN cluster mode
  • local: local mode

If the parameter is not set, the default is local. The default is defined in mapred-default.xml; a value set in code (via conf.set, see the sketch below) or in the environment's mapred-site.xml overrides it.
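
A short sketch of setting the mode from code; both keys are standard Hadoop configuration, and server1 is this article's example ResourceManager hostname:

Configuration conf = new Configuration();
// Run on the YARN cluster instead of the local runner
conf.set("mapreduce.framework.name", "yarn");
// ResourceManager host; server1 is this article's example hostname
conf.set("yarn.resourcemanager.hostname", "server1");
// ... build and submit the Job with this conf, as in the drivers above ...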

An MR job running on yarn can be tracked at http://resourcemanager_host:8088 (a screenshot is in the original post).

1. yarn mode

The MapReduce program is submitted to the yarn cluster and fanned out to run concurrently on multiple nodes; the data usually sits on HDFS.
The required parameters are (hostnames differ per environment):

mapreduce.framework.name=yarn
yarn.resourcemanager.hostname=server1

Steps to submit to the cluster:

  • Make sure the Hadoop cluster (HDFS and YARN) is running
  • Package the program as a jar and upload the jar to any node of the cluster
  • Run the launch command

What follows runs the wordcount above on the yarn cluster.

1) Package with mvn

Run the command below from the project directory containing pom.xml. This assumes maven is installed on the machine and you know its basic commands. Note that clean must come before package, otherwise it deletes the jar you just built:

mvn clean package -Dmaven.test.skip=true

2) Upload the packaged jar

Upload the packaged file hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar to a directory on one of the cluster machines, here /usr/local/bigdata/hadoop-3.1.4/testMR.
The file can be renamed freely as long as it keeps the .jar extension; this article leaves the name unchanged.

3) Run the MR program

Go to the directory holding the uploaded jar, here /usr/local/bigdata/hadoop-3.1.4/testMR.
Whether you launch through hadoop, yarn, or plain java, the jar-execution command has the same shape: command (hadoop/yarn/java) jar <jar file> <main class> <arguments>.
Both wordcount variants above were run; the log and results below show only one of them.

hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.WC /mr/1.txt /mr/out
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.WordCountByTool /mr/1.txt /mr/out

In detail:


# 3) Run the MR program
#    - the path arguments must be HDFS paths
#    - the hard-coded paths in the source were changed to come from args:
#      the first argument is the input path, the second the output path
yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.WordCount /mr/1.txt /mr/out
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.WordCount /mr/1.txt /mr/out


yarn jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.WordCountByTool /mr/1.txt /mr/out
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.WordCountByTool /mr/1.txt /mr/out


[alanchan@server4 testMR]$ hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.hadoop.mr.WordCount /mr/1.txt /mr/out
2022-09-13 08:55:55,302 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2022-09-13 08:55:55,348 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/alanchan/.staging/job_1663059265921_0001
2022-09-13 08:56:01,036 INFO input.FileInputFormat: Total input files to process : 1
2022-09-13 08:56:01,322 INFO mapreduce.JobSubmitter: number of splits:1
2022-09-13 08:56:01,551 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1663059265921_0001
2022-09-13 08:56:01,552 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-09-13 08:56:01,700 INFO conf.Configuration: resource-types.xml not found
2022-09-13 08:56:01,700 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2022-09-13 08:56:02,074 INFO impl.YarnClientImpl: Submitted application application_1663059265921_0001
2022-09-13 08:56:02,105 INFO mapreduce.Job: The url to track the job: http://server1:8088/proxy/application_1663059265921_0001/
2022-09-13 08:56:02,106 INFO mapreduce.Job: Running job: job_1663059265921_0001
2022-09-13 08:56:11,186 INFO mapreduce.Job: Job job_1663059265921_0001 running in uber mode : false
2022-09-13 08:56:11,187 INFO mapreduce.Job:  map 0% reduce 0%
2022-09-13 08:56:17,228 INFO mapreduce.Job:  map 100% reduce 0%
2022-09-13 08:56:24,260 INFO mapreduce.Job:  map 100% reduce 100%
2022-09-13 08:56:24,265 INFO mapreduce.Job: Job job_1663059265921_0001 completed successfully
2022-09-13 08:56:24,346 INFO mapreduce.Job: Counters: 53
        File System Counters
                FILE: Number of bytes read=1537
                FILE: Number of bytes written=454571
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=732
                HDFS: Number of bytes written=292
                HDFS: Number of read operations=8
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=3748
                Total time spent by all reduces in occupied slots (ms)=4696
                Total time spent by all map tasks (ms)=3748
                Total time spent by all reduce tasks (ms)=4696
                Total vcore-milliseconds taken by all map tasks=3748
                Total vcore-milliseconds taken by all reduce tasks=4696
                Total megabyte-milliseconds taken by all map tasks=3837952
                Total megabyte-milliseconds taken by all reduce tasks=4808704
        Map-Reduce Framework
                Map input records=17
                Map output records=91
                Map output bytes=1349
                Map output materialized bytes=1537
                Input split bytes=96
                Combine input records=0
                Combine output records=0
                Reduce input groups=29
                Reduce shuffle bytes=1537
                Reduce input records=91
                Reduce output records=29
                Spilled Records=182
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=75
                CPU time spent (ms)=1230
                Physical memory (bytes) snapshot=524668928
                Virtual memory (bytes) snapshot=5574508544
                Total committed heap usage (bytes)=406323200
                Peak Map Physical memory (bytes)=308199424
                Peak Map Virtual memory (bytes)=2775080960
                Peak Reduce Physical memory (bytes)=216469504
                Peak Reduce Virtual memory (bytes)=2799427584
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=636
        File Output Format Counters 
                Bytes Written=292

4) Check the results

View the output files on HDFS as usual:

[alanchan@server4 testMR]$ hadoop fs -ls /mr
Found 2 items
-rw-r--r--   3 alanchan supergroup        636 2022-09-13 08:12 /mr/1.txt
drwxr-xr-x   - alanchan supergroup          0 2022-09-13 08:56 /mr/out
[alanchan@server4 testMR]$ hadoop fs -ls /mr/out
Found 2 items
-rw-r--r--   3 alanchan supergroup          0 2022-09-13 08:56 /mr/out/_SUCCESS
-rw-r--r--   3 alanchan supergroup        292 2022-09-13 08:56 /mr/out/part-r-00000
[alanchan@server4 testMR]$ hadoop fs -cat /mr/out/part-r-00000
        2
Configuration;  1
Configured;     1
FileInputFormat;        1
FileOutputFormat;       1
FileSystem;     1
Job;    1
LongWritable;   2
Path;   1
Reducer;        1
Text;   2
Tool;   1
ToolRunner;     1
apache  13
cn      1
conf    2
fs      2
hadoop  14
import  13
input   1
io      3
itcast  1
lib     2
mapreduce       5
org     13
output  1
package 1
util    2
wordcount;      1
[alanchan@server4 testMR]$ 

2. local mode

The MapReduce program is handed to LocalJobRunner and runs locally as a single process, like an ordinary standalone program.
Input and output data can live on the local file system or on HDFS.
To launch it, just run the class containing the main method directly (right-click Run in the IDE); a configuration sketch follows.
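
A hedged sketch of pinning local mode explicitly; it is already the default when nothing else is configured. The run log earlier shows a Windows machine, and on Windows local runs typically also need the winutils binaries and HADOOP_HOME set up, which is an assumption about the environment rather than something this article configures:

Configuration conf = new Configuration();
// Explicitly select the local single-process runner (also the default)
conf.set("mapreduce.framework.name", "local");
// Read/write the local file system rather than HDFS
conf.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(conf, "wordcount-local");
// ... set mapper/reducer and paths exactly as in the WC driver above ...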

Source: alanchan, https://blog.csdn.net/chenwewi520feng/article/details/130431900