Hadoop之MapReduce篇(一)

网友投稿 251 2022-11-20

Hadoop之MapReduce篇(一)

第1章 MapReduce 概述

1.MapReduce定义

MapReduce是一个分布式运行程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运行程序,并发运行在一个Hadoop集群上

2.MapReduce 优缺点

优点:

MapReduce 易于编程良好的扩展性高容错性适合 PB 级以上海量数据的离线处理

缺点:

不擅长实时计算不擅长流式计算不擅长 DAG(有向无环图)计算

3.MapReduce 进程

一个完整的 MapReduce 程序在分布式运行时有三类实例进程:

MrAppMaster:负责整个程序的过程调度及状态协调。MapTask:负责 Map 阶段的整个数据处理流程。ReduceTask:负责 Reduce 阶段的整个数据处理流程。

4.常用数据序列化类型

Java 类型

Hadoop Writable 类型

Boolean

BooleanWritable

Byte

ByteWritable

Int

IntWritable

Float

FloatWritable

Long

LongWritable

Double

DoubleWritable

String

Text

Map

MapWritable

Array

ArrayWritable

Null

NullWritable

5.MapRedece编码规范

1.Mapper阶段

用户自定义的Mapper要继承自己的父类Mapper的输入数据是KV对的形式(KV的类型可自定义)Mapper中的业务逻辑写在map()方法中Mapper的输出数据是KV对的形式(KV的类型可自定义)map()方法(MapTask进程)对每一个调用一次

2. Reducer阶段

用户自定义的Reducer要继承自己的父类Reducer的输入数据类型对应Mapper的输出数据类型,也是KVReducer的业务逻辑写在reduce()方法中ReduceTask进程对每一组相同k的组调用一次reduce()方法

3.Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是 封装了MapReduce程序相关运行参数的job对象

6.WordCount 案例实操

1.环境准备

创建 maven 工程,MapReduce在 pom.xml 文件中添加如下依赖

org.apache.hadoop hadoop-client 3.1.3 junit junit 4.12 org.slf4j slf4j-log4j12 1.7.30

在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在 文件中填入。

log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

2.写码

1.编写 Mapper 类

package com.demo.mapreduce.wordcount;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class WordCountMapper extends Mapper{Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 1 获取一行String line = value.toString();// 2 切割String[] words = line.split(" ");// 3 输出for (String word : words) {k.set(word);context.write(k, v);}}}

2.编写 Reducer 类

package com.demo.mapreduce.wordcount;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class WordCountReducer extends Reducer{int sum;IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {// 1 累加求和sum = 0;for (IntWritable count : values) {sum += count.get();}// 2 输出 v.set(sum);context.write(key,v);}}

3.编写 Driver 驱动类

package com.demo.mapreduce.wordcount;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 获取配置信息以及获取 job 对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 关联本 Driver 程序的 jarjob.setJarByClass(WordCountDriver.class);// 3 关联 Mapper 和 Reducer 的 jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4 设置 Mapper 输出的 kv 类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出 kv 类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 提交 jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}

4.本地测试

需要首先配置好 HADOOP_HOME 变量以及 Windows 运行依赖 ,在 IDEA/Eclipse 上运行程序

第2章 Hadoop 序列化

1.序列化概述

1.什么是序列化

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁 盘(持久化)和网络传输。 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换 成内存中的对象。

2.为什么要序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能 由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的” 对象,可以将“活的”对象发送到远程计算机。

3.为什么不用 Java 的序列化

Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带 很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以, Hadoop 自己开发了一套序列化机制(Writable)。

4.Hadoop 序列化特点

紧凑 :高效使用存储空间。快速:读写数据的额外开销小。互操作:支持多语言的交互

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微软发布三款特定行业云产:涉及金融、制造和非营利组织
下一篇:java单机接口限流处理方案详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~