Hadoop: Java API Operations on HDFS


I. HDFS Client Environment Setup

1) Copy the compiled Hadoop build that matches your operating system to a path that contains no Chinese characters.

2) Configure the HADOOP_HOME environment variable and add it to the PATH (an in-code alternative is sketched below).
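If changing system-wide environment variables is not an option, the Hadoop client can usually be pointed at a local installation in code instead. A minimal sketch, assuming the build was unpacked to D:/hadoop-2.7.2 (a placeholder path) and a hypothetical class name HdfsClientTest; the hadoop.home.dir system property is what the client falls back on when HADOOP_HOME is unset:

public class HdfsClientTest {
    static {
        // Must run before the first FileSystem call; equivalent to setting HADOOP_HOME.
        // "D:/hadoop-2.7.2" is a placeholder: point it at your own unpacked directory.
        System.setProperty("hadoop.home.dir", "D:/hadoop-2.7.2");
    }

    // ... the @Test methods from the sections below go here ...
}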

II. HDFS API Operations

Create a new Maven project and add the following dependencies:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
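All of the snippets below are JUnit test methods meant to live in a single test class. A sketch of the imports they collectively rely on (everything comes from the artifacts declared above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;   // FileSystem, Path, FSData*Stream, *FileStatus, BlockLocation, RemoteIterator
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.net.URI;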

API Operation Code

1. HDFS File Upload

@Test
public void testPut() throws Exception {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2 Upload the local file f:/hello.txt as /0308_666/hello1.txt
    fileSystem.copyFromLocalFile(
            new Path("f:/hello.txt"), new Path("/0308_666/hello1.txt"));
    // 3 Release resources
    fileSystem.close();
}
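Values set on the Configuration object before FileSystem.get take precedence over the cluster's *-site.xml files. A small sketch of the same upload with an overridden replication factor; the value 2 and the target name hello_rep2.txt are only illustrative:

@Test
public void testPutWithReplication() throws Exception {
    Configuration configuration = new Configuration();
    // Client-side override: this file is written with 2 replicas instead of the cluster default
    configuration.set("dfs.replication", "2");
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    fileSystem.copyFromLocalFile(
            new Path("f:/hello.txt"), new Path("/0308_666/hello_rep2.txt"));
    fileSystem.close();
}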

2. HDFS File Download

@Test
public void testDownload() throws Exception {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2 Download: delSrc = false keeps the source file on HDFS,
    //   useRawLocalFileSystem = true skips the local .crc checksum file
    fileSystem.copyToLocalFile(
            false, new Path("/0308_666/hello.txt"), new Path("f:/hello1.txt"), true);
    // 3 Release resources
    fileSystem.close();
    System.out.println("over");
}

3. HDFS Directory Deletion

@Test
public void delete() throws Exception {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2 Delete; the second argument (recursive) must be true for non-empty directories
    fileSystem.delete(new Path("/0308_777"), true);
    // 3 Release resources
    fileSystem.close();
    System.out.println("over");
}

4. HDFS File and Directory Rename

@Test
public void testRename() throws Exception {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2 Rename; works the same way for files and directories
    fileSystem.rename(new Path("/0308_666/hello.txt"), new Path("/0308_666/hello2.txt"));
    // 3 Release resources
    fileSystem.close();
    System.out.println("over");
}

5. Viewing HDFS File Details

@Test
public void testLS1() throws Exception {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2 List all files recursively, starting at the root
    RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
    while (listFiles.hasNext()) {
        LocatedFileStatus fileStatus = listFiles.next();
        // File length
        System.out.println(fileStatus.getLen());
        // File name
        System.out.println(fileStatus.getPath().getName());
        // File permissions
        System.out.println(fileStatus.getPermission());
        // Block locations: which hosts hold each block of the file
        BlockLocation[] locations = fileStatus.getBlockLocations();
        for (BlockLocation location : locations) {
            String[] hosts = location.getHosts();
            for (String host : hosts) {
                System.out.println(host);
            }
        }
        System.out.println("---------------divider---------------");
    }
    // 3 Release resources
    fileSystem.close();
}

6. Distinguishing HDFS Files from Directories

@Test
public void testLS2() throws Exception {
    // 1 Get the file system
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2 Check each entry under the root: file or directory?
    FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
    for (FileStatus fileStatus : fileStatuses) {
        if (fileStatus.isFile()) {
            System.out.println("F:" + fileStatus.getPath().getName());
        } else {
            System.out.println("D:" + fileStatus.getPath().getName());
        }
    }
    // 3 Release resources
    fileSystem.close();
}

HDFS I/O Stream Operations

1. HDFS File Upload

@Test
public void testPut2() throws Exception {
    // 1. Get the HDFS client
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2. Create the input stream (local file)
    FileInputStream fileInputStream = new FileInputStream(new File("f:/hello2.txt"));
    // 3. Create the output stream (HDFS file)
    FSDataOutputStream outputStream = fileSystem.create(new Path("/0308_666/hello3.txt"));
    // 4. Copy the stream; this IOUtils.copyBytes overload also closes both streams
    IOUtils.copyBytes(fileInputStream, outputStream, configuration);
    // 5. Release resources
    fileSystem.close();
}

2. HDFS File Download

@Test
public void testDownload2() throws Exception {
    // 1. Get the HDFS client
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2. Create the input stream (HDFS file)
    FSDataInputStream inputStream = fileSystem.open(new Path("/0308_666/hello3.txt"));
    // 3. Create the output stream (local file)
    FileOutputStream outputStream = new FileOutputStream(new File("f:/hello3.txt"));
    // 4. Copy the stream; this overload closes both streams when done
    IOUtils.copyBytes(inputStream, outputStream, configuration);
    // 5. Release resources
    fileSystem.close();
    System.out.println("over");
}

3. Seek-Based File Reading

Requirement: read a large file on HDFS one block at a time.

/**
 * File download:
 * 1. Download the first block (the first 128 MB)
 */
@Test
public void testSeek1() throws Exception {
    // 1. Get the HDFS client
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2. Create the input stream
    FSDataInputStream inputStream = fileSystem.open(new Path("/user/drift/hadoop-2.7.2.tar.gz"));
    // 3. Create the output stream
    FileOutputStream outputStream = new FileOutputStream(new File("f:/hadoop-2.7.2.tar.gz.part1"));
    // 4. Copy exactly 128 MB: 1024 bytes per iteration, 1024 * 128 iterations.
    //    readFully guarantees the full 1 KB each time, unlike read, which may return fewer bytes
    byte[] buf = new byte[1024];
    for (int i = 0; i < 1024 * 128; i++) {
        inputStream.readFully(buf);
        outputStream.write(buf);
    }
    // 5. Release resources
    IOUtils.closeStream(inputStream);
    IOUtils.closeStream(outputStream);
    fileSystem.close();
    System.out.println("over");
}

/**
 * File download:
 * 2. Download the second block (everything after the first 128 MB)
 */
@Test
public void testSeek2() throws Exception {
    // 1. Get the HDFS client
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift");
    // 2. Create the input stream
    FSDataInputStream inputStream = fileSystem.open(new Path("/user/drift/hadoop-2.7.2.tar.gz"));
    // 3. Create the output stream
    FileOutputStream outputStream = new FileOutputStream(new File("f:/hadoop-2.7.2.tar.gz.part2"));
    // 4. Skip the first block, then copy the rest
    inputStream.seek(1024 * 1024 * 128);
    IOUtils.copyBytes(inputStream, outputStream, configuration);
    // 5. Release resources
    IOUtils.closeStream(inputStream);
    IOUtils.closeStream(outputStream);
    fileSystem.close();
    System.out.println("over");
}
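After both tests run, the two local parts can be stitched back into the original archive (on Windows, type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1 does the same thing). A minimal sketch in Java; the method name mergeParts and the output name f:/hadoop-2.7.2.tar.gz.merged are illustrative:

@Test
public void mergeParts() throws Exception {
    // Append part1 then part2 into a single file; the result should be
    // byte-identical to the original hadoop-2.7.2.tar.gz
    try (FileOutputStream out = new FileOutputStream("f:/hadoop-2.7.2.tar.gz.merged")) {
        for (String part : new String[]{
                "f:/hadoop-2.7.2.tar.gz.part1", "f:/hadoop-2.7.2.tar.gz.part2"}) {
            try (FileInputStream in = new FileInputStream(part)) {
                byte[] buf = new byte[8192];
                int len;
                while ((len = in.read(buf)) != -1) {
                    out.write(buf, 0, len);
                }
            }
        }
    }
    System.out.println("over");
}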
