Hadoop之——MapReduce实战(二)

网友投稿 252 2022-11-20

Hadoop之——MapReduce实战(二)

MapReduce的老api写法

import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;/** * author liuyazhuang */ public class HelloWorldApp { public static void main(String[] args) throws Exception { final JobConf job = new JobConf(HelloWorldApp.class); FileInputFormat.setInputPaths(job, new Path("/input")); FileOutputFormat.setOutputPath(job, new Path("/output")); JobClient.runJob(job); }}

MapReduce获取命令行参数

/** * author liuyazhuang */public class CacheFileApp extends Configured implements Tool{ public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new CacheFileApp(), args); } @Override public int run(String[] args) throws Exception { //.............................. Path in = new Path(args[0]); Path out = new Path(args[1]); //.............................. }}

计数器

hadoop计数器:可以让开发人员以全局的视角来审查程序的运行情况以及各项指标,及时做出错误诊断并进行相应处理。

内置计数器(MapReduce相关、文件系统相关和作业调度相关)

...

也可以通过hello me的计数器信息

Counters: 19 File Output Format Counters Bytes Written=19 //reduce输出到hdfs的字节数 FileSystemCounters FILE_BYTES_READ=481 HDFS_BYTES_READ=38 FILE_BYTES_WRITTEN=81316 HDFS_BYTES_WRITTEN=19 File Input Format Counters Bytes Read=19 //map从hdfs读取的字节数 Map-Reduce Framework Map output materialized bytes=49 Map input records=2 //map读入的记录行数 Reduce shuffle bytes=0 Spilled Records=8 Map output bytes=35 Total committed heap usage (bytes)=266469376 SPLIT_RAW_BYTES=105 Combine input records=0 Reduce input records=4 //reduce从map端接收的记录行数 Reduce input groups=3 //reduce函数接收的key数量,即归并后的k2数量 Combine output records=0 Reduce output records=3 //reduce输出的记录行数 Map output records=4 //map输出的记录行数

自定义计数器与实现

Context类调用方法getCounter()

context.getCounter(Enum enum)

context.getCounter(String groupName,StringcounterName)

计数器操作

counter.setValue(long value);//设置初始值

counter.increment(longincr);//增加计数

Combiners编程

每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。

如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

Partitioner编程

Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

2.  HashPartitioner是mapreduce的默认partitioner。计算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE)% numReduceTasks,得到当前的目的reducer。

3.  (例子以jar形式运行)

排序和分组

1在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。

2分组时也是按照k2进行比较的

/** * 为什么有这个类?因为mapper端比较时只能比较k2,不能比较v2.如果想让v2参与比较,必须参与到k2角色中。自定义该类,包含原来的k2和v2 * 在哪里调用本来中的compareTo方法,在1.4中调用 */ static class TwoInt implements WritableComparable{ long first; long second; public TwoInt(){} public TwoInt(long first, long second){ this.first = first; this.second = second; } @Override public void readFields(DataInput arg0) throws IOException { this.first = arg0.readLong(); this.second = arg0.readLong(); } @Override public void write(DataOutput arg0) throws IOException { arg0.writeLong(first); arg0.writeLong(second); } @Override public int hashCode() { return (this.first+"").hashCode()+(this.second+"").hashCode(); } @Override public boolean equals(Object obj) { if(obj instanceof TwoInt){ TwoInt ob = (TwoInt)obj; return (this.first==ob.first && this.second==ob.second)?true:false; }else{ return false; } } @Override public int compareTo(TwoInt o) { if(this.first!=o.first){ return (int)(this.first-o.first); }else{ return (int)(this.second-o.second); } } }

/** * 分组时比较采用的比较器。比较的是原来的k2 * */ static class GroupComparator implements RawComparator{ @Override public int compare(TwoInt o1, TwoInt o2) { return (int)(o1.first-o2.first); } /** * arg0表示第一个字节数组 * arg1表示第一个字节数组的参与比较的开始位置 * arg3表示第二个字节数组 * arg4表示第二个字节数组的参与比较的开始位置 */ @Override public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) { return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8); } }

Shuffle

每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。写磁盘前,要partition,sort。如果有combiner,combine排序后数据。等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。Reducer通过Http方式得到输出文件的分区。TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。排序阶段合并map输出。然后走Reduce阶段

hadoop的压缩codec

Codec为压缩,解压缩的算法实现。在Hadoop中,codec由CompressionCode的实现来表示。下面是一些实现:

MapReduce的输出进行压缩

输出的压缩属性

MapReduce常见算法

单词计数数据去重排序Top K选择投影分组多表连接单表关联

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Hadoop之——以1.x版本和0.x版本分别实现单词统计功能
下一篇:基于FPGA的绝对式编码器通信接口设计
相关文章

 发表评论

暂时没有评论,来抢沙发吧~