c语言sscanf函数的用法是什么
252
2022-11-20
Hadoop之——MapReduce实战(二)
MapReduce的老api写法
import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;/** * author liuyazhuang */ public class HelloWorldApp { public static void main(String[] args) throws Exception { final JobConf job = new JobConf(HelloWorldApp.class); FileInputFormat.setInputPaths(job, new Path("/input")); FileOutputFormat.setOutputPath(job, new Path("/output")); JobClient.runJob(job); }}
MapReduce获取命令行参数
/** * author liuyazhuang */public class CacheFileApp extends Configured implements Tool{ public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new CacheFileApp(), args); } @Override public int run(String[] args) throws Exception { //.............................. Path in = new Path(args[0]); Path out = new Path(args[1]); //.............................. }}
计数器
hadoop计数器:可以让开发人员以全局的视角来审查程序的运行情况以及各项指标,及时做出错误诊断并进行相应处理。
内置计数器(MapReduce相关、文件系统相关和作业调度相关)
...
也可以通过hello me的计数器信息
Counters: 19 File Output Format Counters Bytes Written=19 //reduce输出到hdfs的字节数 FileSystemCounters FILE_BYTES_READ=481 HDFS_BYTES_READ=38 FILE_BYTES_WRITTEN=81316 HDFS_BYTES_WRITTEN=19 File Input Format Counters Bytes Read=19 //map从hdfs读取的字节数 Map-Reduce Framework Map output materialized bytes=49 Map input records=2 //map读入的记录行数 Reduce shuffle bytes=0 Spilled Records=8 Map output bytes=35 Total committed heap usage (bytes)=266469376 SPLIT_RAW_BYTES=105 Combine input records=0 Reduce input records=4 //reduce从map端接收的记录行数 Reduce input groups=3 //reduce函数接收的key数量,即归并后的k2数量 Combine output records=0 Reduce output records=3 //reduce输出的记录行数 Map output records=4 //map输出的记录行数
自定义计数器与实现
Context类调用方法getCounter()
context.getCounter(Enum enum)
context.getCounter(String groupName,StringcounterName)
计数器操作
counter.setValue(long value);//设置初始值
counter.increment(longincr);//增加计数
Combiners编程
每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
Partitioner编程
Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。
2. HashPartitioner是mapreduce的默认partitioner。计算方法是
which reducer=(key.hashCode() & Integer.MAX_VALUE)% numReduceTasks,得到当前的目的reducer。
3. (例子以jar形式运行)
排序和分组
1在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。
2分组时也是按照k2进行比较的
/** * 为什么有这个类?因为mapper端比较时只能比较k2,不能比较v2.如果想让v2参与比较,必须参与到k2角色中。自定义该类,包含原来的k2和v2 * 在哪里调用本来中的compareTo方法,在1.4中调用 */ static class TwoInt implements WritableComparable
/** * 分组时比较采用的比较器。比较的是原来的k2 * */ static class GroupComparator implements RawComparator
Shuffle
每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。写磁盘前,要partition,sort。如果有combiner,combine排序后数据。等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。Reducer通过Http方式得到输出文件的分区。TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。排序阶段合并map输出。然后走Reduce阶段
hadoop的压缩codec
Codec为压缩,解压缩的算法实现。在Hadoop中,codec由CompressionCode的实现来表示。下面是一些实现:
MapReduce的输出进行压缩
输出的压缩属性
MapReduce常见算法
单词计数数据去重排序Top K选择投影分组多表连接单表关联
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~