JAVA API操作小文件合并至HDFS(笔记)

网友投稿 304 2022-11-26

JAVA API操作小文件合并至HDFS(笔记)

相关文件请自行创建!!!

package com.hadoop.hdfs;

import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FileUtil;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.PathFilter;import org.apache.hadoop.io.IOUtils;/**

合并小文件至 HDFS */public class MergeSmallFilesToHDFS {private static FileSystem fs = null;private static FileSystem local = null; public static void main(String[] args) throws IOException,URISyntaxException {list();} /** 数据集合并,并上传至HDFS throws IOExceptionthrows URISyntaxException/public static void list() throws IOException, URISyntaxException {// 读取hadoop文件系统的配置Configuration conf = new Configuration();//文件系统访问接口,注意:hdfs://master:9000修改成自己的HDFS地址URI uri = new URI("hdfs://master:9000");//创建FileSystem对象fs = FileSystem.get(uri, conf);// 获得本地文件系统local = FileSystem.getLocal(conf);//过滤目录下的 svn文件,注意:文件路径E://Hadoop/73/修改成自己的路径FileStatus[] dirstatus = local.globStatus(new Path("E://Hadoop/73/"),new RegexExcludePathFilter("^.svn$"));//获取73目录下的所有文件路径Path[] dirs = FileUtil.stat2Paths(dirstatus);FSDataOutputStream out = null;FSDataInputStream in = null;for (Path dir : dirs) {//2019-10-31String fileName = dir.getName().replace("-", "");//文件名称//只接受日期目录下的.txt文件FileStatus[] localStatus = local.globStatus(new Path(dir+"/"),new RegexAcceptPathFilter("^.txt$"));// 获得日期目录下的所有文件Path[] listedPaths = FileUtil.stat2Paths(localStatus);//输出路径,注意:hdfs://master:9000/20191031/修改成自己的HDFS目录地址Path block = new Path("hdfs://master:9000/20191031/"+ fileName + ".txt");System.out.println("合并后的文件名称:"+fileName+".txt");// 打开输出流out = fs.create(block); for (Path p : listedPaths) {in = local.open(p);// 打开输入流IOUtils.copyBytes(in, out, 4096, false); // 复制数据// 关闭输入流in.close();}if (out != null) {// 关闭输出流out.close();}} } /** 过滤 regex 格式的文件 */public static class RegexExcludePathFilter implements PathFilter {private final String regex;public RegexExcludePathFilter(String regex) {this.regex = regex;} public boolean accept(Path path) {boolean flag = path.toString().matches(regex);return !flag;} } /** 接受 regex 格式的文件 */public static class RegexAcceptPathFilter implements PathFilter {private final String regex;public RegexAcceptPathFilter(String regex) {this.regex = regex;} @Overridepublic boolean accept(Path path) {boolean flag = path.toString().matches(regex);return flag;} }}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于DSP芯片实现异步串行通信系统的软硬件设计
下一篇:linxu网络协议分析:IP协议、TCP协议、UDP协议
相关文章

 发表评论

暂时没有评论,来抢沙发吧~