首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)Hadoop系列文章目录一、pom.xml二、junit测试类三、操作类(帮助类)四、高可用环境的操作类

  • 23-09-04 16:00
  • 2038
  • 12373
blog.csdn.net

Hadoop系列文章目录

1、hadoop3.1.4简单介绍及部署、简单验证
2、HDFS操作 - shell客户端
3、HDFS的使用(读写、上传、下载、遍历、查找文件、整个目录拷贝、只拷贝文件、列出文件夹下文件、删除文件及目录、获取文件及文件夹属性等)-java
4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)
5、HDFS API的RESTful风格–WebHDFS
6、HDFS的HttpFS-代理服务
7、大数据中常见的文件存储格式以及hadoop中支持的压缩算法
8、HDFS内存存储策略支持和“冷热温”存储
9、hadoop高可用HA集群部署及三种方式验证
10、HDFS小文件解决方案–Archive
11、hadoop环境下的Sequence File的读写与合并
12、HDFS Trash垃圾桶回收介绍与示例
13、HDFS Snapshot快照
14、HDFS 透明加密KMS
15、MapReduce介绍及wordcount
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN
17、MapReduce的分区Partition介绍
18、MapReduce的计数器与通过MapReduce读取/写入数据库示例
19、Join操作map side join 和 reduce side join
20、MapReduce 工作流介绍
21、MapReduce读写SequenceFile、MapFile、ORCFile和ParquetFile文件
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件
23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化


目录

  • Hadoop系列文章目录
  • 一、pom.xml
  • 二、junit测试类
  • 三、操作类(帮助类)
  • 四、高可用环境的操作类


本文编写了java对HDFS的常见操作,并且均测试通过。其功能包含构造conf、设置系统环境变量、创建目录、判断文件是否存在、获取文件/目录的大小等,具体见下图。
在这里插入图片描述

本文分为四部分,即pom.xml、junit测试类、操作类、高可用环境操作类。

  • pom.xml描述本代码运行需要哪些maven依赖
  • junit测试类是对操作类的单元测试
  • 操作类就是对hdfs api的使用
  • 高可用环境操作类就是操作类在高可用环境中的配置

一、pom.xml


<project
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
	xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<modelVersion>4.0.0modelVersion>
	<parent>
		<groupId>com.okcardgroupId>
		<artifactId>bigdata-componentartifactId>
		<version>0.0.1-SNAPSHOTversion>
	parent>
	<groupId>com.okcardgroupId>
	<artifactId>hadoopartifactId>
	<version>0.0.1-SNAPSHOTversion>
	<name>hadoopname>
	<url>http://maven.apache.orgurl>
	<properties>
		<project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
	properties>
	
	<dependencies>
		<dependency>
			<groupId>org.apache.hadoopgroupId>
			<artifactId>hadoop-commonartifactId>
			<version>3.1.4version>
		dependency>
		<dependency>
			<groupId>org.apache.hadoopgroupId>
			<artifactId>hadoop-clientartifactId>
			<version>3.1.4version>
		dependency>
		<dependency>
			<groupId>org.apache.hadoopgroupId>
			<artifactId>hadoop-hdfsartifactId>
			<version>3.1.4version>
		dependency>
  		<dependency>
			<groupId>junitgroupId>
			<artifactId>junitartifactId>
			<version>4.13version>
		dependency>
		<dependency>
			<groupId>org.projectlombokgroupId>
			<artifactId>lombokartifactId>
			<version>1.18.22version>
		dependency>
	dependencies>
project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

二、junit测试类

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @author alanchan HDFSUtil测试用例
 */
public class HDFSUtilTest {
	private HDFSUtil hdfsUtil;

	@Before
	public void setUp() throws Exception {
		hdfsUtil = HDFSUtil.getHDFSUtilInstance();
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(strings);
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf,strings);
	}

//	assertTrue(String message, boolean condition) 
//	assertFalse(String message, boolean condition)
//	assertNotEquals(String message, Object unexpected,Object actual)
//	assertArrayEquals(String message, Object[] expecteds,Object[] actuals)
//	assertNotNull(String message, Object object)
//	assertNull(String message, Object object)
//	assertSame(String message, Object expected, Object actual)
	@Test
	public void testMkdir() {
		try {
			String path = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	@Test
	public void testExists() {
		try {
			String path = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertTrue("測試目錄是否存在成功", hdfsUtil.exists(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testGetHdfsFileSize() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			hdfsUtil.writeHdfsFile(path, fileContent);
			long fileSize = hdfsUtil.getHdfsFileSize(path);
//			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertEquals("文件大小与期望一致", fileContent.length(), fileSize);
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWriteHdfsFile() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWriteHdfsFile2() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(new Path(path), fileContent.getBytes()));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWriteUTFHdfsFile() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeUTFHdfsFile(path, fileContent));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testReadHdfsFile() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
			assertEquals("读取文件内容与预期一致", fileContent, hdfsUtil.readHdfsFile(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testReadUTFHdfsFile() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeUTFHdfsFile(path, fileContent));
			assertEquals("读取UTF文件内容与预期一致", fileContent, hdfsUtil.readUTFHdfsFile(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWriteAndAppendHdfsFile() {
		try {
			// 構造conf
			Configuration conf = new Configuration();
			conf.set("fs.defaultFS", "hdfs://server1:8020");
			conf.setBoolean("dfs.support.append", true);
			conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
			conf.set("fe.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
			conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
			// 調用示例
			hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
			String path = "/test/testsub";
			String fileContent = "this is a testing";

			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			fileContent = "
 line this is another testing";
			assertTrue("追加写文件成功", hdfsUtil.writeAndAppendHdfsFile(path, fileContent));

			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWalkHdfsDir() {
		try {
			String dir = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(dir));
			String path = "/test/testsub/testsub.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
			hdfsUtil.walkHdfsDir(new Path("/test"));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(dir, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWalkHdfsDir_Iter() {
		try {
			String dir = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(dir));
			String path = "/test/testsub/testsub.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
			hdfsUtil.walkHdfsDir_Iter(new Path("/test"), true);
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(dir, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testUploadLocalFileToHdfs() {
		try {
			String localFile = "d://test.txt";
			BufferedWriter out = new BufferedWriter(new FileWriter(localFile));
			out.write("this is test uplaod file testing");
			out.close();

//			File tempFile = new File(localFile);
//			tempFile.deleteOnExit();
//			tempFile.delete();

			String path = "/test/testsub/testsub.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			assertTrue("上传本地文件成功", hdfsUtil.uploadLocalFileToHdfs(localFile, path));

			assertTrue("刪除本地临时文件成功", new File(localFile).delete());
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/test/testsub", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testDownloadHdfsToLocal() {
		try {
			String path = "/test/testsub/testsub.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			String localFile = "d://testdown.txt";
			assertTrue("从HDFS上下载文件到本地成功", hdfsUtil.downloadHdfsToLocal(path, localFile));

			BufferedReader in = new BufferedReader(new FileReader(localFile));
			StringBuilder localFileContent = new StringBuilder();
			String line;
			while ((line = in.readLine()) != null) {
				localFileContent.append(line).append("
");
			}
			localFileContent.deleteCharAt(localFileContent.length() - 1);

			in.close();
			assertEquals("下文件内容与hdfs文件内容一致", localFileContent.toString(), fileContent);
			assertTrue("刪除本地临时文件成功", new File(localFile).delete());
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/test/testsub", true));

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testRename() {
		try {
			String path = "/test/test/testsub.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			String destPath = "/t/test.log";
			assertTrue("rename成功", hdfsUtil.rename(path, destPath, true));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/t", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testDeleteHdfs() {
		try {
			String path = "/test/test/testsub.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyFile() {
		try {
			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test/test/dest.txt";
			assertTrue("複製文件成功", hdfsUtil.copyFile(srcPath, destpath));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyFile2() {
		try {
			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test/test/dest.txt";
			assertTrue("複製文件成功", hdfsUtil.copyFile2(srcPath, destpath));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyDir() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test1";
			assertTrue("複製文件目錄成功", hdfsUtil.copyDir(dir, destpath, false));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test1", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyFilesExcludeDir() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			srcPath = "/test/test/testsub/src2.txt";
			fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test2";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(destpath));
			assertTrue("複製文件目錄成功", hdfsUtil.copyFilesExcludeDir(dir, destpath, false));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test2", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyDirOnlyFiles() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			srcPath = "/test/test/testsub/src2.txt";
			fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test3";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(destpath));
			assertTrue("複製文件目錄成功", hdfsUtil.copyDirOnlyFiles(dir, destpath, false));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test3", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testSearchFile() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			srcPath = "/test/test/testsub/src2.txt";
			fileContent = "this is a testing
 line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String searchFileName = "src2.txt";
			assertEquals("查找文件数量与预期一致", 1, hdfsUtil.searchFile(dir, searchFileName, true).size());

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyFromLocalFileWithProgress() {
		try {
			String destPath = "/test/test/t.rat";
			String localBigFile = "D:\BaiduNetdiskDownload\test.rar";
			assertTrue("查找文件数量与预期一致", hdfsUtil.copyFromLocalFileWithProgress(localBigFile, destPath));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testMergeFile() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			srcPath = "/test/test/src2.txt";
			fileContent = "this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destPath = "/test/src3.txt";
			assertTrue("文件合并成功", hdfsUtil.mergeFile(dir, destPath));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs(destPath, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testTest() {
		hdfsUtil.test();
		hdfsUtil.test("123");
		hdfsUtil.test("123", "456");
		hdfsUtil.test("");
//		test();
//		test("test");
//		test("123", "321");
		assertEquals("Regular multiplication should work", "", "");
	}

	@After
	public void close() {
		hdfsUtil.close();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388
  • 389
  • 390
  • 391
  • 392
  • 393
  • 394
  • 395
  • 396
  • 397
  • 398
  • 399
  • 400
  • 401
  • 402
  • 403
  • 404
  • 405
  • 406
  • 407
  • 408
  • 409
  • 410
  • 411
  • 412
  • 413
  • 414
  • 415
  • 416
  • 417
  • 418
  • 419
  • 420
  • 421
  • 422
  • 423
  • 424
  • 425
  • 426
  • 427
  • 428
  • 429
  • 430
  • 431
  • 432
  • 433
  • 434
  • 435
  • 436
  • 437
  • 438
  • 439
  • 440
  • 441
  • 442
  • 443
  • 444
  • 445
  • 446
  • 447
  • 448
  • 449
  • 450
  • 451
  • 452
  • 453
  • 454
  • 455
  • 456

三、操作类(帮助类)

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import lombok.extern.slf4j.Slf4j;

/**
 * @author alanchan hdfs常用帮助类
 */
@Slf4j
public class HDFSUtil {
	private static Configuration conf = null;
	private static FileSystem fileSystem = null;
	private static final String HADOOP_USER_NAME = "alanchan";
	private static final String DEFAULTFS = "hdfs://server1:8020";
	private static HDFSUtil hdfsInstance = null;

	/**
	 * 
	 * @param strings 默认第一个参数是defaultFS,可以传更多参数,如果传入2个参数,则第二个默认为user,其他参数暂没用
	 * @throws Exception
	 */
	private HDFSUtil(String... strings) throws Exception {
		// 设置客户端身份 以具备权限在hdfs上进行操作
		if (strings.length >= 2) {
			System.setProperty("HADOOP_USER_NAME", strings[1]);
		} else {
			System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
		}
		conf = new Configuration();
		// 设置操作的文件系统是HDFS 并且指定HDFS操作地址。第一个参数是defaultFS
		if (strings.length >= 1) {
			conf.set("fs.defaultFS", strings[0]);
		} else {
			conf.set("fs.defaultFS", DEFAULTFS);
		}
		fileSystem = FileSystem.get(conf);
	}

	private HDFSUtil(Configuration conf, String... strings) throws Exception {
		// 设置客户端身份 以具备权限在hdfs上进行操作
		if (strings.length >= 1) {
			System.setProperty("HADOOP_USER_NAME", strings[0]);
		} else {
			System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
		}

		this.conf = conf;
		fileSystem = FileSystem.get(this.conf);
	}

	public static HDFSUtil getHDFSUtilInstance(Configuration conf, String... strings) throws Exception {
		if (hdfsInstance == null) {
			hdfsInstance = new HDFSUtil(conf, strings);
		}
		return hdfsInstance;
	}

	public static HDFSUtil getHDFSUtilInstance(String... strings) throws Exception {
		if (hdfsInstance == null) {
			hdfsInstance = new HDFSUtil(strings);
		}
		return hdfsInstance;
	}

	public boolean mkdir(String path) throws Exception {
		return mkdir(new Path(path));
	}

	public boolean mkdir(Path p) throws Exception {
		if (!fileSystem.exists(p)) {
			fileSystem.mkdirs(p);
		}
		return fileSystem.exists(p);
	}

	public boolean exists(String path) throws Exception {
		return exists(new Path(path));
	}

	public boolean exists(Path p) throws Exception {
		return fileSystem.exists(p);
	}

	public long getHdfsFileSize(String path) throws Exception {
		return getHdfsFileSize(new Path(path));
	}

	public long getHdfsFileSize(Path path) throws Exception {
		return fileSystem.getContentSummary(path).getLength();
	}

	/**
	 * 
	 * @param path        写入文件的路径,含文件名称
	 * @param fileContent 写入文件的内容
	 * @return 如果文件存在,且文件大小大于0,则视为写入成功
	 * @throws Exception
	 */
	public boolean writeHdfsFile(String path, String fileContent) throws Exception {
		return writeHdfsFile(new Path(path), fileContent);
	}

	public boolean writeHdfsFile(Path path, byte[] fileContent) throws Exception {
		FSDataOutputStream in = fileSystem.create(path);
		in.write(fileContent);
		in.flush();
		in.close();

		return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
	}

	public boolean writeHdfsFile(Path path, String fileContent) throws Exception {
		FSDataOutputStream in = fileSystem.create(path);
		in.write(fileContent.getBytes());
		in.flush();
		in.close();

		return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
	}

	/**
	 * Writes a string to the underlying output stream usingmodified UTF-8 encoding
	 * in a machine-independent manner.
	 * 
	 * @param path
	 * @param fileContent
	 * @return
	 * @throws Exception
	 */
	public boolean writeUTFHdfsFile(Path path, String fileContent) throws Exception {
		FSDataOutputStream in = fileSystem.create(path);
		in.writeUTF(fileContent);
		in.flush();
		in.close();

		return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
	}

	public boolean writeUTFHdfsFile(String path, String fileContent) throws Exception {
		return writeUTFHdfsFile(new Path(path), fileContent);
	}

	public String readHdfsFile(String path) throws Exception {
		return readHdfsFile(new Path(path));
	}

	public String readHdfsFile(Path path) throws Exception {
		FSDataInputStream out = fileSystem.open(path);
		BufferedReader br = new BufferedReader(new InputStreamReader(out));
		String line;
		StringBuilder result = new StringBuilder();
		while ((line = br.readLine()) != null) {
			result.append(line).append("
");
		}
		// 去掉最后一个换行符
		result.deleteCharAt(result.length() - 1);
		br.close();
		out.close();
		return result.toString();
	}

	public String readUTFHdfsFile(String path) throws Exception {
		return readUTFHdfsFile(new Path(path));
	}

	/**
	 * 好像只能读取通过writeUTF写入的文件
	 * 
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public String readUTFHdfsFile(Path path) throws Exception {
		FSDataInputStream out = fileSystem.open(path);
		String fileContent = out.readUTF();
		out.close();
		return fileContent;
	}

	// 構造conf
//	Configuration conf = new Configuration();
//	conf.set("fs.defaultFS", DEFAULTFS);
//	conf.setBoolean("dfs.support.append", true);
//	conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
//	conf.set("fe.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
//	conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
	// 調用示例
//	HDFSUtil.getHDFSUtilInstance(conf).writeAndAppendHdfsFile(new Path(path1), path2);
	/**
	 * 该例的前提是文件已经存在,仅仅是演示追加fileContent内容至目标文件中
	 * 
	 * @param destPath
	 * @param fileContent
	 * @return
	 * @throws Exception
	 */
	public boolean writeAndAppendHdfsFile(Path destPath, String fileContent) throws Exception {
		fileSystem = destPath.getFileSystem(conf);

		if (!fileSystem.exists(destPath)) {
			fileSystem.createNewFile(destPath);
		}

		FSDataOutputStream output = fileSystem.append(destPath);
//		FSDataOutputStreamBuilder output = fileSystem.appendFile(destPath);
		output.write(fileContent.getBytes("UTF-8"));
		// 文件最后多一个换行符
		output.write("
".getBytes("UTF-8"));
//		fileSystem.close();
		output.close();

		return fileSystem.exists(destPath) && getHdfsFileSize(destPath) > 0;
	}

	public boolean writeAndAppendHdfsFile(String destPath, String fileContent) throws Exception {
		return writeAndAppendHdfsFile(new Path(destPath), fileContent);
	}

	public void walkHdfsDir(String path) throws Exception {
		walkHdfsDir(new Path(path));
	}

	public void walkHdfsDir(Path path) throws Exception {
		FileStatus[] fileStatuses = fileSystem.listStatus(path);
		for (FileStatus fileStatus : fileStatuses) {
			if (fileStatus.isDirectory()) {
				walkHdfsDir(fileStatus.getPath());
				log.info("目录 = {}", fileStatus.getPath());
			} else {
				log.info("文件完整路径 = {},文件名={}", fileStatus.getPath(), fileStatus.getPath().getName());
			}
		}

	}

	public void walkHdfsDir_Iter(String path, boolean recursive) throws Exception {
		walkHdfsDir_Iter(new Path(path), recursive);
	}

	/**
	 * 
	 * @param path
	 * @param recursive 是否递归
	 * @throws Exception
	 */
	public void walkHdfsDir_Iter(Path path, boolean recursive) throws Exception {
		RemoteIterator<LocatedFileStatus> lfsRemoteIterator = fileSystem.listFiles(path, recursive);
		while (lfsRemoteIterator.hasNext()) {
			LocatedFileStatus next = lfsRemoteIterator.next();
			log.info("文件完整路径 = {},文件名={}", next.getPath(), next.getPath().getName());
		}
	}

	public boolean uploadLocalFileToHdfs(String localFile, String dest) throws Exception {
		return uploadLocalFileToHdfs(localFile, new Path(dest));
	}

	/**
	 * 
	 * @param localFile
	 * @param dest
	 * @return 简单比较目标和源文件的大小判断是否上传成功
	 * @throws Exception
	 */
	public boolean uploadLocalFileToHdfs(String localFile, Path dest) throws Exception {
		fileSystem.copyFromLocalFile(new Path(localFile), dest);
		File file = new File(localFile);
		return file.length() == getHdfsFileSize(dest);
	}

	public boolean downloadHdfsToLocal(String hdfsFile, String localFile) throws Exception {
		return downloadHdfsToLocal(new Path(hdfsFile), localFile);
	}

	public boolean downloadHdfsToLocal(Path hdfsFile, String localFile) throws Exception {
		fileSystem.copyToLocalFile(hdfsFile, new Path(localFile));
		File file = new File(localFile);
		return file.length() == getHdfsFileSize(hdfsFile);
	}

	/**
	 * 
	 * @param srcPath
	 * @param destPath
	 * @param mkdirFlag 如果目标目录不存在是否创建
	 * @return
	 * @throws Exception
	 */
	public boolean rename(Path srcPath, Path destPath, boolean mkdirFlag) throws Exception {
		boolean flag = false;
		if (fileSystem.exists(srcPath)) {
			if (mkdirFlag && !fileSystem.exists(destPath)) {
				fileSystem.mkdirs(destPath);
			}
			flag = fileSystem.rename(srcPath, destPath);
		}

		return flag;
	}

	public boolean rename(String srcPath, String destPath, boolean mkdirFlag) throws Exception {
		return rename(new Path(srcPath), new Path(destPath), mkdirFlag);
	}

	/**
	 * 删除文件或目录
	 * 
	 * @param dest
	 * @param recursive 是否递归删除
	 * @return
	 * @throws Exception
	 */
	public boolean deleteHdfs(Path dest, boolean recursive) throws Exception {
		return fileSystem.delete(dest, recursive);
	}

	public boolean deleteHdfs(String dest, boolean recursive) throws Exception {
		return deleteHdfs(new Path(dest), recursive);
	}

	/**
	 * 拷贝单个文件或流?
	 * 
	 * @param srcPath
	 * @param destpath
	 * @throws Exception
	 */
	public void copyFile(Path srcPath, Path destpath) throws Exception {
		FSDataInputStream in = fileSystem.open(srcPath);
		FSDataOutputStream out = fileSystem.create(destpath);
		IOUtils.copyBytes(in, out, conf);

		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
	}

	public boolean copyFile(String srcPath, String destpath) throws Exception {
		copyFile(new Path(srcPath), new Path(destpath));
		return true;
	}

	public boolean copyFile2(String srcPath, String destpath) throws Exception {
		copyFile2(new Path(srcPath), new Path(destpath));
		return true;
	}

	public void copyFile2(Path srcPath, Path destpath) throws Exception {
		FSDataInputStream in = fileSystem.open(srcPath);
		FSDataOutputStream out = fileSystem.create(destpath);

		byte[] b = new byte[1024];
		int hasRead = 0;
		while ((hasRead = in.read(b)) > 0) {
			out.write(b, 0, hasRead);
		}

		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
	}

	public boolean copyDir(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyDir(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 文件夹拷贝,包含文件夾
	 * 
	 * @param srcPath
	 * @param destpath
	 * @param deleteSource
	 * @return
	 * @throws Exception
	 */
	public boolean copyDir(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
//		  public static boolean copy(FileSystem srcFS, Path src,FileSystem dstFS, Path dst,boolean deleteSource,Configuration conf) throws IOException {
		return FileUtil.copy(fileSystem, srcPath, fileSystem, destpath, deleteSource, conf);
	}

	/**
	 * 拷貝文件及目錄,但不包含srcPath的第一層目錄,即srcPath是一个目录,但只拷贝srcPath最后一层目录下的所有文件及目录,但不包含本身
	 * 例如:srcPath=/copyDir/src,destPath=/copyDir/dest,最终结果是将/copyDir/src下面的所有文件及目录拷贝至/copyDir/dest下,但不包含src这个文件夹
	 * 
	 * @param srcPath
	 * @param destpath
	 * @param deleteSource
	 * @throws Exception
	 */
	public boolean copyFilesExcludeDir(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
		boolean flag = false;
		FileStatus[] fileStatuses = fileSystem.listStatus(srcPath);
		for (FileStatus fileStatus : fileStatuses) {
			if (fileStatus.isDirectory()) {
//				copyFilesExcludeDir(fileStatus.getPath(), destpath, deleteSource);
				flag = FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, deleteSource, conf);
			} else {
				flag = FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, deleteSource, conf);
			}
		}
		return flag;
	}

	public boolean copyFilesExcludeDir(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyFilesExcludeDir(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 拷贝源文件夹下的所有文件到目标文件夹,不含源文件夹下的文件夹
	 * 
	 * @param srcPath
	 * @param destpath
	 * @param deleteSource
	 * @return
	 * @throws Exception
	 */
	public boolean copyDirOnlyFiles(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
		boolean flag = false;
		RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, true);
		while (sourceFiles.hasNext()) {
			flag = FileUtil.copy(fileSystem, sourceFiles.next().getPath(), fileSystem, destpath, deleteSource, conf);
		}
		return flag;
	}

	public boolean copyDirOnlyFiles(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyDirOnlyFiles(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 查找文件,精确查找 equals
	 * 
	 * @param srcPath
	 * @param searchFileName
	 * @param recursive
	 * @return
	 * @throws Exception
	 */
	public List<Path> searchFile(Path srcPath, String searchFileName, boolean recursive) throws Exception {
		List<Path> list = new ArrayList<Path>();

		RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, recursive);
		while (sourceFiles.hasNext()) {
			Path file = sourceFiles.next().getPath();
			String srcFileName = file.getName();
			if (searchFileName.equals(srcFileName)) {
				list.add(file);
			}
		}
		return list;
	}

	public List<Path> searchFile(String srcPath, String searchFileName, boolean recursive) throws Exception {
		return searchFile(new Path(srcPath), searchFileName, recursive);
	}

	/**
	 * 通過流的方式,带进度条的本地文件拷贝至hdfs
	 * 
	 * @param fs
	 * @throws Exception
	 */
	public boolean copyFromLocalFileWithProgress(File srcPath, Path destPath) throws Exception {
		InputStream in = new BufferedInputStream(new FileInputStream(srcPath));
		final float fileSize = srcPath.length() / 65536;
		FSDataOutputStream out = fileSystem.create(destPath, new Progressable() {
			long fileCount = 0;

			@Override
			// progress 方法在每次上传了64K(64*1024=65536)B字节大小的文件之后会自动调用一次
			// 通过累加之后的结果与整个文件大小来计算总上传文件的进度
			public void progress() {
				fileCount++;
				System.out.println("进度%:" + (fileCount / fileSize) * 100 + " %");
			}

		});
		IOUtils.copyBytes(in, out, 4096);

		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
		return getHdfsFileSize(destPath) == srcPath.length();
	}

	public boolean copyFromLocalFileWithProgress(String srcPath, String destPath) throws Exception {
		return copyFromLocalFileWithProgress(new File(srcPath), new Path(destPath));
	}

	/**
	 * 合并文件
	 * 
	 * @param srcPath  目錄
	 * @param destPath 文件名,含路徑
	 * @return
	 * @throws Exception
	 */
	public boolean mergeFile(Path srcPath, Path destPath) throws Exception {
		FSDataOutputStream fout = fileSystem.create(destPath);
		if (fileSystem.isDirectory(srcPath)) {
			RemoteIterator<LocatedFileStatus> lsfr = fileSystem.listFiles(srcPath, true);
			while (lsfr.hasNext()) {
				LocatedFileStatus next = lsfr.next();
				FSDataInputStream fin = fileSystem.open(next.getPath());
				IOUtils.copyBytes(fin, fout, 4096);
				fin.close();
			}
		}
		fout.close();
		return getHdfsFileSize(srcPath) == getHdfsFileSize(srcPath);
	}

	public boolean mergeFile(String srcPath, String destPath) throws Exception {
		return mergeFile(new Path(srcPath), new Path(destPath));
	}

	public void close() {
		if (fileSystem != null) {
			try {
				fileSystem.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		String path1 = "/test/test.rar";
		String path2 = "This is a testing";
		String searchFileName = "D:\BaiduNetdiskDownload\2test.rar";

		try {

//			HDFSUtil.getHDFSUtilInstance().writeUTFHdfsFile(path1, path2);copyDir  copyFilesExcludeDir
			String srcPath = "/testhdfs_copyDir/src", destPath = "/testhdfs_copyDir/dest";
			System.out.println(HDFSUtil.getHDFSUtilInstance().copyFilesExcludeDir(srcPath, destPath, false));

		} catch (Exception e) {
			e.printStackTrace();
		}

//		test();
//		test("test");
//		test("123", "321");

	}

	static void test(String... strings) {
		if (strings.length == 1) {
			System.out.println("testing=" + strings[0]);
		} else {
			System.out.println("testing=" + strings.length);
		}
	}

	/**
	 * 没有验证,仅示例加载配置文件
	 * 
	 * @param srcPath
	 * @param destPath
	 * @throws Exception
	 */
	private void copyOrMoveFiles(Path srcPath, Path destPath) throws Exception {
		Configuration conf = new Configuration();
		conf.addResource("basedir/hdfs-site.xml");
		conf.addResource("basedir/core-site.xml");
//            FileSystem fs = FileSystem.newInstance(conf);
		FileContext fc = FileContext.getFileContext(conf);
		FileContext.Util util = fc.util();
		util.copy(srcPath, destPath, false, false);
	}

	/**
	 * 遍历目录,IOUtils.copyBytes(),顺序复制,通过获取本地文件系统或者直接new FileInputStream()的方式实现 2种
	 * 沒有驗證,主要參考其流、conf的應用
	 * 
	 * @param srcPath
	 * @param destPath
	 * @throws Exception
	 */
	private void upload(Path srcPath, Path destPath) throws Exception {
		// 获取本地文件系统
		LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
		if (localFs.isDirectory(srcPath)) {
			RemoteIterator<LocatedFileStatus> lsfr = localFs.listFiles(srcPath, true);
			while (lsfr.hasNext()) {
				LocatedFileStatus next = lsfr.next();
				FSDataInputStream fin = fileSystem.open(next.getPath());
				FSDataOutputStream fout = fileSystem.create(new Path("/test/data/" + next.getPath().getName()));
				IOUtils.copyBytes(fin, fout, 4096);
				fin.close();
				fout.close();
			}
		} else {
			FSDataInputStream fin = fileSystem.open(srcPath);
//			InputStream fin = new FileInputStream("D://a.txt");
			FSDataOutputStream fout = fileSystem.create(destPath);
			IOUtils.copyBytes(fin, fout, 4096);
			fin.close();
			fout.close();
		}

	}

	private void upload(String srcPath, String destPath) throws Exception {
		upload(new Path(srcPath), new Path(destPath));
	}

	/**
	 * 沒有驗證,主要參考其流、conf的應用
	 * 
	 * @param srcPath
	 * @param destPath
	 * @param fs
	 * @param local
	 * @param fc
	 * @throws Exception
	 */
	private void downLoad(Path srcPath, Path destPath, FileSystem fs, FileSystem local, FileContext fc)
			throws Exception {
// 2.字节流的复制
		FileOutputStream localFileOut = new FileOutputStream("D:\test\a.txt");
		FSDataInputStream remoteFin = fs.open(srcPath);

		IOUtils.copyBytes(remoteFin, localFileOut, 2048);
		remoteFin.close();
		localFileOut.close();

//4.下载前做合并
		LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
		FSDataOutputStream fsdOut = localFs.create(destPath);
		if (fs.isDirectory(srcPath)) {
			RemoteIterator<LocatedFileStatus> lfsr = fs.listFiles(srcPath, true);
			while (lfsr.hasNext()) {
				LocatedFileStatus next = lfsr.next();
				FSDataInputStream fdIn = fs.open(next.getPath());
				IOUtils.copyBytes(fdIn, fsdOut, 2048);
				fdIn.close();
			}
			fsdOut.close();
		} else {
			FSDataInputStream fsIn = fs.open(srcPath);
			IOUtils.copyBytes(fsIn, fsdOut, 2048);
			fsIn.close();
			fsdOut.close();
		}
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388
  • 389
  • 390
  • 391
  • 392
  • 393
  • 394
  • 395
  • 396
  • 397
  • 398
  • 399
  • 400
  • 401
  • 402
  • 403
  • 404
  • 405
  • 406
  • 407
  • 408
  • 409
  • 410
  • 411
  • 412
  • 413
  • 414
  • 415
  • 416
  • 417
  • 418
  • 419
  • 420
  • 421
  • 422
  • 423
  • 424
  • 425
  • 426
  • 427
  • 428
  • 429
  • 430
  • 431
  • 432
  • 433
  • 434
  • 435
  • 436
  • 437
  • 438
  • 439
  • 440
  • 441
  • 442
  • 443
  • 444
  • 445
  • 446
  • 447
  • 448
  • 449
  • 450
  • 451
  • 452
  • 453
  • 454
  • 455
  • 456
  • 457
  • 458
  • 459
  • 460
  • 461
  • 462
  • 463
  • 464
  • 465
  • 466
  • 467
  • 468
  • 469
  • 470
  • 471
  • 472
  • 473
  • 474
  • 475
  • 476
  • 477
  • 478
  • 479
  • 480
  • 481
  • 482
  • 483
  • 484
  • 485
  • 486
  • 487
  • 488
  • 489
  • 490
  • 491
  • 492
  • 493
  • 494
  • 495
  • 496
  • 497
  • 498
  • 499
  • 500
  • 501
  • 502
  • 503
  • 504
  • 505
  • 506
  • 507
  • 508
  • 509
  • 510
  • 511
  • 512
  • 513
  • 514
  • 515
  • 516
  • 517
  • 518
  • 519
  • 520
  • 521
  • 522
  • 523
  • 524
  • 525
  • 526
  • 527
  • 528
  • 529
  • 530
  • 531
  • 532
  • 533
  • 534
  • 535
  • 536
  • 537
  • 538
  • 539
  • 540
  • 541
  • 542
  • 543
  • 544
  • 545
  • 546
  • 547
  • 548
  • 549
  • 550
  • 551
  • 552
  • 553
  • 554
  • 555
  • 556
  • 557
  • 558
  • 559
  • 560
  • 561
  • 562
  • 563
  • 564
  • 565
  • 566
  • 567
  • 568
  • 569
  • 570
  • 571
  • 572
  • 573
  • 574
  • 575
  • 576
  • 577
  • 578
  • 579
  • 580
  • 581
  • 582
  • 583
  • 584
  • 585
  • 586
  • 587
  • 588
  • 589
  • 590
  • 591
  • 592
  • 593
  • 594
  • 595
  • 596
  • 597
  • 598
  • 599
  • 600
  • 601
  • 602
  • 603
  • 604
  • 605
  • 606
  • 607
  • 608
  • 609
  • 610
  • 611
  • 612
  • 613
  • 614
  • 615
  • 616
  • 617
  • 618
  • 619
  • 620
  • 621
  • 622
  • 623
  • 624
  • 625
  • 626
  • 627
  • 628
  • 629
  • 630
  • 631
  • 632
  • 633
  • 634
  • 635
  • 636
  • 637
  • 638
  • 639
  • 640
  • 641
  • 642
  • 643
  • 644
  • 645
  • 646
  • 647
  • 648
  • 649
  • 650
  • 651
  • 652
  • 653
  • 654
  • 655
  • 656
  • 657
  • 658
  • 659
  • 660
  • 661
  • 662
  • 663
  • 664
  • 665
  • 666
  • 667
  • 668

四、高可用环境的操作类

该示例仅仅是示例高可用环境下的api访问,实际的类还是第三部分的。

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @author alanchan 测试高可用环境下
 */
public class HDFSHAUtilTest {
	private HDFSUtil hdfsUtil;

	@Before
	public void setUp() throws Exception {
		Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://HadoopHAcluster");
        conf.set("dfs.nameservices", "HadoopHAcluster");
        conf.set("dfs.ha.namenodes.HadoopHAcluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn1", "server1:8020");
        conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn2", "server2:8020");
        conf.set("dfs.client.failover.proxy.provider.HadoopHAcluster","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance();
		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(strings);
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf,strings);
		


	}

	@Test
	public void testMkdir() {
		try {
			String path = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	@Test
	public void testExists() {
		try {
			String path = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertTrue("測試目錄是否存在成功", hdfsUtil.exists(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@After
	public void close() {
		hdfsUtil.close();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
注:本文转载自blog.csdn.net的一瓢一瓢的饮 alanchan的文章"https://blog.csdn.net/chenwewi520feng/article/details/130334620"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top