Hadoop系列文章目录
1、hadoop3.1.4简单介绍及部署、简单验证
2、HDFS操作 - shell客户端
3、HDFS的使用(读写、上传、下载、遍历、查找文件、整个目录拷贝、只拷贝文件、列出文件夹下文件、删除文件及目录、获取文件及文件夹属性等)-java
4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)
5、HDFS API的RESTful风格–WebHDFS
6、HDFS的HttpFS-代理服务
7、大数据中常见的文件存储格式以及hadoop中支持的压缩算法
8、HDFS内存存储策略支持和“冷热温”存储
9、hadoop高可用HA集群部署及三种方式验证
10、HDFS小文件解决方案–Archive
11、hadoop环境下的Sequence File的读写与合并
12、HDFS Trash垃圾桶回收介绍与示例
13、HDFS Snapshot快照
14、HDFS 透明加密KMS
15、MapReduce介绍及wordcount
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN
17、MapReduce的分区Partition介绍
18、MapReduce的计数器与通过MapReduce读取/写入数据库示例
19、Join操作map side join 和 reduce side join
20、MapReduce 工作流介绍
21、MapReduce读写SequenceFile、MapFile、ORCFile和ParquetFile文件
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件
23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化
本文编写了java对HDFS的常见操作,并且均测试通过。其功能包含构造conf、设置系统环境变量、创建目录、判断文件是否存在、获取文件/目录的大小等,具体见下图。
本文分为四部分,即pom.xml、junit测试类、操作类、高可用环境操作类。
- pom.xml描述本代码运行需要哪些maven依赖
- junit测试类是对操作类的单元测试
- 操作类就是对hdfs api的使用
- 高可用环境操作类就是操作类在高可用环境中的配置
一、pom.xml
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0modelVersion>
<parent>
<groupId>com.okcardgroupId>
<artifactId>bigdata-componentartifactId>
<version>0.0.1-SNAPSHOTversion>
parent>
<groupId>com.okcardgroupId>
<artifactId>hadoopartifactId>
<version>0.0.1-SNAPSHOTversion>
<name>hadoopname>
<url>http://maven.apache.orgurl>
<properties>
<project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-commonartifactId>
<version>3.1.4version>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>3.1.4version>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-hdfsartifactId>
<version>3.1.4version>
dependency>
<dependency>
<groupId>junitgroupId>
<artifactId>junitartifactId>
<version>4.13version>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<version>1.18.22version>
dependency>
dependencies>
project>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
二、junit测试类
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author alanchan HDFSUtil测试用例
*/
public class HDFSUtilTest {
private HDFSUtil hdfsUtil;
@Before
public void setUp() throws Exception {
hdfsUtil = HDFSUtil.getHDFSUtilInstance();
// hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
// hdfsUtil = HDFSUtil.getHDFSUtilInstance(strings);
// hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf,strings);
}
// assertTrue(String message, boolean condition)
// assertFalse(String message, boolean condition)
// assertNotEquals(String message, Object unexpected,Object actual)
// assertArrayEquals(String message, Object[] expecteds,Object[] actuals)
// assertNotNull(String message, Object object)
// assertNull(String message, Object object)
// assertSame(String message, Object expected, Object actual)
@Test
public void testMkdir() {
try {
String path = "/test/testsub";
assertTrue("创建目录成功", hdfsUtil.mkdir(path));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testExists() {
try {
String path = "/test/testsub";
assertTrue("创建目录成功", hdfsUtil.mkdir(path));
assertTrue("測試目錄是否存在成功", hdfsUtil.exists(path));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testGetHdfsFileSize() {
try {
String path = "/test/testsub";
String fileContent = "this is a testing";
hdfsUtil.writeHdfsFile(path, fileContent);
long fileSize = hdfsUtil.getHdfsFileSize(path);
// assertTrue("创建目录成功", hdfsUtil.mkdir(path));
assertEquals("文件大小与期望一致", fileContent.length(), fileSize);
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testWriteHdfsFile() {
try {
String path = "/test/testsub";
String fileContent = "this is a testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testWriteHdfsFile2() {
try {
String path = "/test/testsub";
String fileContent = "this is a testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(new Path(path), fileContent.getBytes()));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testWriteUTFHdfsFile() {
try {
String path = "/test/testsub";
String fileContent = "this is a testing";
assertTrue("写文件成功", hdfsUtil.writeUTFHdfsFile(path, fileContent));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testReadHdfsFile() {
try {
String path = "/test/testsub";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
assertEquals("读取文件内容与预期一致", fileContent, hdfsUtil.readHdfsFile(path));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testReadUTFHdfsFile() {
try {
String path = "/test/testsub";
String fileContent = "this is a testing";
assertTrue("写文件成功", hdfsUtil.writeUTFHdfsFile(path, fileContent));
assertEquals("读取UTF文件内容与预期一致", fileContent, hdfsUtil.readUTFHdfsFile(path));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testWriteAndAppendHdfsFile() {
try {
// 構造conf
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://server1:8020");
conf.setBoolean("dfs.support.append", true);
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
conf.set("fe.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
// 調用示例
hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
String path = "/test/testsub";
String fileContent = "this is a testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
fileContent = "
line this is another testing";
assertTrue("追加写文件成功", hdfsUtil.writeAndAppendHdfsFile(path, fileContent));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testWalkHdfsDir() {
try {
String dir = "/test/testsub";
assertTrue("创建目录成功", hdfsUtil.mkdir(dir));
String path = "/test/testsub/testsub.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
hdfsUtil.walkHdfsDir(new Path("/test"));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(dir, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testWalkHdfsDir_Iter() {
try {
String dir = "/test/testsub";
assertTrue("创建目录成功", hdfsUtil.mkdir(dir));
String path = "/test/testsub/testsub.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
hdfsUtil.walkHdfsDir_Iter(new Path("/test"), true);
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(dir, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testUploadLocalFileToHdfs() {
try {
String localFile = "d://test.txt";
BufferedWriter out = new BufferedWriter(new FileWriter(localFile));
out.write("this is test uplaod file testing");
out.close();
// File tempFile = new File(localFile);
// tempFile.deleteOnExit();
// tempFile.delete();
String path = "/test/testsub/testsub.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
assertTrue("上传本地文件成功", hdfsUtil.uploadLocalFileToHdfs(localFile, path));
assertTrue("刪除本地临时文件成功", new File(localFile).delete());
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/test/testsub", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testDownloadHdfsToLocal() {
try {
String path = "/test/testsub/testsub.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
String localFile = "d://testdown.txt";
assertTrue("从HDFS上下载文件到本地成功", hdfsUtil.downloadHdfsToLocal(path, localFile));
BufferedReader in = new BufferedReader(new FileReader(localFile));
StringBuilder localFileContent = new StringBuilder();
String line;
while ((line = in.readLine()) != null) {
localFileContent.append(line).append("
");
}
localFileContent.deleteCharAt(localFileContent.length() - 1);
in.close();
assertEquals("下文件内容与hdfs文件内容一致", localFileContent.toString(), fileContent);
assertTrue("刪除本地临时文件成功", new File(localFile).delete());
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/test/testsub", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testRename() {
try {
String path = "/test/test/testsub.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
String destPath = "/t/test.log";
assertTrue("rename成功", hdfsUtil.rename(path, destPath, true));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/t", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testDeleteHdfs() {
try {
String path = "/test/test/testsub.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testCopyFile() {
try {
String srcPath = "/test/test/src.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
String destpath = "/test/test/dest.txt";
assertTrue("複製文件成功", hdfsUtil.copyFile(srcPath, destpath));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testCopyFile2() {
try {
String srcPath = "/test/test/src.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
String destpath = "/test/test/dest.txt";
assertTrue("複製文件成功", hdfsUtil.copyFile2(srcPath, destpath));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testCopyDir() {
try {
String dir = "/test/test";
assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
String srcPath = "/test/test/src.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
String destpath = "/test1";
assertTrue("複製文件目錄成功", hdfsUtil.copyDir(dir, destpath, false));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test1", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testCopyFilesExcludeDir() {
try {
String dir = "/test/test";
assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
String srcPath = "/test/test/src.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
srcPath = "/test/test/testsub/src2.txt";
fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
String destpath = "/test2";
assertTrue("創建目錄成功", hdfsUtil.mkdir(destpath));
assertTrue("複製文件目錄成功", hdfsUtil.copyFilesExcludeDir(dir, destpath, false));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test2", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testCopyDirOnlyFiles() {
try {
String dir = "/test/test";
assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
String srcPath = "/test/test/src.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
srcPath = "/test/test/testsub/src2.txt";
fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
String destpath = "/test3";
assertTrue("創建目錄成功", hdfsUtil.mkdir(destpath));
assertTrue("複製文件目錄成功", hdfsUtil.copyDirOnlyFiles(dir, destpath, false));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test3", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testSearchFile() {
try {
String dir = "/test/test";
assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
String srcPath = "/test/test/src.txt";
String fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
srcPath = "/test/test/testsub/src2.txt";
fileContent = "this is a testing
line this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
String searchFileName = "src2.txt";
assertEquals("查找文件数量与预期一致", 1, hdfsUtil.searchFile(dir, searchFileName, true).size());
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testCopyFromLocalFileWithProgress() {
try {
String destPath = "/test/test/t.rat";
String localBigFile = "D:\BaiduNetdiskDownload\test.rar";
assertTrue("查找文件数量与预期一致", hdfsUtil.copyFromLocalFileWithProgress(localBigFile, destPath));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testMergeFile() {
try {
String dir = "/test/test";
assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
String srcPath = "/test/test/src.txt";
String fileContent = "this is a testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
srcPath = "/test/test/src2.txt";
fileContent = "this is another testing";
assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
String destPath = "/test/src3.txt";
assertTrue("文件合并成功", hdfsUtil.mergeFile(dir, destPath));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs(destPath, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testTest() {
hdfsUtil.test();
hdfsUtil.test("123");
hdfsUtil.test("123", "456");
hdfsUtil.test("");
// test();
// test("test");
// test("123", "321");
assertEquals("Regular multiplication should work", "", "");
}
@After
public void close() {
hdfsUtil.close();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
- 250
- 251
- 252
- 253
- 254
- 255
- 256
- 257
- 258
- 259
- 260
- 261
- 262
- 263
- 264
- 265
- 266
- 267
- 268
- 269
- 270
- 271
- 272
- 273
- 274
- 275
- 276
- 277
- 278
- 279
- 280
- 281
- 282
- 283
- 284
- 285
- 286
- 287
- 288
- 289
- 290
- 291
- 292
- 293
- 294
- 295
- 296
- 297
- 298
- 299
- 300
- 301
- 302
- 303
- 304
- 305
- 306
- 307
- 308
- 309
- 310
- 311
- 312
- 313
- 314
- 315
- 316
- 317
- 318
- 319
- 320
- 321
- 322
- 323
- 324
- 325
- 326
- 327
- 328
- 329
- 330
- 331
- 332
- 333
- 334
- 335
- 336
- 337
- 338
- 339
- 340
- 341
- 342
- 343
- 344
- 345
- 346
- 347
- 348
- 349
- 350
- 351
- 352
- 353
- 354
- 355
- 356
- 357
- 358
- 359
- 360
- 361
- 362
- 363
- 364
- 365
- 366
- 367
- 368
- 369
- 370
- 371
- 372
- 373
- 374
- 375
- 376
- 377
- 378
- 379
- 380
- 381
- 382
- 383
- 384
- 385
- 386
- 387
- 388
- 389
- 390
- 391
- 392
- 393
- 394
- 395
- 396
- 397
- 398
- 399
- 400
- 401
- 402
- 403
- 404
- 405
- 406
- 407
- 408
- 409
- 410
- 411
- 412
- 413
- 414
- 415
- 416
- 417
- 418
- 419
- 420
- 421
- 422
- 423
- 424
- 425
- 426
- 427
- 428
- 429
- 430
- 431
- 432
- 433
- 434
- 435
- 436
- 437
- 438
- 439
- 440
- 441
- 442
- 443
- 444
- 445
- 446
- 447
- 448
- 449
- 450
- 451
- 452
- 453
- 454
- 455
- 456
三、操作类(帮助类)
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import lombok.extern.slf4j.Slf4j;
/**
* @author alanchan hdfs常用帮助类
*/
@Slf4j
public class HDFSUtil {
private static Configuration conf = null;
private static FileSystem fileSystem = null;
private static final String HADOOP_USER_NAME = "alanchan";
private static final String DEFAULTFS = "hdfs://server1:8020";
private static HDFSUtil hdfsInstance = null;
/**
*
* @param strings 默认第一个参数是defaultFS,可以传更多参数,如果传入2个参数,则第二个默认为user,其他参数暂没用
* @throws Exception
*/
private HDFSUtil(String... strings) throws Exception {
// 设置客户端身份 以具备权限在hdfs上进行操作
if (strings.length >= 2) {
System.setProperty("HADOOP_USER_NAME", strings[1]);
} else {
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
}
conf = new Configuration();
// 设置操作的文件系统是HDFS 并且指定HDFS操作地址。第一个参数是defaultFS
if (strings.length >= 1) {
conf.set("fs.defaultFS", strings[0]);
} else {
conf.set("fs.defaultFS", DEFAULTFS);
}
fileSystem = FileSystem.get(conf);
}
private HDFSUtil(Configuration conf, String... strings) throws Exception {
// 设置客户端身份 以具备权限在hdfs上进行操作
if (strings.length >= 1) {
System.setProperty("HADOOP_USER_NAME", strings[0]);
} else {
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
}
this.conf = conf;
fileSystem = FileSystem.get(this.conf);
}
public static HDFSUtil getHDFSUtilInstance(Configuration conf, String... strings) throws Exception {
if (hdfsInstance == null) {
hdfsInstance = new HDFSUtil(conf, strings);
}
return hdfsInstance;
}
public static HDFSUtil getHDFSUtilInstance(String... strings) throws Exception {
if (hdfsInstance == null) {
hdfsInstance = new HDFSUtil(strings);
}
return hdfsInstance;
}
public boolean mkdir(String path) throws Exception {
return mkdir(new Path(path));
}
public boolean mkdir(Path p) throws Exception {
if (!fileSystem.exists(p)) {
fileSystem.mkdirs(p);
}
return fileSystem.exists(p);
}
public boolean exists(String path) throws Exception {
return exists(new Path(path));
}
public boolean exists(Path p) throws Exception {
return fileSystem.exists(p);
}
public long getHdfsFileSize(String path) throws Exception {
return getHdfsFileSize(new Path(path));
}
public long getHdfsFileSize(Path path) throws Exception {
return fileSystem.getContentSummary(path).getLength();
}
/**
*
* @param path 写入文件的路径,含文件名称
* @param fileContent 写入文件的内容
* @return 如果文件存在,且文件大小大于0,则视为写入成功
* @throws Exception
*/
public boolean writeHdfsFile(String path, String fileContent) throws Exception {
return writeHdfsFile(new Path(path), fileContent);
}
public boolean writeHdfsFile(Path path, byte[] fileContent) throws Exception {
FSDataOutputStream in = fileSystem.create(path);
in.write(fileContent);
in.flush();
in.close();
return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
}
public boolean writeHdfsFile(Path path, String fileContent) throws Exception {
FSDataOutputStream in = fileSystem.create(path);
in.write(fileContent.getBytes());
in.flush();
in.close();
return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
}
/**
* Writes a string to the underlying output stream usingmodified UTF-8 encoding
* in a machine-independent manner.
*
* @param path
* @param fileContent
* @return
* @throws Exception
*/
public boolean writeUTFHdfsFile(Path path, String fileContent) throws Exception {
FSDataOutputStream in = fileSystem.create(path);
in.writeUTF(fileContent);
in.flush();
in.close();
return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
}
public boolean writeUTFHdfsFile(String path, String fileContent) throws Exception {
return writeUTFHdfsFile(new Path(path), fileContent);
}
public String readHdfsFile(String path) throws Exception {
return readHdfsFile(new Path(path));
}
public String readHdfsFile(Path path) throws Exception {
FSDataInputStream out = fileSystem.open(path);
BufferedReader br = new BufferedReader(new InputStreamReader(out));
String line;
StringBuilder result = new StringBuilder();
while ((line = br.readLine()) != null) {
result.append(line).append("
");
}
// 去掉最后一个换行符
result.deleteCharAt(result.length() - 1);
br.close();
out.close();
return result.toString();
}
public String readUTFHdfsFile(String path) throws Exception {
return readUTFHdfsFile(new Path(path));
}
/**
* 好像只能读取通过writeUTF写入的文件
*
* @param path
* @return
* @throws Exception
*/
public String readUTFHdfsFile(Path path) throws Exception {
FSDataInputStream out = fileSystem.open(path);
String fileContent = out.readUTF();
out.close();
return fileContent;
}
// 構造conf
// Configuration conf = new Configuration();
// conf.set("fs.defaultFS", DEFAULTFS);
// conf.setBoolean("dfs.support.append", true);
// conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
// conf.set("fe.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
// conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
// 調用示例
// HDFSUtil.getHDFSUtilInstance(conf).writeAndAppendHdfsFile(new Path(path1), path2);
/**
* 该例的前提是文件已经存在,仅仅是演示追加fileContent内容至目标文件中
*
* @param destPath
* @param fileContent
* @return
* @throws Exception
*/
public boolean writeAndAppendHdfsFile(Path destPath, String fileContent) throws Exception {
fileSystem = destPath.getFileSystem(conf);
if (!fileSystem.exists(destPath)) {
fileSystem.createNewFile(destPath);
}
FSDataOutputStream output = fileSystem.append(destPath);
// FSDataOutputStreamBuilder output = fileSystem.appendFile(destPath);
output.write(fileContent.getBytes("UTF-8"));
// 文件最后多一个换行符
output.write("
".getBytes("UTF-8"));
// fileSystem.close();
output.close();
return fileSystem.exists(destPath) && getHdfsFileSize(destPath) > 0;
}
public boolean writeAndAppendHdfsFile(String destPath, String fileContent) throws Exception {
return writeAndAppendHdfsFile(new Path(destPath), fileContent);
}
public void walkHdfsDir(String path) throws Exception {
walkHdfsDir(new Path(path));
}
public void walkHdfsDir(Path path) throws Exception {
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
walkHdfsDir(fileStatus.getPath());
log.info("目录 = {}", fileStatus.getPath());
} else {
log.info("文件完整路径 = {},文件名={}", fileStatus.getPath(), fileStatus.getPath().getName());
}
}
}
public void walkHdfsDir_Iter(String path, boolean recursive) throws Exception {
walkHdfsDir_Iter(new Path(path), recursive);
}
/**
*
* @param path
* @param recursive 是否递归
* @throws Exception
*/
public void walkHdfsDir_Iter(Path path, boolean recursive) throws Exception {
RemoteIterator<LocatedFileStatus> lfsRemoteIterator = fileSystem.listFiles(path, recursive);
while (lfsRemoteIterator.hasNext()) {
LocatedFileStatus next = lfsRemoteIterator.next();
log.info("文件完整路径 = {},文件名={}", next.getPath(), next.getPath().getName());
}
}
public boolean uploadLocalFileToHdfs(String localFile, String dest) throws Exception {
return uploadLocalFileToHdfs(localFile, new Path(dest));
}
/**
*
* @param localFile
* @param dest
* @return 简单比较目标和源文件的大小判断是否上传成功
* @throws Exception
*/
public boolean uploadLocalFileToHdfs(String localFile, Path dest) throws Exception {
fileSystem.copyFromLocalFile(new Path(localFile), dest);
File file = new File(localFile);
return file.length() == getHdfsFileSize(dest);
}
public boolean downloadHdfsToLocal(String hdfsFile, String localFile) throws Exception {
return downloadHdfsToLocal(new Path(hdfsFile), localFile);
}
public boolean downloadHdfsToLocal(Path hdfsFile, String localFile) throws Exception {
fileSystem.copyToLocalFile(hdfsFile, new Path(localFile));
File file = new File(localFile);
return file.length() == getHdfsFileSize(hdfsFile);
}
/**
*
* @param srcPath
* @param destPath
* @param mkdirFlag 如果目标目录不存在是否创建
* @return
* @throws Exception
*/
public boolean rename(Path srcPath, Path destPath, boolean mkdirFlag) throws Exception {
boolean flag = false;
if (fileSystem.exists(srcPath)) {
if (mkdirFlag && !fileSystem.exists(destPath)) {
fileSystem.mkdirs(destPath);
}
flag = fileSystem.rename(srcPath, destPath);
}
return flag;
}
public boolean rename(String srcPath, String destPath, boolean mkdirFlag) throws Exception {
return rename(new Path(srcPath), new Path(destPath), mkdirFlag);
}
/**
* 删除文件或目录
*
* @param dest
* @param recursive 是否递归删除
* @return
* @throws Exception
*/
public boolean deleteHdfs(Path dest, boolean recursive) throws Exception {
return fileSystem.delete(dest, recursive);
}
public boolean deleteHdfs(String dest, boolean recursive) throws Exception {
return deleteHdfs(new Path(dest), recursive);
}
/**
* 拷贝单个文件或流?
*
* @param srcPath
* @param destpath
* @throws Exception
*/
public void copyFile(Path srcPath, Path destpath) throws Exception {
FSDataInputStream in = fileSystem.open(srcPath);
FSDataOutputStream out = fileSystem.create(destpath);
IOUtils.copyBytes(in, out, conf);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
public boolean copyFile(String srcPath, String destpath) throws Exception {
copyFile(new Path(srcPath), new Path(destpath));
return true;
}
public boolean copyFile2(String srcPath, String destpath) throws Exception {
copyFile2(new Path(srcPath), new Path(destpath));
return true;
}
public void copyFile2(Path srcPath, Path destpath) throws Exception {
FSDataInputStream in = fileSystem.open(srcPath);
FSDataOutputStream out = fileSystem.create(destpath);
byte[] b = new byte[1024];
int hasRead = 0;
while ((hasRead = in.read(b)) > 0) {
out.write(b, 0, hasRead);
}
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
public boolean copyDir(String srcPath, String destpath, boolean deleteSource) throws Exception {
return copyDir(new Path(srcPath), new Path(destpath), deleteSource);
}
/**
* 文件夹拷贝,包含文件夾
*
* @param srcPath
* @param destpath
* @param deleteSource
* @return
* @throws Exception
*/
public boolean copyDir(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
// public static boolean copy(FileSystem srcFS, Path src,FileSystem dstFS, Path dst,boolean deleteSource,Configuration conf) throws IOException {
return FileUtil.copy(fileSystem, srcPath, fileSystem, destpath, deleteSource, conf);
}
/**
* 拷貝文件及目錄,但不包含srcPath的第一層目錄,即srcPath是一个目录,但只拷贝srcPath最后一层目录下的所有文件及目录,但不包含本身
* 例如:srcPath=/copyDir/src,destPath=/copyDir/dest,最终结果是将/copyDir/src下面的所有文件及目录拷贝至/copyDir/dest下,但不包含src这个文件夹
*
* @param srcPath
* @param destpath
* @param deleteSource
* @throws Exception
*/
public boolean copyFilesExcludeDir(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
boolean flag = false;
FileStatus[] fileStatuses = fileSystem.listStatus(srcPath);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
// copyFilesExcludeDir(fileStatus.getPath(), destpath, deleteSource);
flag = FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, deleteSource, conf);
} else {
flag = FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, deleteSource, conf);
}
}
return flag;
}
public boolean copyFilesExcludeDir(String srcPath, String destpath, boolean deleteSource) throws Exception {
return copyFilesExcludeDir(new Path(srcPath), new Path(destpath), deleteSource);
}
/**
* 拷贝源文件夹下的所有文件到目标文件夹,不含源文件夹下的文件夹
*
* @param srcPath
* @param destpath
* @param deleteSource
* @return
* @throws Exception
*/
public boolean copyDirOnlyFiles(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
boolean flag = false;
RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, true);
while (sourceFiles.hasNext()) {
flag = FileUtil.copy(fileSystem, sourceFiles.next().getPath(), fileSystem, destpath, deleteSource, conf);
}
return flag;
}
public boolean copyDirOnlyFiles(String srcPath, String destpath, boolean deleteSource) throws Exception {
return copyDirOnlyFiles(new Path(srcPath), new Path(destpath), deleteSource);
}
/**
* 查找文件,精确查找 equals
*
* @param srcPath
* @param searchFileName
* @param recursive
* @return
* @throws Exception
*/
public List<Path> searchFile(Path srcPath, String searchFileName, boolean recursive) throws Exception {
List<Path> list = new ArrayList<Path>();
RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, recursive);
while (sourceFiles.hasNext()) {
Path file = sourceFiles.next().getPath();
String srcFileName = file.getName();
if (searchFileName.equals(srcFileName)) {
list.add(file);
}
}
return list;
}
public List<Path> searchFile(String srcPath, String searchFileName, boolean recursive) throws Exception {
return searchFile(new Path(srcPath), searchFileName, recursive);
}
/**
* 通過流的方式,带进度条的本地文件拷贝至hdfs
*
* @param fs
* @throws Exception
*/
public boolean copyFromLocalFileWithProgress(File srcPath, Path destPath) throws Exception {
InputStream in = new BufferedInputStream(new FileInputStream(srcPath));
final float fileSize = srcPath.length() / 65536;
FSDataOutputStream out = fileSystem.create(destPath, new Progressable() {
long fileCount = 0;
@Override
// progress 方法在每次上传了64K(64*1024=65536)B字节大小的文件之后会自动调用一次
// 通过累加之后的结果与整个文件大小来计算总上传文件的进度
public void progress() {
fileCount++;
System.out.println("进度%:" + (fileCount / fileSize) * 100 + " %");
}
});
IOUtils.copyBytes(in, out, 4096);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
return getHdfsFileSize(destPath) == srcPath.length();
}
public boolean copyFromLocalFileWithProgress(String srcPath, String destPath) throws Exception {
return copyFromLocalFileWithProgress(new File(srcPath), new Path(destPath));
}
/**
* 合并文件
*
* @param srcPath 目錄
* @param destPath 文件名,含路徑
* @return
* @throws Exception
*/
public boolean mergeFile(Path srcPath, Path destPath) throws Exception {
FSDataOutputStream fout = fileSystem.create(destPath);
if (fileSystem.isDirectory(srcPath)) {
RemoteIterator<LocatedFileStatus> lsfr = fileSystem.listFiles(srcPath, true);
while (lsfr.hasNext()) {
LocatedFileStatus next = lsfr.next();
FSDataInputStream fin = fileSystem.open(next.getPath());
IOUtils.copyBytes(fin, fout, 4096);
fin.close();
}
}
fout.close();
return getHdfsFileSize(srcPath) == getHdfsFileSize(srcPath);
}
public boolean mergeFile(String srcPath, String destPath) throws Exception {
return mergeFile(new Path(srcPath), new Path(destPath));
}
public void close() {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
String path1 = "/test/test.rar";
String path2 = "This is a testing";
String searchFileName = "D:\BaiduNetdiskDownload\2test.rar";
try {
// HDFSUtil.getHDFSUtilInstance().writeUTFHdfsFile(path1, path2);copyDir copyFilesExcludeDir
String srcPath = "/testhdfs_copyDir/src", destPath = "/testhdfs_copyDir/dest";
System.out.println(HDFSUtil.getHDFSUtilInstance().copyFilesExcludeDir(srcPath, destPath, false));
} catch (Exception e) {
e.printStackTrace();
}
// test();
// test("test");
// test("123", "321");
}
static void test(String... strings) {
if (strings.length == 1) {
System.out.println("testing=" + strings[0]);
} else {
System.out.println("testing=" + strings.length);
}
}
/**
* 没有验证,仅示例加载配置文件
*
* @param srcPath
* @param destPath
* @throws Exception
*/
private void copyOrMoveFiles(Path srcPath, Path destPath) throws Exception {
Configuration conf = new Configuration();
conf.addResource("basedir/hdfs-site.xml");
conf.addResource("basedir/core-site.xml");
// FileSystem fs = FileSystem.newInstance(conf);
FileContext fc = FileContext.getFileContext(conf);
FileContext.Util util = fc.util();
util.copy(srcPath, destPath, false, false);
}
/**
* 遍历目录,IOUtils.copyBytes(),顺序复制,通过获取本地文件系统或者直接new FileInputStream()的方式实现 2种
* 沒有驗證,主要參考其流、conf的應用
*
* @param srcPath
* @param destPath
* @throws Exception
*/
private void upload(Path srcPath, Path destPath) throws Exception {
// 获取本地文件系统
LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
if (localFs.isDirectory(srcPath)) {
RemoteIterator<LocatedFileStatus> lsfr = localFs.listFiles(srcPath, true);
while (lsfr.hasNext()) {
LocatedFileStatus next = lsfr.next();
FSDataInputStream fin = fileSystem.open(next.getPath());
FSDataOutputStream fout = fileSystem.create(new Path("/test/data/" + next.getPath().getName()));
IOUtils.copyBytes(fin, fout, 4096);
fin.close();
fout.close();
}
} else {
FSDataInputStream fin = fileSystem.open(srcPath);
// InputStream fin = new FileInputStream("D://a.txt");
FSDataOutputStream fout = fileSystem.create(destPath);
IOUtils.copyBytes(fin, fout, 4096);
fin.close();
fout.close();
}
}
private void upload(String srcPath, String destPath) throws Exception {
upload(new Path(srcPath), new Path(destPath));
}
/**
* 沒有驗證,主要參考其流、conf的應用
*
* @param srcPath
* @param destPath
* @param fs
* @param local
* @param fc
* @throws Exception
*/
private void downLoad(Path srcPath, Path destPath, FileSystem fs, FileSystem local, FileContext fc)
throws Exception {
// 2.字节流的复制
FileOutputStream localFileOut = new FileOutputStream("D:\test\a.txt");
FSDataInputStream remoteFin = fs.open(srcPath);
IOUtils.copyBytes(remoteFin, localFileOut, 2048);
remoteFin.close();
localFileOut.close();
//4.下载前做合并
LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
FSDataOutputStream fsdOut = localFs.create(destPath);
if (fs.isDirectory(srcPath)) {
RemoteIterator<LocatedFileStatus> lfsr = fs.listFiles(srcPath, true);
while (lfsr.hasNext()) {
LocatedFileStatus next = lfsr.next();
FSDataInputStream fdIn = fs.open(next.getPath());
IOUtils.copyBytes(fdIn, fsdOut, 2048);
fdIn.close();
}
fsdOut.close();
} else {
FSDataInputStream fsIn = fs.open(srcPath);
IOUtils.copyBytes(fsIn, fsdOut, 2048);
fsIn.close();
fsdOut.close();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
- 250
- 251
- 252
- 253
- 254
- 255
- 256
- 257
- 258
- 259
- 260
- 261
- 262
- 263
- 264
- 265
- 266
- 267
- 268
- 269
- 270
- 271
- 272
- 273
- 274
- 275
- 276
- 277
- 278
- 279
- 280
- 281
- 282
- 283
- 284
- 285
- 286
- 287
- 288
- 289
- 290
- 291
- 292
- 293
- 294
- 295
- 296
- 297
- 298
- 299
- 300
- 301
- 302
- 303
- 304
- 305
- 306
- 307
- 308
- 309
- 310
- 311
- 312
- 313
- 314
- 315
- 316
- 317
- 318
- 319
- 320
- 321
- 322
- 323
- 324
- 325
- 326
- 327
- 328
- 329
- 330
- 331
- 332
- 333
- 334
- 335
- 336
- 337
- 338
- 339
- 340
- 341
- 342
- 343
- 344
- 345
- 346
- 347
- 348
- 349
- 350
- 351
- 352
- 353
- 354
- 355
- 356
- 357
- 358
- 359
- 360
- 361
- 362
- 363
- 364
- 365
- 366
- 367
- 368
- 369
- 370
- 371
- 372
- 373
- 374
- 375
- 376
- 377
- 378
- 379
- 380
- 381
- 382
- 383
- 384
- 385
- 386
- 387
- 388
- 389
- 390
- 391
- 392
- 393
- 394
- 395
- 396
- 397
- 398
- 399
- 400
- 401
- 402
- 403
- 404
- 405
- 406
- 407
- 408
- 409
- 410
- 411
- 412
- 413
- 414
- 415
- 416
- 417
- 418
- 419
- 420
- 421
- 422
- 423
- 424
- 425
- 426
- 427
- 428
- 429
- 430
- 431
- 432
- 433
- 434
- 435
- 436
- 437
- 438
- 439
- 440
- 441
- 442
- 443
- 444
- 445
- 446
- 447
- 448
- 449
- 450
- 451
- 452
- 453
- 454
- 455
- 456
- 457
- 458
- 459
- 460
- 461
- 462
- 463
- 464
- 465
- 466
- 467
- 468
- 469
- 470
- 471
- 472
- 473
- 474
- 475
- 476
- 477
- 478
- 479
- 480
- 481
- 482
- 483
- 484
- 485
- 486
- 487
- 488
- 489
- 490
- 491
- 492
- 493
- 494
- 495
- 496
- 497
- 498
- 499
- 500
- 501
- 502
- 503
- 504
- 505
- 506
- 507
- 508
- 509
- 510
- 511
- 512
- 513
- 514
- 515
- 516
- 517
- 518
- 519
- 520
- 521
- 522
- 523
- 524
- 525
- 526
- 527
- 528
- 529
- 530
- 531
- 532
- 533
- 534
- 535
- 536
- 537
- 538
- 539
- 540
- 541
- 542
- 543
- 544
- 545
- 546
- 547
- 548
- 549
- 550
- 551
- 552
- 553
- 554
- 555
- 556
- 557
- 558
- 559
- 560
- 561
- 562
- 563
- 564
- 565
- 566
- 567
- 568
- 569
- 570
- 571
- 572
- 573
- 574
- 575
- 576
- 577
- 578
- 579
- 580
- 581
- 582
- 583
- 584
- 585
- 586
- 587
- 588
- 589
- 590
- 591
- 592
- 593
- 594
- 595
- 596
- 597
- 598
- 599
- 600
- 601
- 602
- 603
- 604
- 605
- 606
- 607
- 608
- 609
- 610
- 611
- 612
- 613
- 614
- 615
- 616
- 617
- 618
- 619
- 620
- 621
- 622
- 623
- 624
- 625
- 626
- 627
- 628
- 629
- 630
- 631
- 632
- 633
- 634
- 635
- 636
- 637
- 638
- 639
- 640
- 641
- 642
- 643
- 644
- 645
- 646
- 647
- 648
- 649
- 650
- 651
- 652
- 653
- 654
- 655
- 656
- 657
- 658
- 659
- 660
- 661
- 662
- 663
- 664
- 665
- 666
- 667
- 668
四、高可用环境的操作类
该示例仅仅是示例高可用环境下的api访问,实际的类还是第三部分的。
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author alanchan 测试高可用环境下
*/
public class HDFSHAUtilTest {
private HDFSUtil hdfsUtil;
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://HadoopHAcluster");
conf.set("dfs.nameservices", "HadoopHAcluster");
conf.set("dfs.ha.namenodes.HadoopHAcluster", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn1", "server1:8020");
conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn2", "server2:8020");
conf.set("dfs.client.failover.proxy.provider.HadoopHAcluster","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
// hdfsUtil = HDFSUtil.getHDFSUtilInstance();
hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
// hdfsUtil = HDFSUtil.getHDFSUtilInstance(strings);
// hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf,strings);
}
@Test
public void testMkdir() {
try {
String path = "/test/testsub";
assertTrue("创建目录成功", hdfsUtil.mkdir(path));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testExists() {
try {
String path = "/test/testsub";
assertTrue("创建目录成功", hdfsUtil.mkdir(path));
assertTrue("測試目錄是否存在成功", hdfsUtil.exists(path));
assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
} catch (Exception e) {
e.printStackTrace();
}
}
@After
public void close() {
hdfsUtil.close();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
评论记录:
回复评论: