五、MapReduce普通排序例子--统计手机号流量

网友投稿 385 2022-11-26

五、MapReduce普通排序例子--统计手机号流量

1、需求

统计每一个手机号的总流量(上行流量+下行流量)、上行流量、下行流量,并且最后按照总流量进行手机号的排序。****

2、数据输入及输出格式

源数据比较敏感,这里就不展示出来了

输入格式为:

时间戳、电话号码、基站的物理地址、访问网址的ip、网站域名、数据包、接包数、上行/传流量、下行/载流量、响应码 分隔符为“\t”

输出格式为:

手机号码 上行流量 下行流量 总流量 并且根据总流量的大小进行排序

3、思路分析

map阶段:切分字段,以手机号为key,value为一个bean对象,value保存对应手机号的上下行流量、以及总流量;key保存手机号,也就是类似的结构:

<1234567,<上下行流量,总流量>>

reduce阶段:对于同一个key的(即同一手机号)的上下行流量进行累加,获取总的上下行流量、总流量。并且最后需要对总流量进行排序,所以reduce输出的key为整个bean,value为空

4、具体程序

FlowBean.java

/*用于保存流量数据的自定义可序列化类*/ package PhoneData; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @Getter @Setter @NoArgsConstructor public class FlowBean implements WritableComparable { /** 该类是一个可序列化类,且可比较,所以要实现 WritableComparable接口 * 上传、下载、总流量 */ private int upFlow; private int downFlow; private int sumFlow; public FlowBean(int upFlow, int downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } /** * 序列化方法 * * @param dataOutput * @throws IOException */ @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(this.upFlow); dataOutput.writeInt(this.downFlow); dataOutput.writeInt(this.sumFlow); } /** * 反序列化 * @param dataInput * @throws IOException */ @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readInt(); this.downFlow = dataInput.readInt(); this.sumFlow = dataInput.readInt(); } /** * 打印字符串方法 * @return */ @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.upFlow); sb.append(" "); sb.append(this.downFlow); sb.append(" "); sb.append(this.sumFlow); return sb.toString(); } /** * 对象的比较方法,用于排序比较 * @param o * @return */ @Override public int compareTo(FlowBean o) { return this.getSumFlow() > o.getSumFlow() ? -1 : 1; } }

mapper:

package PhoneData; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class PhoneMapper extends Mapper { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); //开始解析切割数据 k.set(fields[1]); int downFlow = Integer.parseInt(fields[fields.length - 2]); int upFlow = Integer.parseInt(fields[fields.length - 3]); v.setDownFlow(downFlow); v.setUpFlow(upFlow); v.setSumFlow(upFlow + downFlow); context.write(k, v); } }

reducer:

package PhoneData; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class PhoneReducer extends Reducer { FlowBean v = new FlowBean(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int upFlow = 0; int downFlow = 0; int sumFlow = 0; //对上传、下载、总流量进行累加 for (FlowBean f : values) { upFlow += f.getUpFlow(); downFlow += f.getDownFlow(); sumFlow += f.getSumFlow(); } //将汇总的数据写到新的bean中,然后输出 v.setUpFlow(upFlow); v.setDownFlow(downFlow); v.setSumFlow(sumFlow); context.write(v, key); } }

driver:

package PhoneData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class PhoneDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\phone_data.txt", "G:\\test\\A\\phonetest5\\"}; Configuration conf = new Configuration(); //获取job对象 Job job = Job.getInstance(conf); //配置driver,map,reduce类 job.setJarByClass(PhoneDriver.class); job.setMapperClass(PhoneMapper.class); job.setReducerClass(PhoneReducer.class); //指定map和reduce的输出类 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(Text.class); //指定输入数据,输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交job job.waitForCompletion(true); } }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:六、MapReduce排序例子--获取价格最高的商品信息
下一篇:英创信息技术ESM335x系列与EM335x之比较
相关文章

 发表评论

暂时没有评论,来抢沙发吧~