c语言sscanf函数的用法是什么
250
2022-11-22
0009 - 基于MapReduce的应用案例
1 - MapReduce词频统计案例
1.1 - 样本数据
这是一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数。
[root@hadoop-01 ~]# cat input.txt Spark HBase Azkaban Flume Hive Flink Storm Hadoop HBase Spark Flink Presto Kudu Azkaban HBase Storm Presto Kafka HBase Hadoop Hive Flink Kudu HBase Flink Hive Storm Hive Flink Hadoop Flume HBase Hive Kudu Zookeeper Hadoop Spark HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Kudu Hive Flink Hadoop Kafka HBase Hive Presto Zookeeper Presto Kudu Hadoop Kafka Zookeeper Hadoop Flume Azkaban Kudu Presto Kafka Zookeeper Kafka Flume [root@hadoop-01 ~]# hdfs dfs -mkdir -p /tmp/wordcount/input [root@hadoop-01 ~]# hdfs dfs -put -f input.txt /tmp/wordcount/input/input.txt
项目完整源码下载地址: WordCountDemo
1.2 - 项目依赖
进行 MapReduce 编程,需要导入 hadoop-client 的依赖版本:
1.3 - WordCountMapper
将 input.txt 文件的每行数据按照指定分隔符进行拆分。这里需要注意在 MapReduce 中必须使用 Hadoop 定义的类型,因为 Hadoop 预定义的类型都是可序列化,可比较的,所有类型均实现了 WritableComparable 接口。
在 Map 中将每行数据按照分隔符进行拆分:
/** * Object : Mapping 输入文件的内容 * Text : Mapping 输入的每一行的数据 * Text : Mapping 输出 key 的类型 * IntWritable : Mapping 输出 value 的类型 */ public class WordCountMapper extends Mapper
1.4 - WordCountReducer
在 Reduce 中进行单词出现次数的统计:
/**
* Text : Mapping 输入的 key 的类型
* IntWritable : Mapping 输入的 value 的类型
* Text : Reducing 输出的 key 的类型
* IntWritable : Reducing 输出的 value 的类型
*/
public class WordCountReducer extends Reducer
1.5 - WordCountApp
打包 jar 到集群使用 hadoop 命令提交作业示例:
public class WordCountApp {
// 1、使用硬编码,显示参数,实际开发中可以通过外部传参
private static final String HDFS_URL = "hdfs://172.16.1.2:8020";
private static final String HADOOP_USER_NAME = "hdfs";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 文件的输入路径和输出路径由外部传参指定
if (args.length < 2) {
System.err.println("Usage: wordcount
提示: 如果不设置 Mapper 操作的输出类型,则默认和 Reducer 操作输出的类型相同。
1.6 - 提交到 MapReduce 集群运行
可以在本机配置 Hadoop 开发环境,直接在 IDE 中启动进行测试。本案例打包提交到服务器中运行。由于本项目没有使用除 Hadoop 外的第三方依赖,因此直接打包即可:
或者使用 mvn 命令打包:
[root@hadoop-01 ~]# mvn package -DskipTests
2、提交作业
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountApp /tmp/wordcount/input /tmp/wordcount/output
4、查看 HDFS 上统计结果
# 查看生成目录 [root@hadoop-01 ~]# hdfs dfs -ls /tmp/wordcount/output # 查看统计结果 [root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00000
提示: 每个作业的 Reduce 任务的默认个数为 1(通过 mapreduce.job.reduces 设置)。
每个 Reduce 任务都会产生一个输出文件,如果设置 Reduce 个数为 3,那么 Map 输出数据会被分成 3 份,Reduce 输出的 part-r-00000 文件也会有 3 个。
2 - 词频统计案例之Combiner
2.1 - 代码实现
如果需要使用 Combiner 功能,只要在打包 jar 文件时,添加下面一行代码即可:
// 5、设置Combiner job.setCombinerClass(WordCountReducer.class);
2.2 - 执行结果
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountCombinerApp /tmp/wordcount/input /tmp/wordcount/output
加入 Combiner 功能后,统计结果不会发生变化,但是可以从打印的日志看出 Combiner 的效果:
本案例只有一个输入文件并且小于 128M,所以只有一个 Map 进行处理。可以看到经过 Combiner 功能后,Combine input records 值由 70 降低为 12(样本中单词种类就只有 12 种),在本案例中 Combiner 能降低需要传输的数据量。
3 - 词频统计案例之Partitioner
3.1 - 默认的 Partitioner
假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品种类进行拆分。要实现这个功能,就需要用到自定义 Partitioner。
MapReduce 默认的分类规则:在构建 job 的时候,如果不指定,则默认使用的是 HashPartitioner:对 key 值进行哈希散列,并对 numReduceTasks 取余。其实现如下:
public class HashPartitioner
3.2 - 自定义 Partitioner
使用 Partitioner 自定义分类规则,按照单词进行分类:
public class CustomPartitioner extends Partitioner
在构建 job 时,指定使用自定义的分类规则,并设置 reduce 的个数:
// 6、设置自定义 Partitioner 规则 job.setPartitionerClass(CustomPartitioner.class); // 7、设置 Reduce 个数 job.setNumReduceTasks(12);
3.3 - 执行结果
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountCombinerPartitionerApp /tmp/wordcount/input /tmp/wordcount/output
执行结果如下,分别生成 12 个文件,每个文件中为对应单词的统计结果:
[root@hadoop-01 ~]# hdfs dfs -ls /tmp/wordcount/output/ Found 13 items -rw-r--r-- 3 hdfs supergroup 0 2021-09-03 15:30 /tmp/wordcount/output/_SUCCESS -rw-r--r-- 3 hdfs supergroup 10 2021-09-03 15:30 /tmp/wordcount/output/part-r-00000 -rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00001 -rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00002 -rw-r--r-- 3 hdfs supergroup 9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00003 -rw-r--r-- 3 hdfs supergroup 9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00004 -rw-r--r-- 3 hdfs supergroup 7 2021-09-03 15:30 /tmp/wordcount/output/part-r-00005 -rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00006 -rw-r--r-- 3 hdfs supergroup 7 2021-09-03 15:30 /tmp/wordcount/output/part-r-00007 -rw-r--r-- 3 hdfs supergroup 9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00008 -rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00009 -rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00010 -rw-r--r-- 3 hdfs supergroup 12 2021-09-03 15:30 /tmp/wordcount/output/part-r-00011 [root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00000 Azkaban 3 [root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00001 Flink 8 [root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00002 Flume 4
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~