2022-05-12 HDFS Java API使用

网友投稿 285 2022-11-19

2022-05-12 HDFS Java API使用

舆情数据上报案例

1.在HDFS上有一个目录,相当于企业中的数据中转站,会有源源不断的数据存储进去2.其中有些类型的数据是我们业务相关的舆情数据3.要求使用程序能够遍历数据源目录,找出符合需求的数据,上传移动到指定的HDFS目录下。

Step1:实现思路

使用Google Option解析命令行参数。读取要采集的数据目录,生成上传任务,上传任务包含一个任务文件,该文件包含了要上传哪些文件到HDFS上。执行任务,读取要上传的任务文件,挨个将任务文件中的文件上传到HDFS。上传中、上传完毕需要给任务文件添加特别的标识(willdoing copy done)。

Step2:​工程环境搭建

2.1添加maven依赖

cn.itcast.sentiment_upload.Entrance

2.2添加maven依赖 在test/java目录中创建下面包结构

包名

说明

cn.itcast.sentiment_upload.arg

处理命令行参数

cn.itcast.sentiment_upload.dfs

存放操作HDFS的工具类

cn.itcast.sentiment_upload.task

处理文件上传任务

Step3:开发舆情上报程序参数解析

使用GoogleOption创建参数实体类在cn.itcast.sentiment_upload.arg包下创建一个SentimentOptions类,并从OptionsBase继承,定义以下几个参数

帮助,可以显示命令的帮助信息 help h 默认参数要采集数据的位置 source s生成待上传的临时目录 pending_dir p  生成要上传到的HDFS路径 output o

package cn.itcast.sentiment_upload.arg;import com.google.devtools.common.options.Option;import com.google.devtools.common.options.OptionsBase;public class SentimentOptions extends OptionsBase { /** * 帮助参数 */ @Option( name = "help", abbrev = 'h', help = "打印帮助信息!", defaultValue = "true" ) public boolean help; /** * 采集数据参数 */ @Option( name = "source", abbrev = 's', help = "要采集数据的位置", defaultValue = "" ) public String sourceDir; /** * 生成待上传的待上传目录参数 */ @Option( name = "pending_dir", abbrev = 'p', help = "生成待上传的待上传目录", defaultValue = "/tmp/pending/sentiment" ) public String pendingDir; /** * 生成要上传的HDFS路径参数 */ @Option( name = "output", abbrev = 'o', help = "生成要上传的HDFS路径", defaultValue = "" ) public String output;}

Step3:开发舆情上报程序参数解析

在main方法中解析参数

package cn.itcast.sentiment_upload;/** * 注意: 导包的时候不要导错了 * import java.util.logging.LogManager; * import java.util.logging.Logger; */// 正确的包import cn.itcast.sentiment_upload.arg.SentimentOptions;import cn.itcast.sentiment_upload.task.TaskMgr;import com.google.devtools.common.options.OptionsParser;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;import java.util.Collections;public class Entrance { // 创建一个Logger protected static final Logger Logger = LogManager.getLogger(Entrance.class.getName()); public static void main(String[] args) { // 解析命令行参数 OptionsParser parser = OptionsParser.newOptionsParser(SentimentOptions.class); parser.parseAndExitUponError(args); SentimentOptions options = parser.getOptions(SentimentOptions.class); // 判断参数如果为空,则打印帮助信息 if (options.sourceDir.isEmpty() || options.output.isEmpty()) { printUsage(parser); return; } Logger.info("舆情上报程序启动..."); TaskMgr taskMgr = new TaskMgr(); // 1. 生成上传任务 Logger.info("正在生成上传任务..."); taskMgr.genTask(options); // 2. 执行上传任务 Logger.info("正在上报数据到HFDS"); taskMgr.work(options); Logger.info("DONE"); } private static void printUsage(OptionsParser parser) { System.out.println("Usage: java -jar sentiment.jar OPTIONS"); System.out.println(parser.describeOptions(Collections.emptyMap(), OptionsParser.HelpVerbosity.LONG)); }}

Step4:实现生成数据采集任务

在cn.itcast.sentiment_upload.task包下创建TaskMgr源文件。先实现生成数据上报任务。 实现步骤:  1.判断原始数据目录是否存在  2.读取原始数据目录下的所有文件 3.判断待上传目录是否存在,不存在则创建一个  4.创建任务目录(目录名称:task_年月日时分秒_任务状态)  5.遍历待上传的文件,在待上传目录生成一个willDoing文件  6.将待移动的文件添加到willDoing文件中

package cn.itcast.sentiment_upload.task;import cn.itcast.sentiment_upload.arg.SentimentOptions;import cn.itcast.sentiment_upload.dfs.HDFSMgr;import cn.itcast.sentiment_upload.dfs.HDFSMgrImpl;import org.apache.commons.io.FileUtils;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;import java.io.File;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;/** * 任务操作 */public class TaskMgr { protected static Logger Logger = LogManager.getLogger(TaskMgr.class.getName()); protected static final String COPY_STATUS = "_COPY"; protected static final String DONE_STATUS = "_DONE"; private HDFSMgr hdfsUtil; public TaskMgr() { hdfsUtil = new HDFSMgrImpl(); } /** * 生成待上传目录 * 1.判断原始数据目录是否存在 * 2.读取原始数据目录下的所有文件 * 3.判断待上传目录是否存在,不存在则创建一个 * 4.创建任务目录(目录名称:task_年月日时分秒_任务状态) * 5.遍历待上传的文件,在待上传目录生成一个willDoing文件 * 6.将待移动的文件添加到willDoing文件中 */ public void genTask(SentimentOptions options) { //1.判断原始数据目录是否存在 //通过File类 加载原始目录 File sourceDir = new File(options.sourceDir); if (!sourceDir.exists()) { //String.format()格式化输出错误错误日志 String errorMsg = String.format("%s 要采集的原始数据目录不存在.", options.sourceDir); Logger.error(errorMsg); throw new RuntimeException(errorMsg); } /* public File[] listFiles​(FilenameFilter filter)返回一个抽象路径名数组,表示由此抽象路径名表示的满足指定过滤器的目录中的文件和目录。 此方法的行为与listFiles()方法的行为相同,但返回的数组中的路径名必须满足过滤器。 如果给定的filter是null那么所有的路径名都被接受。 否则,如果且仅当值为true的过滤器的FilenameFilter.accept(File, String)方法在此抽象路径名上被调用时,路径名满足过滤器,并且在其所指示的目录中调用文件或目录的名称。 */ /* public interface FilenameFilter boolean accept​(File dir,String name)测试指定文件是否应包含在文件列表中。 参数 dir - 找到该文件的目录。 name - 文件的名称。 结果 true当且仅当名称应包含在文件列表中时; 否则为false 。 */ //2.读取原始数据目录下的所有文件 File[] allSourceDataFile = sourceDir.listFiles(f -> { // 判断文件格式是否以 weibo_data_ 开头 String fileName = f.getName(); if (fileName.startsWith("weibo_data_")) { return true; } return false; }); // 3.判断待上传目录是否存在,不存在则创建一个 File tempDir = new File(options.pendingDir); if (!tempDir.exists()) { try { //FileUtils.forceMkdirParent(tempDir); 强行创建父级目录 FileUtils.forceMkdirParent(tempDir); } catch (IOException e) { Logger.error(e.getMessage(), e); throw new RuntimeException(e.getMessage()); } } //时间日期格式化 SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); //当对字符串进行修改的时候,需要使用StringBuilder类的对象能够被多次的修改,并且不产生新的未使用对象。 StringBuilder stringBuilder = new StringBuilder(); //4.创建任务目录(目录名称:task_年月日时分秒_任务状态 File taskDir = null; // 判断数据文件是否为空 if (allSourceDataFile != null && allSourceDataFile.length > 0) { taskDir = new File(tempDir, String.format("task_%s", sdf.format(new Date()))); taskDir.mkdir(); } else { return; } // 5.遍历待上传的文件,在待上传目录生成一个willDoing文件 for (File dataFile : allSourceDataFile) { try { // new File(String parent,String child) 注意:parent目录必须存在,否则报异常 //String类的format()方法用于创建格式化的字符串以及连接多个字符串对象 File destFile = new File(taskDir, dataFile.getName()); FileUtils.moveFile(dataFile, destFile); // 将文件的绝对路径保存下来 stringBuilder.append(destFile.getAbsoluteFile() + "\n"); } catch (IOException e) { Logger.error(e.getMessage(), e); } } // 6.遍历待上传的文件,在待上传目录生成一个willDoing文件 try { String taskName = String.format("willDoing_%s", sdf.format(new Date())); FileUtils.writeStringToFile(new File(tempDir, taskName) , stringBuilder.toString() , "utf-8"); } catch (IOException e) { Logger.error(e.getMessage(), e); } } /** * 处理任务 * a)将任务文件修改为_COPY,表示正在处理中 * b)获取任务的日期 * c)判断HDFS目标上传目录是否存在,不存在则创建 * d)读取任务文件 * e)按照换行符切分 * f)上传每一个文件,调用HDFSUtils进行数据文件上传 * g)上传成功后,将_COPY后缀修改为_DONE */ public void work(SentimentOptions options) { // 3.1 读取待上传目录的willDoing任务文件,注意过滤COPY和DONE后的任务文件夹 File pendingDir = new File(options.pendingDir); File[] pendingTaskDir = pendingDir.listFiles(f -> { String taskName = f.getName(); // 文件是以willDoing开头的 if (!taskName.startsWith("willDoing")) return false; if (taskName.endsWith(COPY_STATUS) || taskName.endsWith(DONE_STATUS)) { return false; } else { return true; } }); // 3.2 遍历读取任务文件,开始上传 for (File pendingTask : pendingTaskDir) { try { // 将任务文件修改为_COPY,表示正在处理中 File copyTaskFile = new File(pendingTask.getAbsolutePath() + "_" + COPY_STATUS); FileUtils.moveFile(pendingTask, copyTaskFile); // 获取任务的日期 String taskDate = pendingTask.getName().split("_")[1]; String dataPathInHDFS = options.output + String.format("/%s", taskDate); // 判断HDFS目标上传目录是否存在,不存在则创建 hdfsUtil.mkdir(dataPathInHDFS); // 读取任务文件 String tasks = FileUtils.readFileToString(copyTaskFile, "utf-8"); // 按照换行符切分 String[] taskArray = tasks.split("\n"); // 上传每一个文件 for (String task : taskArray) { // 调用HDFSUtils进行数据文件上传 hdfsUtil.put(task, dataPathInHDFS); } // 上传成功后,将_COPY后缀修改为_DONE File doneTaskFile = new File(pendingTask.getAbsolutePath() + "_" + DONE_STATUS); FileUtils.moveFile(copyTaskFile, doneTaskFile); } catch (IOException e) { Logger.error(e.getMessage(), e); } } }}

Step5:实现执行数据上报任务

1.读取待上传目录的willDoing任务文件,注意过滤COPY和DONE后的任务文件夹2.遍历读取任务文件,开始上传将任务文件修改为_COPY,表示正在处理中  获取任务的日期  判断HDFS目标上传目录是否存在,不存在则创建  读取任务文件  按照换行符切分  上传每一个文件,调用HDFSUtils进行数据文件上传  上传成功后,将_COPY后缀修改为_DONE

Step5:编写HDFS接口和类,项目打包

shade插件可以将所有的jar打到一个jar包中。

package cn.itcast.sentiment_upload.dfs;import java.util.List;/** * HFDS操作接口 */public interface HDFSMgr { /** * 读取某个目录下的所有文件 * * @param recursion 是否递归读取 */ List ls(String path, boolean recursion); /** * 上传文件到指定位置 * @param src 原始文件 * @param dest 目标位置 */ void put(String src, String dest); /** * 从HDFS上下载文件到本地 * @param src HDFS上的路径 * @param destLocal 本地位置 */ void get(String src, String destLocal); /** * 创建目录 * @param path 目标路径 */ void mkdir(String path); /** * 关闭操作HDFS的FileSystem */ void close();}

package cn.itcast.sentiment_upload.dfs;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;import java.io.IOException;import java.util.ArrayList;import java.util.List;/** * HDFS操作类 */public class HDFSMgrImpl implements HDFSMgr { // log4j配置文件 protected static Logger Logger = LogManager.getLogger(HDFSMgrImpl.class.getName()); private Configuration configuration; private FileSystem fileSystem; public HDFSMgrImpl() { try { configuration = new Configuration(); fileSystem = FileSystem.get(configuration); } catch (IOException e) { Logger.error(e.getMessage(), e); throw new RuntimeException(e); } } /** * 读文件 * @param path * @param recursion 是否递归读取 * @return */ @Override public List ls(String path, boolean recursion) { try { // 指定遍历某个HDFS路径 RemoteIterator iterator = fileSystem.listFiles(new Path(path), recursion); ArrayList fileList = new ArrayList<>(); while(iterator.hasNext()) { LocatedFileStatus fileStatus = iterator.next(); // 获取文件的路径 fileList.add(fileStatus.getPath().toString()); } return fileList; } catch (IOException e) { Logger.error(e.getMessage(), e); throw new RuntimeException(e); } } /** * 上传文件到HDFS */ @Override public void put(String src, String dest) { try { // 从本地文件中上传文件到HDFS fileSystem.copyFromLocalFile(false, true, new Path(src), new Path(dest)); } catch (IOException e) { Logger.error(e.getMessage(), e); throw new RuntimeException(e); } } @Override public void get(String src, String destLocal) { try { fileSystem.copyToLocalFile(new Path(src), new Path(destLocal)); } catch (IOException e) { Logger.error(e.getMessage(), e); throw new RuntimeException(e); } } /** * 创建文件夹 */ @Override public void mkdir(String path) { try { // 判断文件夹是否存在 if (fileSystem.exists(new Path(path))) { return; } // 在HDFS中创建目录 fileSystem.mkdirs(new Path(path)); } catch (IOException e) { Logger.error(e.getMessage(), e); throw new RuntimeException(e); } } /** * 关闭文件系统 */ @Override public void close() { try { fileSystem.close(); } catch (IOException e) { Logger.error(e.getMessage(), e); throw new RuntimeException(e); } }}

Step6:编写shell脚本

创建一个目录,使用以下shell脚本来驱动jar包执行。

#!/bin/bashexport SENTIMENT_HOME=/root/sentiment_uploadexport JAVA_HOME=/export/server/jdk1.8.0_241export JAVA_CMD="${JAVA_HOME}/bin/java"export JAVA_OPS="-jar ${SENTIMENT_HOME}/sentiment_upload-1.0-SNAPSHOT-jar-with-dependencies.jar" SOURCE_DIR=$1PENDING_DIR=$2OUTPUT_DIR=$3 if [ ! $SOURCE_DIR ] || [ ! $OUTPUT_DIR ]; then ${JAVA_CMD} ${JAVA_OPS} -h exit;fi if [ ! $PENDING_DIR ] ; then ${JAVA_CMD} ${JAVA_OPS} -s $SOURCE_DIR -o $OUTPUT_DIR exit;fi ${JAVA_CMD} ${JAVA_OPS} -s $SOURCE_DIR -p ${PENDING_DIR} -o $OUTPUT_DIR

Step7:CentOS创建工作目录,上传jar包

[root@node2 sentiment_upload]# ll /root/sentiment_upload/total 57744drwxr-xr-x. 2 root root 6 Apr 29 16:19 outdrwxr-xr-x. 2 root root 6 Apr 30 11:04 pending-rw-r--r--. 1 root root 59122973 May 12 2022 sentiment_upload-1.0-SNAPSHOT-jar-with-dependencies.jar-rw-r--r--. 1 root root 542 May 11 2022 sentiment_upload.shdrwxr-xr-x. 2 root root 67 Apr 30 11:03 source[root@node2 sentiment_upload]# ll source/total 350000-rw-r--r--. 1 root root 2 Apr 29 16:38 1.txt-rw-r--r--. 1 root root 313492319 Jan 17 2021 weibo_data_1.txt-rw-r--r--. 1 root root 44896346 Jan 17 2021 weibo_data_2.txt

Step8:测试结果

[root@node2 sentiment_upload]# lltotal 57748drwxr-xr-x. 4 root root 50 Apr 30 11:47 outdrwxr-xr-x. 3 root root 71 Apr 30 11:47 pending-rw-r--r--. 1 root root 59125913 May 12 2022 sentiment_upload-1.0-SNAPSHOT-jar-with-dependencies.jar-rw-r--r--. 1 root root 549 Apr 30 11:18 sentiment_upload.shdrwxr-xr-x. 2 root root 67 Apr 30 11:53 source[root@node2 sentiment_upload]# sh sentiment_upload.sh /root/sentiment_upload/source /root/sentiment_upload/pending /root/sentiment_upload/out2022-04-30 11:53:55,494 [main] INFO sentiment_upload.Entrance (Entrance.java:main(35)) - 舆情上报程序启动...2022-04-30 11:53:56,238 [main] WARN util.NativeCodeLoader (NativeCodeLoader.java:(60)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable2022-04-30 11:53:56,611 [main] INFO sentiment_upload.Entrance (Entrance.java:main(39)) - 正在生成上传任务...2022-04-30 11:53:56,653 [main] INFO sentiment_upload.Entrance (Entrance.java:main(42)) - 正在上报数据到HFDS2022-04-30 11:54:10,526 [main] INFO sentiment_upload.Entrance (Entrance.java:main(44)) - DONE[root@node2 sentiment_upload]# ll pending/total 8drwxr-xr-x. 2 root root 54 Apr 30 11:47 task_20220430110418drwxr-xr-x. 2 root root 54 Apr 30 11:53 task_20220430115356-rw-r--r--. 1 root root 94 Apr 30 11:47 willDoing_20220430110418__COPY-rw-r--r--. 1 root root 136 Apr 30 11:53 willDoing_20220430115356__DONE[root@node2 sentiment_upload]# ll source/total 4-rw-r--r--. 1 root root 2 Apr 29 16:38 1.txt

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:大数据笔记(三):HDFS-伪分布式模式
下一篇:瑞萨电子推出全球首款用于汽车的28nm工艺的集成闪存微控制器
相关文章

 发表评论

暂时没有评论,来抢沙发吧~