c语言sscanf函数的用法是什么
277
2022-11-19
Hadoop——分布式计算框架MapReduce实践案例
三、MapReduce案例实操
1、MapReduce核心编程思想
1)分布式的运算程序往往需要分成至少2个阶段
2)第一个阶段的maptask并发实例,完全并行运行,互不相干
3)第二个阶段的reduce task并发实例互不相干,但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出
4)MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行
2、MapReduce程序运行流程详解
MR程序具体运行步骤如下:
1)在MapReduce程序读取文件的输入目录上存放相应的文件。
2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。
3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。
4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。
5)maptask利用客户指定的inputformat来读取数据,形成输入KV对。
6)maptask将输入KV对传递给客户定义的map()方法,做逻辑运算。
7)map()运算完毕后将KV对收集到maptask缓存。
8)maptask缓存中的KV对按照K分区排序后不断写到磁盘文件。
9)MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。
10)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。
3、案例实践
新建Maven工程,在pom.xml文件中添加依赖
在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
3.1、WordCount案例
需求:在给定的文本文件中统计输出每一个单词出现的总次数
输入数据:
hello worldhadoopsparkhello worldhadoopsparkhello worldhadoopspark
代码:
1)定义一个mapper类
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * (1)用户自定义的Mapper要继承自己的父类 * (2)Mapper的输入数据是K-V对的形式(K-V的类型可自定义) * (3)Mapper中的业务逻辑写在map()方法中 * (4)Mapper的输出数据是K-V对的形式(K-V的类型可自定义) * (5)map()方法(maptask进程)对每一个
2)定义一个reducer类
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * (1)用户自定义的Reducer要继承自己的父类 * (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV * (3)Reducer的业务逻辑写在reduce()方法中 * (4)Reducetask进程对每一组相同k的
3)定义一个主类,用来描述job并提交job
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * 相当于yarn客户端,负责提交MapReduce程序 */public class WordcountDriver { public static void main(String[] args) throws Exception { // 1 获取配置信息以及封装任务 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置jar加载路径 job.setJarByClass(WordcountDriver.class); // 3 设置map和reduce类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReduce.class); // 4 设置map输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置Reduce输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置job数据输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
第一种情况:
设置好job数据输入和输出路径,直接在IDEA上运行代码,得到输出结果即可。
第二种情况:
在IDEA上打包成jar,上传到集群上运行
运行命令:
hadoop jar mapreduce-1.0-SNAPSHOT.jar com.atguigu.wordcount.WordcountDriver /user/atguigu/wc/input/hello.txt /user/atguigu/wc/outputhadoop jar jar包名称 Driver类 数据输入路径 数据输出路径
得到输出结果即可。
3.2、数据清洗案例
需求:去除日志中字段长度小于等于11的日志。
输入数据:
数据源
期望输出数据:
每行字段长度都大于11。(实际通过计数器计数:符合要求的为true,不符合的为false)
需求分析:
需要在Map阶段对输入的数据根据规则进行过滤清洗。
代码:
1)编写LogParseMapper类
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogParseMapper extends Mapper
2)编写LogParseDriver类
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogParseDriver { public static void main(String[] args) throws Exception { //输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "path1", "path2" }; //连接客户端 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //提交job的class job.setJarByClass(LogParseDriver.class); //提交map的class job.setMapperClass(LogParseMapper.class); //map最后输出状态 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //数据输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //map端join不需要reduce极端,设置reducetask数量为0 job.setNumReduceTasks(0); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
输出:
设置好输入输出路径,运行代码即可得到结果
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~