Hadoop大数据——mapreduce的排序机制之total排序

网友投稿 230 2022-11-24

Hadoop大数据——mapreduce的排序机制之total排序

mapreduce的排序机制之total排序 (1)设置一个reduce task ,全局有序,但是并发度太低,单节点负载太大 (2)设置分区段partitioner,设置相应数量的reduce task,可以实现全局有序,但难以避免数据分布不均匀——数据倾斜问题,有些reduce task负载过大,而有些则过小; (3)可以通过编写一个job来统计数据分布规律,获取合适的区段划分,然后用分区段partitioner来实现排序;但是这样需要另外编写一个job对整个数据集运算,比较费事 (4)利用hadoop自带的取样器,来对数据集取样并划分区段,然后利用hadoop自带的TotalOrderPartitioner分区来实现全局排序 /** * 全排序示例 * @author zhangxueliang * */ public class TotalSort { static class TotalSortMapper extends Mapper { OrderBean bean = new OrderBean(); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { // String line = value.toString(); // String[] fields = line.split("\t"); // bean.set(fields[0], Double.parseDouble(fields[1])); context.write(key, value); } } static class TotalSortReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (Text v : values) { context.write(key, v); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TotalSort.class); job.setMapperClass(TotalSortMapper.class); job.setReducerClass(TotalSortReducer.class); // job.setOutputKeyClass(OrderBean.class); // job.setOutputValueClass(NullWritable.class); //用来读取sequence源文件的输入组件 job.setInputFormatClass(SequenceFileInputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // job.setPartitionerClass(RangePartitioner.class); //分区的逻辑使用的hadoop自带的全局排序分区组件 job.setPartitionerClass(TotalOrderPartitioner.class); //系统自带的这个抽样器只能针对sequencefile抽样 RandomSampler randomSampler = new InputSampler.RandomSampler(0.1,100,10); InputSampler.writePartitionFile(job, randomSampler); //获取抽样器所产生的分区规划描述文件 Configuration conf2 = job.getConfiguration(); String partitionFile = TotalOrderPartitioner.getPartitionFile(conf2); //把分区描述规划文件分发到每一个task节点的本地 job.addCacheFile(new URI(partitionFile)); //设置若干并发的reduce task job.setNumReduceTasks(3); job.waitForCompletion(true); } }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:适合高温环境应用的AD7981数据采集系统设计
下一篇:Hadoop大数据——mapreduce的secondary排序机制
相关文章

 发表评论

暂时没有评论,来抢沙发吧~