15. MapReduce--Custom Output

Reader submission · 252 · 2022-11-26


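To customize the output of a job, we extend two abstract classes: OutputFormat and RecordWriter. OutputFormat is mainly responsible for creating the RecordWriter, and RecordWriter implements the write method that writes each key-value pair to a file. (In this example, MyOutputFormat extends FileOutputFormat, which is itself a subclass of OutputFormat.)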
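
To make this split of responsibilities concrete without a Hadoop dependency, here is a minimal JDK-only sketch. SimpleOutputFormat, SimpleRecordWriter, InMemoryOutputFormat and InMemoryRecordWriter are hypothetical names invented for illustration, not Hadoop classes; the real OutputFormat also declares checkOutputSpecs and getOutputCommitter, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free stand-ins for the two roles (NOT the Hadoop API):
// the "output format" only hands out a writer; the "record writer" receives
// every key-value pair and is closed once all records are written.
interface SimpleRecordWriter<K, V> {
    void write(K key, V value);

    void close();
}

interface SimpleOutputFormat<K, V> {
    SimpleRecordWriter<K, V> getRecordWriter();
}

// Writer that keeps "key<TAB>value" lines in memory instead of writing a file.
class InMemoryRecordWriter implements SimpleRecordWriter<String, String> {
    final List<String> lines = new ArrayList<>();
    boolean closed = false;

    @Override
    public void write(String key, String value) {
        if (closed) {
            throw new IllegalStateException("writer already closed");
        }
        lines.add(key + "\t" + value);
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Format whose only responsibility is creating the writer.
class InMemoryOutputFormat implements SimpleOutputFormat<String, String> {
    @Override
    public SimpleRecordWriter<String, String> getRecordWriter() {
        return new InMemoryRecordWriter();
    }
}

class OutputRoleDemo {
    public static void main(String[] args) {
        InMemoryRecordWriter writer = (InMemoryRecordWriter) new InMemoryOutputFormat().getRecordWriter();
        writer.write("http://itstar.com", "");
        writer.write("http://sina.com", "");
        writer.close();
        System.out.println(writer.lines.size()); // 2
    }
}
```

The framework plays the role of OutputRoleDemo here: it asks the format for a writer once per task, feeds it every record, then closes it.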

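1. Requirement: among the key-value pairs emitted by the reducer, write those whose key contains a specific string ("itstar" in the code below) to one file, and all remaining pairs to another file.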
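
The routing rule itself is just a substring test. A minimal JDK-only sketch, assuming the marker is passed in as a parameter (the job below hard-codes "itstar") and using in-memory lists in place of the two files; KeyRouter and route are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free sketch of the routing rule: keys that contain the
// marker go to one bucket, every other key goes to the second bucket.
// In the job in this article the marker is "itstar" and the buckets are two files.
class KeyRouter {
    static final class Routed {
        final List<String> matching = new ArrayList<>();
        final List<String> others = new ArrayList<>();
    }

    static Routed route(List<String> keys, String marker) {
        Routed routed = new Routed();
        for (String key : keys) {
            if (key.contains(marker)) {
                routed.matching.add(key);
            } else {
                routed.others.add(key);
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        Routed r = route(List.of("http://itstar.com", "http://baidu.com", "http://itstar1.com"), "itstar");
        System.out.println("matching: " + r.matching); // matching: [http://itstar.com, http://itstar1.com]
        System.out.println("others: " + r.others); // others: [http://baidu.com]
    }
}
```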

2. Source data and source code

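Source data

```text
http://cn.bing.com
http://baidu.com
http://google.com
http://itstar.com
http://itstar1.com
http://itstar2.com
http://itstar3.com
http://baidu.com
http://sin2a.com
http://sin2a.comw.google.com
http://sin2desa.com
http://sin2desa.comw.google.com
http://sina.com
http://sindsafa.com
```

OutputFormat

```java
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Extends FileOutputFormat (a subclass of OutputFormat); its only job is to create the RecordWriter.
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        return new MyRecordWriter(taskAttemptContext);
    }
}
```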

RecordWriter

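```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private final FSDataOutputStream startOut; // lines whose key contains "itstar"
    private final FSDataOutputStream otherOut; // all other lines

    public MyRecordWriter(TaskAttemptContext job) throws IOException {
        // Open both files up front; if this fails, the task fails instead of hitting a NullPointerException later.
        FileSystem fs = FileSystem.get(job.getConfiguration());
        startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log"));
        otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log"));
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String line = key.toString();
        // Write plain UTF-8 bytes; writeUTF() would prepend a 2-byte length to every line.
        byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
        // If the key contains "itstar", write it to startout.log; otherwise to otherout.log.
        if (line.contains("itstar")) {
            startOut.write(bytes);
        } else {
            otherOut.write(bytes);
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        startOut.close();
        otherOut.close();
    }
}
```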
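
One detail worth knowing when writing text through FSDataOutputStream: it extends java.io.DataOutputStream, so writeUTF() stores each string as a 2-byte length followed by (modified) UTF-8 bytes, not as plain text. The JDK-only sketch below compares the bytes produced by writeUTF() and by write(byte[]) for one line of the sample data; WriteUtfDemo and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// FSDataOutputStream extends java.io.DataOutputStream, so this JDK-only demo shows
// what each call puts in the file. writeUTF() prefixes the string with a 2-byte
// length (and uses modified UTF-8); write(byte[]) emits exactly the encoded text.
class WriteUtfDemo {
    static byte[] viaWriteUtf(String line) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(line);
        } catch (IOException e) {
            // Not expected for an in-memory buffer; DataOutputStream just declares it.
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    static byte[] viaRawBytes(String line) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.write(line.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        String line = "http://sina.com\r\n"; // 17 ASCII characters
        byte[] utf = viaWriteUtf(line);
        byte[] raw = viaRawBytes(line);
        System.out.println(utf.length + " bytes, starting with " + utf[0] + " " + utf[1]); // 19 bytes, starting with 0 17
        System.out.println(raw.length + " bytes"); // 17 bytes
    }
}
```

The two leading bytes (0 and 17) are the length prefix; in a log file meant to be read as text they show up as stray control characters before every line.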

mapper

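```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the whole input line as the key; the value carries no data.
        context.write(value, NullWritable.get());
    }
}
```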

reducer

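```java
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyOutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    private final Text k = new Text();

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // reduce() runs once per distinct key, so duplicate lines are written only once.
        // Append a line break because MyRecordWriter adds no separator between records.
        k.set(key.toString() + "\r\n");
        context.write(k, NullWritable.get());
    }
}
```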
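
Because reduce() is invoked once per distinct key, this reducer also deduplicates the input: http://baidu.com appears twice in the source data but is written once. A JDK-only simulation, assuming a single reducer; ReduceSimulation is a hypothetical name, and a TreeSet stands in for Hadoop's shuffle and sort (Text keys are compared by their bytes, which matches String order for ASCII URLs like these).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Hadoop-free simulation of the reduce side: map output is sorted
// by key and reduce() is called once per distinct key, so duplicates collapse.
// TreeSet stands in for the shuffle/sort phase (assumption: ASCII keys only).
class ReduceSimulation {
    static List<String> reduceOutput(List<String> mapOutputKeys) {
        List<String> out = new ArrayList<>();
        for (String key : new TreeSet<>(mapOutputKeys)) {
            // Mirrors MyOutputReducer: append a line break to every key.
            out.add(key + "\r\n");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("http://baidu.com", "http://cn.bing.com", "http://baidu.com");
        System.out.println(reduceOutput(keys).size()); // 2
    }
}
```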

driver

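```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Local test paths: input file and job output directory
        args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyDriver.class);

        job.setMapperClass(MyOutputMapper.class);
        job.setReducerClass(MyOutputReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Use the custom output class, which also extends FileOutputFormat
        job.setOutputFormatClass(MyOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat still requires an output directory; it receives the job's _SUCCESS marker file
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```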
