Hadoop之——实现全排序

网友投稿 301 2022-11-20

Hadoop之——实现全排序

一般方法:

有一种方式可以实现Hadoop的全排序,那就是将多个Map阶段输出的排序中间结果全部输入到一个Reducer类中,这种方式的并行度不高,性能瓶颈也比较明显,无法发挥分布式计算的优势。

改进的方法:

如果将Map任务的输出结果拼接起来称为一个全局的结果文件,这样效率就会大大提升,但是这样的话,每个Map任务处理的数据必须是在一个连续的区间内(0-100, 101-200, 201-300),或者说每个Map任务负责一个分区的数据,这样才能保证输出的结果是全局有序的,这种方法也被成为多分区排序方法。

Hadoop自带的Partitioner有两种:一种是HashPartitioner,一种是TotalPartitioner,下面的示例中使用的是TotalPartitioner。这个类会为排序创建作业分区。难点在于如何选择创建分区的依据。这里,先采用Hadoop默认的抽样器进行抽样,根据其数据分布生成分区文件,避免数据倾斜造成的性能低下。

具体实现:

原始数据文件numbers.txt的内容如下:

5 94 23 84 93 75 61 33 2

pom.xml文件配置如下:

3.2.0 1.7.2 org.apache.hadoop hadoop-hdfs ${hadoop.version} org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hadoop hadoop-common ${hadoop.version} org.slf4j slf4j-log4j12 ${slf4j.version}

Java类代码如下:

/** * Copyright 2019-2999 the original author or authors. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

* *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package io.binghe.hadoop.sort;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.InputSampler;import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * @author binghe * @version 1.0.0 * @description Hadoop实现全排序 */public class AllSort extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new AllSort(), args); } public int run(String[] args) throws Exception { //文件输入路径 Path inputPath = new Path(args[0]); //结果输出路径 Path outputPath = new Path(args[1]); //分区文件路径 Path partitionFilePath = new Path(args[2]); //reduce数量 int reduceNumber = Integer.parseInt(args[3]); //RandomSampler第一个参数表示会被选中的概率,第二个参数是一个选取的样本数,第三个参数是最大读取InputSplit数 InputSampler.RandomSampler sampler = new InputSampler.RandomSampler(0.1, 100000, 10); Configuration conf = new Configuration(); //设置作业的分区文件零 TotalOrderPartitioner.setPartitionFile(conf, partitionFilePath); Job job = Job.getInstance(conf); job.setJobName("AllSort"); job.setJarByClass(AllSort.class); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(reduceNumber); //设置partition类 job.setPartitionerClass(TotalOrderPartitioner.class); FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath, true); //写入分区文件 InputSampler.writePartitionFile(job, sampler); return job.waitForCompletion(true) ? 0 : 1; }}

测试程序

将项目打包,并运行如下命令:

hadoop jar /home/hadoop/mykit-xxx.jar io.binghe.hadoop.sort.AllSort /data/input/numbers.txt /data/output/ /data/partition/ 1

命令说明:

hadoop jar:执行Hadoop程序的命令行关键字。/home/hadoop/mykit-xxx.jar:MapReduce程序所在Jar包的完整路径。io.binghe.hadoop.sort.AllSort:MapReduce的完整类名。/data/input/numbers.txt:原始数据文件的输入路径。/data/output/:结果数据输出路径。/data/partition/:执行MapReduce程序创建的分区目录。1:程序中接收的Reduce个数。

查看输出结果:

-bash-4.1$ hadoop fs -ls /dataFound 3 itemsdrwxr-xr-x - hadoop supergroup 0 2019-07-12 10:25 /data/inputdrwxr-xr-x - hadoop supergroup 0 2019-07-12 11:51 /data/output-rw-r--r-- 1 hadoop supergroup 129 2019-07-12 11:51 /data/partition-bash-4.1$ -bash-4.1$ hadoop fs -cat /data/output/part-r-000001 33 23 73 84 24 95 65 9-bash-4.1$

可以看到,分区操作创建了分区目录/data/partition, 最终的结果存放目录为/data/output,在文件part-r-00000中存放了最终的输出结果。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java BIO实现聊天程序
下一篇:气压传感器MS5611使用SPI接口的程序
相关文章

 发表评论

暂时没有评论,来抢沙发吧~