首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

11、hadoop环境下的Sequence File的读写与合并Hadoop系列文章目录一、Sequence File的读写二、小文件合并

  • 23-09-04 16:01
  • 4069
  • 11651
blog.csdn.net

Hadoop系列文章目录

1、hadoop3.1.4简单介绍及部署、简单验证
2、HDFS操作 - shell客户端
3、HDFS的使用(读写、上传、下载、遍历、查找文件、整个目录拷贝、只拷贝文件、列出文件夹下文件、删除文件及目录、获取文件及文件夹属性等)-java
4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)
5、HDFS API的RESTful风格–WebHDFS
6、HDFS的HttpFS-代理服务
7、大数据中常见的文件存储格式以及hadoop中支持的压缩算法
8、HDFS内存存储策略支持和“冷热温”存储
9、hadoop高可用HA集群部署及三种方式验证
10、HDFS小文件解决方案–Archive
11、hadoop环境下的Sequence File的读写与合并
12、HDFS Trash垃圾桶回收介绍与示例
13、HDFS Snapshot快照
14、HDFS 透明加密KMS
15、MapReduce介绍及wordcount
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN
17、MapReduce的分区Partition介绍
18、MapReduce的计数器与通过MapReduce读取/写入数据库示例
19、Join操作map side join 和 reduce side join
20、MapReduce 工作流介绍
21、MapReduce读写SequenceFile、MapFile、ORCFile和ParquetFile文件
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件
23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化


文章目录

  • Hadoop系列文章目录
  • 一、Sequence File的读写
    • 1、Sequence File的格式
      • 1)、未压缩格式
      • 2)、基于record压缩格式
      • 3)、基于block压缩格式
    • 2、Sequence File文件读写
      • 1)、pom.xml
      • 2)、实现类
  • 二、小文件合并


本文介绍hadoop环境下的Sequence File的读写与合并。
本文依赖:hadoop环境可用,本示例是以hadoop的HA环境作为示例的,如果不是HA环境,参考本专栏的hdfs文件的常规操作。

一、Sequence File的读写

Sequence File文件介绍参考本专栏的hdfs的文件存储格式及压缩算法中的介绍,如果需要更多的信息则需要自行搜索其他的资源。

1、Sequence File的格式

根据压缩类型,有3种不同的Sequence File格式:未压缩格式、record压缩格式、block压缩格式。

Sequence File由一个header和一个或多个record组成。

以上三种格式均使用相同的header结构,如下所示:
在这里插入图片描述

前3个字节为SEQ,表示该文件是序列文件,后跟一个字节表示实际版本号(例如SEQ4或SEQ6)。

Header中其他也包括key、value class名字、 压缩细节、metadata、Sync marker。
Sync Marker同步标记,用于可以读取任意位置的数据。

1)、未压缩格式

未压缩的Sequence File文件由header、record、sync三个部分组成。

其中record包含了4个部分:record length(记录长度)、key length(键长)、key、value。

每隔几个record(100字节左右)就有一个同步标记
在这里插入图片描述

2)、基于record压缩格式

基于record压缩的Sequence File文件由header、record、sync三个部分组成。

其中record包含了4个部分:record length(记录长度)、key length(键长)、key、compressed value(被压缩的值)。

每隔几个record(100字节左右)就有一个同步标记。
在这里插入图片描述

3)、基于block压缩格式

基于block压缩的Sequence File文件由header、block、sync三个部分组成。

block指的是record block,可以理解为多个record记录组成的块。这个block和HDFS中分块存储的block(128M)是不同的概念。

Block中包括:record条数、压缩的key长度、压缩的keys、压缩的value长度、压缩的values。每隔一个block就有一个同步标记。

block压缩比record压缩提供更好的压缩率。使用Sequence File时,通常首选块压缩。
在这里插入图片描述

2、Sequence File文件读写

1)、pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-commonartifactId>
        <version>3.1.4version>
    dependency>
    <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-hdfsartifactId>
        <version>3.1.4version>
    dependency>
    <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-clientartifactId>
        <version>3.1.4version>
    dependency>
dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2)、实现类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileRW {
	private static final String[] DATA = { 
                   "One, two, buckle my shoe", 
                   "Three, four, shut the door",
			       "Five, six, pick up sticks", 
                   "Seven, eight, lay them straight", 
                   "Nine, ten, a big fat hen" 
                   };

	public static void main(String[] args) throws Exception {
		// 设置客户端运行身份 以root去操作访问HDFS
		System.setProperty("HADOOP_USER_NAME", "alanchan");
		// Configuration 用于指定相关参数属性
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://HadoopHAcluster");
		conf.set("dfs.nameservices", "HadoopHAcluster");
		conf.set("dfs.ha.namenodes.HadoopHAcluster", "nn1,nn2");
		conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn1", "server1:8020");
		conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn2", "server2:8020");
		conf.set("dfs.client.failover.proxy.provider.HadoopHAcluster","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

		Path path = new Path("/testseq/test.seq");
		write(conf, path);
		read(conf, path);
	}

	public static void write(Configuration conf, Path path) throws Exception {
		// sequence file key、value
		IntWritable key = new IntWritable();
		Text value = new Text();
		// 构造Writer参数属性
		SequenceFile.Writer writer = null;
		CompressionCodec Codec = new GzipCodec();
		SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
		SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(key.getClass());
		SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(value.getClass());
		SequenceFile.Writer.Option optCom = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, Codec);
		try {
			writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom);
			for (int i = 0; i < 100; i++) {
				key.set(100 - i);
				value.set(DATA[i % DATA.length]);
				System.out.printf("[%s]	%s	%s", writer.getLength(), key, value);
				writer.append(key, value);
			}
			
		} finally {
			IOUtils.closeStream(writer);
		}
		System.out.println("写完了");
	}

	public static void read(Configuration conf, Path path) throws Exception {
		SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(path);
		SequenceFile.Reader.Option option2 = SequenceFile.Reader.length(374);// 这个374参数表示读取的长度
		SequenceFile.Reader reader = null;
		try {
			reader = new SequenceFile.Reader(conf, option1, option2);
			Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
			Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
			long position = reader.getPosition();
			while (reader.next(key, value)) {
				String syncSeen = reader.syncSeen() ? "*" : "";// 是否返回了Sync Mark同步标记
				System.out.printf("[%s%s]	%s	%s
", position, syncSeen, key, value);
				position = reader.getPosition(); // beginning of next record
			}
		} finally {
			IOUtils.closeStream(reader);
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

二、小文件合并

将所有的小文件写入到一个Sequence File中,即将文件名作为key,文件内容作为value序列化到Sequence File大文件中

import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

public class SmallFilesMergeBySequenceFile {

	private List<String> smallFilePaths = new ArrayList<String>();

	public void addInputPath(String path) throws Exception {
		File file = new File(path);
		if (file.isDirectory()) {
			File[] files = FileUtil.listFiles(file);
			for (File sFile : files) {
				smallFilePaths.add(sFile.getPath());
				System.out.println("添加小文件路径:" + sFile.getPath());
			}
		} else {
			smallFilePaths.add(file.getPath());
			System.out.println("添加小文件路径:" + file.getPath());
		}
	}

	public void mergeFile(Configuration configuration, Path path) throws Exception {
		Writer.Option bigFile = Writer.file(path);
		Writer.Option keyClass = Writer.keyClass(Text.class);
		Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
		Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);

		Text key = new Text();
		for (String sfps : smallFilePaths) {
			File file = new File(sfps);
			long fileSize = file.length();
			byte[] fileContent = new byte[(int) fileSize];
			FileInputStream inputStream = new FileInputStream(file);
			inputStream.read(fileContent, 0, (int) fileSize);
			String md5Str = DigestUtils.md5Hex(fileContent);
			System.out.println("merge小文件:" + sfps + ",md5:" + md5Str);
			key.set(sfps);
			// 把文件路径作为key,文件内容做为value,放入到sequencefile中
			writer.append(key, new BytesWritable(fileContent));
		}
		writer.hflush();
		writer.close();
	}

	public void readMergedFile(Configuration configuration, Path path) throws Exception {
		Reader.Option file = Reader.file(path);
		Reader reader = new Reader(configuration, file);
		Text key = new Text();
		BytesWritable value = new BytesWritable();
		while (reader.next(key, value)) {
			byte[] bytes = value.copyBytes();
			String md5 = DigestUtils.md5Hex(bytes);
			String content = new String(bytes, Charset.forName("GBK"));
			System.out.println("读取到文件:" + key + ",md5:" + md5 + ",content:" + content);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();

		SmallFilesMergeBySequenceFile msf = new SmallFilesMergeBySequenceFile();

		Path path = new Path("");
		msf.addInputPath("");//
		msf.mergeFile(configuration, path);
		msf.readMergedFile(configuration, path);
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
注:本文转载自blog.csdn.net的一瓢一瓢的饮 alanchan的文章"https://blog.csdn.net/chenwewi520feng/article/details/130359237"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top