MapReduce自定义OutputFormat

网友投稿 241 2022-11-25

MapReduce自定义OutputFormat

package MyOutPutFormat;

import DBOutFormat.MysqlDBOutPutFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath;

public class reduce {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

String input = "data/access.log" ; String output ="out" ; final Configuration co = new Configuration() ; final Job job = Job.getInstance(co); //设置class job.setJarByClass(reduce.class); //设置mapper 和 Reduce job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置 Mapper 阶段输出数据的key 和value job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //设置Reducer 阶段输出数据的key 和value job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); //job输出发生变化 ,使用自定义的 outputformat job.setOutputFormatClass(MyOutputFormat.class); Path outDir = getOutputPath(job) ; if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) { File file = new File(outDir.toUri()) ; if(file.isDirectory()){ File[] childrenFiles = file.listFiles(); for (File childFile:childrenFiles){ childFile.delete() ; } } file.delete(); } //提交 job final boolean result = job.waitForCompletion(true); System.exit(result ? 
0 : 1); } public static class MyMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value,NullWritable.get()); } } public static class MyReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key,NullWritable.get()); } } public static class MyOutputFormat extends FileOutputFormat{ @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { FileSystem fileSystem = FileSystem.get(job.getConfiguration()) ; FSDataOutputStream fsoUt13= fileSystem.create(new Path("out/13.log")); FSDataOutputStream otherOut15= fileSystem.create(new Path("out/15.log")); FSDataOutputStream otherOut18= fileSystem.create(new Path("out/18.log")); FSDataOutputStream otherOut= fileSystem.create(new Path("out/99.log")); return new RecordWriter() { @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String[] st = key.toString().split("\t") ; if(st[1].trim().startsWith("13")){ fsoUt13.write(key.toString().getBytes()); fsoUt13.write("\r".getBytes()); fsoUt13.flush(); }else if(st[1].trim().startsWith("15")){ otherOut15.write(key.toString().getBytes()); otherOut15.write("\r".getBytes()); otherOut15.flush(); }else if(st[1].trim().startsWith("18")){ otherOut18.write(key.toString().getBytes()); otherOut18.write("\r".getBytes()); otherOut18.flush(); }else{ otherOut.write(key.toString().getBytes()); otherOut.write("\r".getBytes()); otherOut.flush(); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { fsoUt13.close(); otherOut15.close(); otherOut18.close(); otherOut.close(); fileSystem.close(); } }; } }

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Springboot项目实现将类从@ComponentScan中排除
下一篇:10位模数转换器MAX1072/75的性能特点和典型应用设计分析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~