hadoop系列之MR的经典代码案例一

网友投稿 261 2022-11-23

hadoop系列之MR的经典代码案例一

七、MapReduce经典案例

1、网站分析案例1)分析

省份访问

procinceId  --> Key

1                  -->Value

数据库:

维度表

tb_provinve_info

provinveId

provinveName

provinveXxx

江苏省 -> 2098

上海市 -> 34563

2)程序

i.设置Mapper类和Map方法

ii.设置Reduce类和reduce方法

iii.设置run方法

iv.设置main方法

v.设置计数器(设置在mapper类中)

3)导出jar包运行

i.eclipse打包

iv.YARN命令运行

$ bin/yarn jar  .....

2、二次排序

1)IntPair类

package com.hadoop.mr.sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.WritableComparable; public class IntPair implements WritableComparable {    private IntWritable first;    private IntWritable second;    public void set(IntWritable first, IntWritable second) {        this.first = first;        this.second = second;    }    //注意:需要添加无参的构造方法,否则反射时会报错。    public IntPair() {        set(new IntWritable(), new IntWritable());    }    public IntPair(int first, int second) {        set(new IntWritable(first), new IntWritable(second));    }    public IntPair(IntWritable first, IntWritable second) {        set(first, second);    }    public IntWritable getFirst() {        return first;    }    public void setFirst(IntWritable first) {        this.first = first;    }    public IntWritable getSecond() {        return second;    }    public void setSecond(IntWritable second) {        this.second = second;    }    @Override    public void write(DataOutput out) throws IOException {        first.write(out);        second.write(out);    }    @Override    public void readFields(DataInput in) throws IOException {        first.readFields(in);        second.readFields(in);    }    @Override    public int hashCode() {        return first.hashCode() * 163 + second.hashCode();    }    @Override    public boolean equals(Object o) {        if (o instanceof IntPair) {            IntPair tp = (IntPair) o;            return first.equals(tp.first) && second.equals(tp.second);        }        return false;    }    @Override    public String toString() {        return first + "\t" + second;    }    @Override    public int compareTo(IntPair tp) {        int cmp = first.compareTo(tp.first);        if (cmp != 0) {            return cmp;        }        return second.compareTo(tp.second);    } }

2)Secondary类

package com.hadoop.mr.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SecondarySort {    static class TheMapper extends Mapper {        @Override        protected void map(LongWritable key, Text value, Context context)                throws IOException, InterruptedException {            String[] fields = value.toString().split("\t");            int field1 = Integer.parseInt(fields[0]);            int field2 = Integer.parseInt(fields[1]);              context.write(new IntPair(field1,field2), NullWritable.get());        }    }        static class TheReducer extends Reducer {        //private static final Text SEPARATOR = new Text("------------------------------------------------");        @Override        protected void reduce(IntPair key, Iterable values, Context context)                throws IOException, InterruptedException {            context.write(key, NullWritable.get());        }    }    public static class FirstPartitioner extends Partitioner {        @Override        public int getPartition(IntPair key, NullWritable value, int numPartitions) {            return Math.abs(key.getFirst().get()) % numPartitions;        }            }        //如果不添加这个类,默认第一列和第二列都是升序排序的。这个类的作用是使第一列升序排序,第二列降序排序    public static class KeyComparator extends WritableComparator {        //无参构造器必须加上,否则报错。        protected KeyComparator() {            super(IntPair.class, true);        }        @Override        public int compare(WritableComparable a, WritableComparable b) {            IntPair ip1 = (IntPair) a;            IntPair ip2 = (IntPair) b;            //第一列按升序排序            int cmp = ip1.getFirst().compareTo(ip2.getFirst());            if (cmp != 0) {                return cmp;            }            //在第一列相等的情况下,第二列按倒序排序            return -ip1.getSecond().compareTo(ip2.getSecond());        }    }     /*  public static class GroupComparator extends WritableComparator {        //无参构造器必须加上,否则报错。        protected GroupComparator() {            super(IntPair.class, true);        }        @Override        public int compare(WritableComparable a, WritableComparable b) {            IntPair ip1 = (IntPair) a;            IntPair ip2 = (IntPair) b;            return ip1.getFirst().compareTo(ip2.getFirst());        }    }*/        //入口程序    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);        job.setJarByClass(SecondarySort.class);        //设置Mapper的相关属性        job.setMapperClass(TheMapper.class);        //当Mapper中的输出的key和value的类型和Reduce输出的key和value的类型相同时,以下两句可以省略。        //job.setMapOutputKeyClass(IntPair.class);        //job.setMapOutputValueClass(NullWritable.class);            FileInputFormat.setInputPaths(job, new Path(args[0]));                //设置分区的相关属性        job.setPartitionerClass(FirstPartitioner.class);        //在map中对key进行排序        job.setSortComparatorClass(KeyComparator.class);        //job.setGroupingComparatorClass(GroupComparator.class);          //设置Reducer的相关属性        job.setReducerClass(TheReducer.class);        job.setOutputKeyClass(IntPair.class);        job.setOutputValueClass(NullWritable.class);          FileOutputFormat.setOutputPath(job, new Path(args[1]));          //设置Reducer数量        int reduceNum = 1;        if(args.length >= 3 && args[2] != null){            reduceNum = Integer.parseInt(args[2]);        }        job.setNumReduceTasks(reduceNum);        job.waitForCompletion(true);    }     }

3)测试

打成secsort.jar包,从hdfs上的/test/secsortdata获取数据文件,mapreduce输出目录是/test/secsortresult8,启动1个reduce:

hadoop jar secsort.jar /test/secsortdata /test/secsortresult8 1

测试结果:

3、二次排序(写法二)

1)IntPair类

package com.hadoop.mr.sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class IntPair implements WritableComparable {    private int first = 0;    private int second = 0;    public void set(int first, int second) {        this.first = first;        this.second = second;    }    // 注意:需要添加无参的构造方法,否则反射时会报错。    public IntPair() {    }    public IntPair(int first, int second) {        set(first, second);    }    public int getFirst() {        return first;    }    public void setFirst(int first) {        this.first = first;    }    public int getSecond() {        return second;    }    public void setSecond(int second) {        this.second = second;    }    @Override    public void write(DataOutput out) throws IOException {        out.write(first);        out.write(second);    }    @Override    public void readFields(DataInput in) throws IOException {        first = in.readInt();        second = in.readInt();    }    @Override    public int hashCode() {        return first + "".hashCode() + second + "".hashCode();    }    @Override    public boolean equals(Object right) {        if (right instanceof IntPair) {            IntPair r = (IntPair) right;            return r.getFirst() == first && r.getSecond() == second;        } else {            return false;        }    }    // 这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法    @Override    public int compareTo(IntPair o) {        if (first != o.getFirst()) {            return first - o.getFirst();        } else if (second != o.getSecond()) {            return o.getSecond() - second;        } else {            return 0;        }    } }

2)Secondary类

package com.hadoop.mr.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SecondarySort {    static class TheMapper extends Mapper {        @Override        protected void map(LongWritable key, Text value, Context context)                throws IOException, InterruptedException {            String[] fields = value.toString().split("\t");            int field1 = Integer.parseInt(fields[0]);            int field2 = Integer.parseInt(fields[1]);            context.write(new IntPair(field1,field2), NullWritable.get());        }    }        static class TheReducer extends Reducer {        //private static final Text SEPARATOR = new Text("------------------------------------------------");        @Override        protected void reduce(IntPair key, Iterable values, Context context)                throws IOException, InterruptedException {            context.write(key, NullWritable.get());        }    }    public static class FirstPartitioner extends Partitioner {        @Override        public int getPartition(IntPair key, NullWritable value,                int numPartitions) {            return Math.abs(key.getFirst().get()) % numPartitions;        }            }        //如果不添加这个类,默认第一列和第二列都是升序排序的。这个类的作用是使第一列升序排序,第二列降序排序    public static class KeyComparator extends WritableComparator {        //无参构造器必须加上,否则报错。        protected KeyComparator() {            super(IntPair.class, true);        }        @Override        public int compare(WritableComparable a, WritableComparable b) {            IntPair ip1 = (IntPair) a;            IntPair ip2 = (IntPair) b;            //第一列按升序排序            int cmp = ip1.getFirst().compareTo(ip2.getFirst());            if (cmp != 0) {                return cmp;            }            //在第一列相等的情况下,第二列按倒序排序            return -ip1.getSecond().compareTo(ip2.getSecond());        }    }     /*  public static class GroupComparator extends WritableComparator {        //无参构造器必须加上,否则报错。        protected GroupComparator() {            super(IntPair.class, true);        }        @Override        public int compare(WritableComparable a, WritableComparable b) {            IntPair ip1 = (IntPair) a;            IntPair ip2 = (IntPair) b;            return ip1.getFirst().compareTo(ip2.getFirst());        }    }*/        //入口程序    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);        job.setJarByClass(SecondarySort.class);        //设置Mapper的相关属性        job.setMapperClass(TheMapper.class);        //当Mapper中的输出的key和value的类型和Reduce输出的key和value的类型相同时,以下两句可以省略。        //job.setMapOutputKeyClass(IntPair.class);        //job.setMapOutputValueClass(NullWritable.class);            FileInputFormat.setInputPaths(job, new Path(args[0]));                //设置分区的相关属性        job.setPartitionerClass(FirstPartitioner.class);        //在map中对key进行排序        job.setSortComparatorClass(KeyComparator.class);        //job.setGroupingComparatorClass(GroupComparator.class);          //设置Reducer的相关属性        job.setReducerClass(TheReducer.class);        job.setOutputKeyClass(IntPair.class);        job.setOutputValueClass(NullWritable.class);          FileOutputFormat.setOutputPath(job, new Path(args[1]));          //设置Reducer数量        int reduceNum = 1;        if(args.length >= 3 && args[2] != null){            reduceNum = Integer.parseInt(args[2]);        }        job.setNumReduceTasks(reduceNum);        job.waitForCompletion(true);    }     }

PS#Scala二次排序

package com.spark.secondApp import org.apache.spark.{SparkContext, SparkConf} object SecondarySort {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName(" Secondary Sort ").setMaster("local")    val sc = new SparkContext(conf)    val file = sc.textFile("hdfs://worker02:9000/test/secsortdata")      val rdd = file.map(line => line.split("\t")).      map(x => (x(0),x(1))).groupByKey().      sortByKey(true).map(x => (x._1,x._2.toList.sortWith(_>_)))      val rdd2 = rdd.flatMap{      x =>      val len = x._2.length      val array = new Array[(String,String)](len)      for(i <- 0 until len) {        array(i) = (x._1,x._2(i))      }      array      }      sc.stop()  } }

承接子推荐阅读:

1,hadoop系列之基础系列

2,hadoop系列之深入优化

后续会讲MR join的经典案例。

kafka,hbase,spark,Flink等入门到深入源码,spark机器学习,大数据安全,大数据运维

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:hadoop系列之MR经典案例分享二
下一篇:SDL显示文本
相关文章

 发表评论

暂时没有评论,来抢沙发吧~