Hadoop——分布式计算框架MapReduce实践案例

网友投稿 277 2022-11-19

Hadoop——分布式计算框架MapReduce实践案例

三、MapReduce案例实操

1、MapReduce核心编程思想

1)分布式的运算程序往往需要分成至少2个阶段

2)第一个阶段的maptask并发实例,完全并行运行,互不相干

3)第二个阶段的reduce task并发实例互不相干,但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出

4)MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行

2、MapReduce程序运行流程详解

MR程序具体运行步骤如下:

1)在MapReduce程序读取文件的输入目录上存放相应的文件。

2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。

3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。

4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。

5)maptask利用客户指定的inputformat来读取数据,形成输入KV对。

6)maptask将输入KV对传递给客户定义的map()方法,做逻辑运算。

7)map()运算完毕后将KV对收集到maptask缓存。

8)maptask缓存中的KV对按照K分区排序后不断写到磁盘文件。

9)MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。

10)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。

11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。

3、案例实践

新建Maven工程,在pom.xml文件中添加依赖

junit junit RELEASE org.apache.logging.log4j log4j-core 2.8.2 org.apache.hadoop hadoop-common 2.7.2 org.apache.hadoop hadoop-client 2.7.2 org.apache.hadoop hadoop-hdfs 2.7.2

在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3.1、WordCount案例

需求:在给定的文本文件中统计输出每一个单词出现的总次数

输入数据:

hello worldhadoopsparkhello worldhadoopsparkhello worldhadoopspark

代码:

1)定义一个mapper类

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * (1)用户自定义的Mapper要继承自己的父类 * (2)Mapper的输入数据是K-V对的形式(K-V的类型可自定义) * (3)Mapper中的业务逻辑写在map()方法中 * (4)Mapper的输出数据是K-V对的形式(K-V的类型可自定义) * (5)map()方法(maptask进程)对每一个调用一次 */public class WordcountMapper extends Mapper { /** * map()方法(maptask进程)对每一个调用一次 * * @param key : 数据的offset * @param value : 要处理的一行数据 * @param context : 上下文 * @throws IOException * @throws InterruptedException */ IntWritable v = new IntWritable(1); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1、获取一行数据,将一行数据转化为String类型 String line = value.toString(); //2、切割 String[] words = line.split(" "); //3、循环写出 for (String word : words) { k.set(word); context.write(k, v); } }}

2)定义一个reducer类

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * (1)用户自定义的Reducer要继承自己的父类 * (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV * (3)Reducer的业务逻辑写在reduce()方法中 * (4)Reducetask进程对每一组相同k的组调用一次reduce()方法 */public class WordcountReduce extends Reducer { /** * Reducetask进程对每一组相同k的组调用一次reduce()方法 * * @param key : 单词 * @param values : 单词个数(1)的集合 * @param context : 上下文 * @throws IOException * @throws InterruptedException */ IntWritable v = new IntWritable(); int sum; @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //1、累加 sum = 0; for (IntWritable value : values) { sum += value.get(); } //2、写出 v.set(sum); context.write(key, v); }}

3)定义一个主类,用来描述job并提交job

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * 相当于yarn客户端,负责提交MapReduce程序 */public class WordcountDriver { public static void main(String[] args) throws Exception { // 1 获取配置信息以及封装任务 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置jar加载路径 job.setJarByClass(WordcountDriver.class); // 3 设置map和reduce类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReduce.class); // 4 设置map输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置Reduce输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置job数据输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}

第一种情况:

设置好job数据输入和输出路径,直接在IDEA上运行代码,得到输出结果即可。

第二种情况:

在IDEA上打包成jar,上传到集群上运行

运行命令:

hadoop jar mapreduce-1.0-SNAPSHOT.jar com.atguigu.wordcount.WordcountDriver /user/atguigu/wc/input/hello.txt /user/atguigu/wc/outputhadoop jar jar包名称 Driver类 数据输入路径 数据输出路径

得到输出结果即可。

3.2、数据清洗案例

需求:去除日志中字段长度小于等于11的日志。

输入数据:

​​数据源​​

期望输出数据:

每行字段长度都大于11。(实际通过计数器计数:符合要求的为true,不符合的为false)

需求分析:

需要在Map阶段对输入的数据根据规则进行过滤清洗。

代码:

1)编写LogParseMapper类

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogParseMapper extends Mapper { Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); boolean result = LogParse(line, context); if (!result) { return; } k.set(line); context.write(k, NullWritable.get()); } private boolean LogParse(String line, Context context) { String[] fields = line.split(" "); if (fields.length > 11) { context.getCounter("map", "true").increment(1); return true; } context.getCounter("map", "false").increment(1); return false; }}

2)编写LogParseDriver类

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogParseDriver { public static void main(String[] args) throws Exception { //输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "path1", "path2" }; //连接客户端 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //提交job的class job.setJarByClass(LogParseDriver.class); //提交map的class job.setMapperClass(LogParseMapper.class); //map最后输出状态 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //数据输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //map端join不需要reduce极端,设置reducetask数量为0 job.setNumReduceTasks(0); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}

输出:

设置好输入输出路径,运行代码即可得到结果

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Hadoop——分布式文件管理系统HDFS
下一篇:一文秒懂 kafka HA(高可用)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~