Hadoop系列文章目录
1、hadoop3.1.4简单介绍及部署、简单验证
2、HDFS操作 - shell客户端
3、HDFS的使用(读写、上传、下载、遍历、查找文件、整个目录拷贝、只拷贝文件、列出文件夹下文件、删除文件及目录、获取文件及文件夹属性等)-java
4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)
5、HDFS API的RESTful风格–WebHDFS
6、HDFS的HttpFS-代理服务
7、大数据中常见的文件存储格式以及hadoop中支持的压缩算法
8、HDFS内存存储策略支持和“冷热温”存储
9、hadoop高可用HA集群部署及三种方式验证
10、HDFS小文件解决方案–Archive
11、hadoop环境下的Sequence File的读写与合并
12、HDFS Trash垃圾桶回收介绍与示例
13、HDFS Snapshot快照
14、HDFS 透明加密KMS
15、MapReduce介绍及wordcount
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN
17、MapReduce的分区Partition介绍
18、MapReduce的计数器与通过MapReduce读取/写入数据库示例
19、Join操作map side join 和 reduce side join
20、MapReduce 工作流介绍
21、MapReduce读写SequenceFile、MapFile、ORCFile和ParquetFile文件
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件
23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化
本示例介绍java通过api操作hdfs。
主要包含HDFS的读写、上传、下载、遍历、查找文件、整个目录拷贝、只拷贝文件、列出文件夹下文件、删除文件及目录、获取文件及文件夹属性等。
本文分为2个部分,即环境准备和示例。
一、配置Windows下Hadoop环境
在windows上做HDFS客户端应用开发,需要设置Hadoop环境,而且要求是windows平台编译的Hadoop,不然会报以下的错误:
#缺少winutils.exe
Could not locate executable null binwinutils.exe in the hadoop binaries
#缺少hadoop.dll
Unable to load native-hadoop library for your platform… using builtin-Java classes where applicable
- 1
- 2
- 3
- 4
1、解压hadoop-3.1.4_winutils.zip文件
将已经编译好的Windows版本Hadoop解压到到一个没有中文、没有空格的路径下面
该文件由于不能上传,可以参考我的笔记:https://note.youdao.com/s/Tp6Y92QO
2、配置环境变量
在windows上面配置hadoop的环境变量: HADOOP_HOME,并将%HADOOP_HOME%in添加到path中。
3、复制hadoop.dll文件
把hadoop3.1.4文件夹中bin目录下的hadoop.dll文件放到系统盘: C:WindowsSystem32 目录
以上,完成了windows环境的配置。
二、示例
核心是从HDFS提供的api中构造一个HDFS的访问客户端对象,然后通过该客户端对象操作(增删改查)HDFS上的文件。
1、客户端核心类
- Configuration 配置对象类,用于加载或设置参数属性
- FileSystem 文件系统对象基类。针对不同文件系统有不同具体实现。该类封装了文件系统的相关操作方法。
在Java中操作HDFS,主要涉及以下Class:
- Configuration:该类的对象封转了客户端或者服务器的配置
- FileSystem:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作,通过FileSystem的静态方法get获得该对象。
FileSystem fs = FileSystem.get(conf);
- 1
- get方法
从conf中的一个参数 fs.defaultFS的配置值判断具体是什么类型的文件系统。如果我们的代码中没有指定fs.defaultFS,并且工程classpath下也没有给定相应的配置,conf中的默认值就来自于hadoop的jar包中的core-default.xml,默认值为: file:///,则获取的将不是一个DistributedFileSystem的实例,而是一个本地文件系统的客户端对象。
#获取FileSystem方式,有2中方式
#第一种
public void getFileSystem1() throws IOException {
Configuration configuration = new Configuration();
//指定我们使用的文件系统类型:
configuration.set("fs.defaultFS", "hdfs://server1:8020/");
//获取指定的文件系统
FileSystem fileSystem = FileSystem.get(configuration);
System.out.println(fileSystem.toString());
}
#第二种
public void getFileSystem2() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://server1:8020"), new Configuration());
System.out.println("fileSystem:"+fileSystem);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
2、创建工程及示例
1)、pom.xml导入Maven依赖
<dependencies>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-commonartifactId>
<version>3.1.4version>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>3.1.4version>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-hdfsartifactId>
<version>3.1.4version>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-compiler-pluginartifactId>
<version>3.1version>
<configuration>
<source>1.8source>
<target>1.8target>
configuration>
plugin>
plugins>
build>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
2)、创建java测试类
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.hadoop.hdfs.sentiment.dfs.impl.MgrHdfsImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HdfsTest {
private static Configuration conf = null;
private static FileSystem fileSystem = null;
private static final String HADOOP_USER_NAME = "alanchan";
private static final String DEFAULTFS = "hdfs://server1:8020";
private static final String BASICPATH = "/test_hadoop_client_java";
private static final String BASICFILEPATH = "/test_hadoop_client_java/java.txt";
private static final String LOCALFILEPATH = "D:/workspace/bigdata-component/hadoop/testhadoopclient_java.txt";
private static final String RENAMEILEPATH = "/test_hadoop_client_java_NEW/bigdata_rename.txt";
private static final String COPYEILEPATH = "/test_hadoop_client_java/bigdata.txt";
private static final String COPYEILEPATHTO = "/test_hadoop_client_java_Copy/bigdata.txt";
private static final String COPYEILEPATHTO2 = "/test_hadoop_client_java_Copy2/bigdata.txt";
// 初始化方法 用于和hdfs集群建立连接
@Before
public void connect2HDFS() throws IOException {
// 设置客户端身份 以具备权限在hdfs上进行操作
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
// 创建配置对象实例
conf = new Configuration();
// 设置操作的文件系统是HDFS 并且指定HDFS操作地址
conf.set("fs.defaultFS", DEFAULTFS);
// 创建FileSystem对象实例
fileSystem = FileSystem.get(conf);
}
@Test
public void mkdir() throws IOException {
// FileSystem.exists指定HDFS文件/文件夹是否存在
// Path file = new Path("hdfsPath");
// boolean isExists = fs.exists(file);
// 首先判断文件夹是否存在,如果不存在再创建
if (!fileSystem.exists(new Path(BASICPATH))) {
// 创建文件夹
fileSystem.mkdirs(new Path(BASICPATH));
}
}
//
@Test
public void putFile2HDFS() throws IOException {
// 创建本地文件路径
Path src = new Path(LOCALFILEPATH);
// hdfs上传路径
Path dst = new Path(BASICFILEPATH);
// 文件上传动作(local--->hdfs)
fileSystem.copyFromLocalFile(src, dst);
}
@Test
public void testHdfs() throws Exception {
MgrHdfsImpl hdfs = new MgrHdfsImpl();
Path path = new Path(BASICPATH + "/hdfs.txt");
String content = "hdfs://server1:8020/sentiment/data_p/task_20220830163507/weibo_data_2.txt";
hdfs.writeFile(path, content);
log.info(" readFile={}", hdfs.readFileToString(path));
}
@Test
public void writeFile() throws Exception {
FSDataOutputStream in = fileSystem.create(new Path(BASICPATH + "/a.txt"));
in.write("hdfs://server1:8020/sentiment/data_p/task_20220830163507/weibo_data_2.txt".getBytes());
in.flush();
in.close();
}
@Test
public void readFile() throws Exception {
// FSDataInputStream out = fileSystem.open(new Path(BASICFILEPATH));
FSDataInputStream out = fileSystem
.open(new Path("hdfs://server1:8020/sentiment/data_p/willDoing_20220830170414"));
// IOUtils.copyBytes(out,System.out,1024);
BufferedReader br = new BufferedReader(new InputStreamReader(out));
String line;
String result = "";
while ((line = br.readLine()) != null) {
// 遍历抓取到的每一行并将其存储到result里面
result += line + "
";
}
// String content = out.readUTF();
System.out.println("读文件: " + result);
// System.out.println("读文件: " + content);
out.close();
}
//
// // 获取文件夹下文件大小
@Test
public void getFileSize() throws IllegalArgumentException, IOException {
log.info("summary={}", fileSystem.getContentSummary(new Path(BASICPATH)).getLength());
}
//
@Test
public void getFile2Local() throws IOException {
// 源路径:hdfs的路径
Path src = new Path(BASICFILEPATH);
// 目标路径:local本地路径
Path dst = new Path(LOCALFILEPATH);
// 文件下载动作(hdfs--->local)
fileSystem.copyToLocalFile(src, dst);
}
//
@Test
public void rename() throws Exception {
Path srcPath = new Path(BASICFILEPATH);
Path destpath = new Path(RENAMEILEPATH);
if (fileSystem.exists(srcPath)) {
fileSystem.rename(srcPath, destpath);
}
}
//没有递归
@Test
public void listFiles() throws Exception {
Path destPath = new Path(BASICPATH);
if (fileSystem.exists(destPath)) {
FileStatus[] listStatus = fileSystem.listStatus(destPath);
for (FileStatus fileStatus : listStatus) {
if (fileStatus.isFile()) {
log.info("file is ={}", fileStatus.getPath().getName());
} else {
log.info("dir is = {}", fileStatus.getPath().getName());
}
}
}
}
//
@Test
public void copyFiles() throws Exception {
Path srcPath = new Path(COPYEILEPATH);
Path destpath = new Path(COPYEILEPATHTO);
FSDataInputStream in = fileSystem.open(srcPath);
FSDataOutputStream out = fileSystem.create(destpath);
IOUtils.copyBytes(in, out, conf);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
//
public void copyFiles(String src, String dest) throws Exception {
Path srcPath = new Path(src);
Path destpath = new Path(dest);
FSDataInputStream in = fileSystem.open(srcPath);
FSDataOutputStream out = fileSystem.create(destpath);
IOUtils.copyBytes(in, out, conf);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
//
@Test
public void copyFiles2() throws Exception {
Path srcPath = new Path(COPYEILEPATH);
Path destpath = new Path(COPYEILEPATHTO2);
FSDataInputStream in = fileSystem.open(srcPath);
FSDataOutputStream out = fileSystem.create(destpath);
byte[] b = new byte[1024];
int hasRead = 0;
while ((hasRead = in.read(b)) > 0) {
out.write(b, 0, hasRead);
}
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
private static final String BASICPATH_COPYDIR_SRC = "/testhdfs_copyDir/src";
private static final String BASICPATH_COPYDIR_DEST = "/testhdfs_copyDir/dest";
// 递归遍历文件夹
@Test
public void listDir() throws Exception {
Path path = new Path(BASICPATH_COPYDIR_SRC);
listDir(path);
}
public void listDir(Path path) throws Exception {
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
listDir(fileStatus.getPath());
log.info("目录 = {}", fileStatus.getPath());
} else {
log.info("文件完整路径 = {},文件名={}", fileStatus.getPath(), fileStatus.getPath().getName());
}
}
}
// HDFS API 遍历文件夹中的文件
@Test
public void listDir2() throws Exception {
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem
.listFiles(new Path(BASICPATH_COPYDIR_SRC), true);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
log.info("2 --------- 文件完整路径 = {},文件名={}", next.getPath(), next.getPath().getName());
}
}
// 文件夹拷贝,包含文件夾
@Test
public void copyDir() throws Exception {
Path srcPath = new Path(BASICPATH_COPYDIR_SRC);
Path destpath = new Path(BASICPATH_COPYDIR_DEST);
// public static boolean copy(FileSystem srcFS, Path src,FileSystem dstFS, Path dst,boolean deleteSource,Configuration conf) throws IOException {
FileUtil.copy(fileSystem, srcPath, fileSystem, destpath, false, conf);
listDir(destpath);
}
// 拷貝文件及目錄,但不包含BASICPATH_COPYDIR_SRC的第一層目錄
@Test
public void copyFilesIncludeDir() throws Exception {
Path srcPath = new Path(BASICPATH_COPYDIR_SRC);
Path destpath = new Path(BASICPATH_COPYDIR_DEST);
FileStatus[] fileStatuses = fileSystem.listStatus(srcPath);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, false, conf);
log.info("目录 = {}", fileStatus.getPath());
} else {
FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, false, conf);
log.info("文件完整路径 = {},文件名={}", fileStatus.getPath(), fileStatus.getPath().getName());
}
}
listDir(destpath);
}
// 拷贝源文件夹下的所有文件到目标文件夹,不含源文件夹下的文件夹
@Test
public void copyDirOnlyFiles() throws Exception {
Path srcPath = new Path(BASICPATH_COPYDIR_SRC);
Path destpath = new Path(BASICPATH_COPYDIR_DEST);
RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, true);
while (sourceFiles.hasNext()) {
FileUtil.copy(fileSystem, sourceFiles.next().getPath(), fileSystem, destpath, false, conf);
}
listDir(destpath);
}
private static final String BASICPATH_COPYDIR = "/testhdfs_copyDir";
// 查找文件
@Test
public void search() throws Exception {
Path srcPath = new Path(BASICPATH_COPYDIR);
String searchFileName = "2022年度本市工程系列计算机技术及应用专业高级职称评审工作已启动.docx";
RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, true);
while (sourceFiles.hasNext()) {
Path srcFile = sourceFiles.next().getPath();
String srcFileName = srcFile.getName();
if (searchFileName.equals(srcFileName)) {
log.info("文件路徑={},查找文件名={}", srcFile, searchFileName);
}
}
}
private static final String TODELETEFILE = "/test_hadoop_client_java/bigdata.txt";
@Test
public void delete() throws Exception {
// 判断文件是否存在
if (fileSystem.exists(new Path(TODELETEFILE))) {
fileSystem.delete(new Path(TODELETEFILE), true);
}
}
@After
public void close() {
// 首先判断文件系统实例是否为null 如果不为null 进行关闭
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
- 250
- 251
- 252
- 253
- 254
- 255
- 256
- 257
- 258
- 259
- 260
- 261
- 262
- 263
- 264
- 265
- 266
- 267
- 268
- 269
- 270
- 271
- 272
- 273
- 274
- 275
- 276
- 277
- 278
- 279
- 280
- 281
- 282
- 283
- 284
- 285
- 286
- 287
- 288
- 289
- 290
- 291
- 292
- 293
- 294
- 295
- 296
- 297
- 298
- 299
- 300
- 301
- 302
- 303
- 304
- 305
- 306
- 307
- 308
- 309
- 310
- 311
- 312
- 313
- 314
- 315
- 316
- 317
- 318
- 319
- 320
- 321
- 322
- 323
- 324
评论记录:
回复评论: