Hadoop: Processing Mobile Internet Access Logs with a Custom Hadoop Data Type
Without further ado, here is the code.
package com.lyz.hadoop.count;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * Hadoop MapReduce job that aggregates traffic KPIs per phone number
 * using a custom Writable type.
 * @author liuyazhuang
 */
public class KpiApp {
    /** Input path */
    private static final String INPUT_PATH = "hdfs://liuyazhuang:9000/d1/wlan";
    /** Output path for the results */
    private static final String OUT_PATH = "hdfs://liuyazhuang:9000/d1/out";

    public static void main(String[] args) throws Exception {
        // Create the Job object
        Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());

        // 1.1 Specify the input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        // Specify the class that parses the input files
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 Specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        // Specify the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);

        // 1.3 Specify the partitioner class
        job.setPartitionerClass(HashPartitioner.class);
        // Specify the number of reduce tasks
        job.setNumReduceTasks(1);

        // 1.4 TODO sorting, grouping
        // 1.5 TODO combining (optional)

        // 2.2 Specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        // Specify the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);

        // 2.3 Specify the output location
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        // Specify the class that formats the output files
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the job to the JobTracker and wait for it to finish
        job.waitForCompletion(true);
    }

    /**
     * Mapper
     * @author liuyazhuang
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");
            String msis = splited[1];
            Text k2 = new Text(msis);
            KpiWritable v2 = new KpiWritable(Long.parseLong(splited[6]),
                    Long.parseLong(splited[7]),
                    Long.parseLong(splited[8]),
                    Long.parseLong(splited[9]));
            context.write(k2, v2);
        }
    }

    /**
     * Reducer
     * @author liuyazhuang
     */
    static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
        @Override
        protected void reduce(Text k2, Iterable<KpiWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0;
            long downPackNum = 0;
            long upPayLoad = 0;
            long downPayLoad = 0;
            for (KpiWritable kpiWritable : v2s) {
                upPackNum += kpiWritable.upPackNum;
                downPackNum += kpiWritable.downPackNum;
                upPayLoad += kpiWritable.upPayLoad;
                downPayLoad += kpiWritable.downPayLoad;
            }
            KpiWritable v3 = new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
            context.write(k2, v3);
        }
    }
}

/**
 * Custom Hadoop data type
 * @author liuyazhuang
 */
class KpiWritable implements Writable {
    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;

    public KpiWritable() {
        super();
    }

    public KpiWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
        super();
        this.upPackNum = upPackNum;
        this.downPackNum = downPackNum;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    @Override
    public String toString() {
        return "KpiWritable [upPackNum=" + upPackNum + ", downPackNum=" + downPackNum
                + ", upPayLoad=" + upPayLoad + ", downPayLoad=" + downPayLoad + "]";
    }
}
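To verify that the custom type serializes and deserializes correctly, you can run a quick round trip through write() and readFields() outside of MapReduce. The following is a minimal sketch; the class name KpiWritableRoundTrip and the sample field values are made up for illustration, and the class is assumed to sit in the same package as KpiWritable (which is package-private):

package com.lyz.hadoop.count;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class KpiWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        // Serialize a KpiWritable (sample values) with write()
        KpiWritable original = new KpiWritable(3, 5, 200, 1400);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize it back into a fresh instance with readFields()
        KpiWritable copy = new KpiWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Both lines should print identical field values
        System.out.println(original);
        System.out.println(copy);
    }
}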
Notes:
(1) Calling job.waitForCompletion(true) from Eclipse actually executes the following:
connect();
info = jobClient.submitJobInternal(conf);
(2) Inside the connect() method, a JobClient object is created. When that object's constructor runs, it obtains JobSubmissionProtocol, the client-side proxy for the JobTracker. The implementation class of JobSubmissionProtocol is JobTracker.
(3) Inside jobClient.submitJobInternal(conf), JobSubmissionProtocol.submitJob(...) is invoked, so what actually executes is JobTracker.submitJob(...).
(4) Hadoop data types are required to implement the Writable interface, as shown in the sketch below.
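For reference, the Writable contract consists of exactly the two methods that KpiWritable above implements. A simplified sketch of the org.apache.hadoop.io.Writable interface:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable {
    // Serialize this object's fields to the output stream
    void write(DataOutput out) throws IOException;

    // Deserialize this object's fields from the input stream
    void readFields(DataInput in) throws IOException;
}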
(5) Mapping between Java basic types and common Hadoop basic types:
Long LongWritable
Integer IntWritable
Boolean BooleanWritable
String Text
How do you convert a Java type to a Hadoop basic type?
Call the Hadoop type's constructor, or call its set() method, for example:
new LongWritable(123L);
How do you convert a Hadoop basic type back to a Java type?
For Text, call toString(); for the other types, call get().
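A minimal, self-contained sketch showing both directions (the class name TypeConversionDemo is made up for this example):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class TypeConversionDemo {
    public static void main(String[] args) {
        // Java -> Hadoop: constructor or set()
        LongWritable lw = new LongWritable(123L);
        lw.set(456L);
        // Hadoop -> Java: get()
        long n = lw.get();

        // String -> Text via constructor, Text -> String via toString()
        Text t = new Text("hello");
        String s = t.toString();

        System.out.println(n + " " + s);  // prints: 456 hello
    }
}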