11. MapReduce -- Custom Input

User submission 218 2022-11-26


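The earlier post "MapReduce -- How Input Works" noted that custom input comes down to extending InputFormat and RecordReader and implementing their methods. The example here walks through doing exactly that.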
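To make the reader contract concrete before the Hadoop code, here is a minimal plain-Java sketch of how a framework drains a record reader: it calls nextKeyValue() until it returns false and fetches the current key and value after each successful call. The names SimpleRecordReader, WholeInputReader and ReaderLoopDemo are hypothetical and are not part of Hadoop; the real RecordReader also has initialize(), getProgress() and close().

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the RecordReader contract (not a Hadoop type). */
interface SimpleRecordReader<K, V> {
    boolean nextKeyValue();   // advance to the next record; false when none are left
    K getCurrentKey();
    V getCurrentValue();
}

/** Emits its whole input as exactly one record, then reports that it is finished. */
class WholeInputReader implements SimpleRecordReader<String, byte[]> {
    private final String name;
    private final byte[] content;
    private boolean processed = false;

    WholeInputReader(String name, byte[] content) {
        this.name = name;
        this.content = content;
    }

    @Override
    public boolean nextKeyValue() {
        if (processed) {
            return false;
        }
        processed = true;
        return true;
    }

    @Override
    public String getCurrentKey() {
        return name;
    }

    @Override
    public byte[] getCurrentValue() {
        return content;
    }
}

class ReaderLoopDemo {
    /** Mimics the framework loop: collect the key of every record the reader yields. */
    static <K, V> List<K> drainKeys(SimpleRecordReader<K, V> reader) {
        List<K> keys = new ArrayList<>();
        while (reader.nextKeyValue()) {
            keys.add(reader.getCurrentKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        List<String> keys = drainKeys(new WholeInputReader("a.txt", data));
        System.out.println(keys.size() + " record(s): " + keys); // prints "1 record(s): [a.txt]"
    }
}
```

The single boolean flag is the whole trick: the first call hands over the complete input, and every later call signals end of input.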

1. Requirements

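Merge multiple small files into one large file (somewhat like combineInputFormat) and write it out. For each small file, the large file stores the file's path together with its content.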
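As a rough picture of the target result, the following plain-Java sketch builds the same path-to-content pairing on the local file system, without Hadoop. It only illustrates the data shape; the class name SmallFileMergeSketch and the temporary files are assumptions made up for this example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SmallFileMergeSketch {
    /** Reads every regular file directly under dir into a path -> content map, ordered by path. */
    static Map<String, byte[]> merge(Path dir) throws IOException {
        List<Path> files;
        try (Stream<Path> listing = Files.list(dir)) {
            files = listing.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        Map<String, byte[]> merged = new LinkedHashMap<>();
        for (Path file : files) {
            // One entry per small file: key is the path, value is the whole content.
            merged.put(file.toString(), Files.readAllBytes(file));
        }
        return merged;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("small-files");
        Files.write(dir.resolve("a.txt"), "first".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("b.txt"), "second".getBytes(StandardCharsets.UTF_8));
        merge(dir).forEach((path, bytes) ->
                System.out.println(path + " -> " + bytes.length + " bytes"));
    }
}
```

The MapReduce job in the next section produces the same kind of pairs, with the path as a Text key and the content as a BytesWritable value.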

2. Source code

InputFormat:

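import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    /** Never split a file: each small file must reach the reader as one whole split. */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /** Returns the reader that loads the content of one file. */
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        SRecordReader sRecordReader = new SRecordReader();
        // The framework calls initialize() again before reading; this call is harmless.
        sRecordReader.initialize(inputSplit, taskAttemptContext);
        return sRecordReader;
    }
}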

RecordReader:

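import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private Configuration conf;
    private FileSplit split;
    // Flag: has the current split already been read?
    private boolean process = false;
    private final BytesWritable value = new BytesWritable();

    /** Remembers the split and the job configuration. */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        split = (FileSplit) inputSplit;
        conf = taskAttemptContext.getConfiguration();
    }

    /** Reads the next key/value pair: the whole file on the first call, nothing afterwards. */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (process) {
            return false;
        }
        // The int cast assumes the file is smaller than 2 GB.
        byte[] buffer = new byte[(int) split.getLength()];
        // Get the file system that owns this path.
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        // Open the input stream; try-with-resources closes it even if reading fails.
        try (FSDataInputStream fis = fs.open(path)) {
            // Read the whole file into the buffer, then load it into value.
            IOUtils.readFully(fis, buffer, 0, buffer.length);
            value.set(buffer, 0, buffer.length);
        }
        // Mark the split as read so the next call returns false.
        process = true;
        return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return process ? 1 : 0;
    }

    @Override
    public void close() throws IOException {
        // Nothing to release: the stream is closed inside nextKeyValue().
    }
}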
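One detail in the reader deserves a note: the cast `(int) split.getLength()` silently wraps for lengths above Integer.MAX_VALUE, so an oversized file would give either a negative array size or a buffer that is too small. That is fine for genuinely small files. If a guard is wanted, a sketch could look like this; BufferSizeGuard and bufferSizeFor are hypothetical names, not Hadoop APIs.

```java
class BufferSizeGuard {
    /** Converts a split length to an array size, failing loudly instead of wrapping around. */
    static int bufferSizeFor(long splitLength) {
        if (splitLength < 0 || splitLength > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "Cannot buffer " + splitLength + " bytes in a single array");
        }
        return (int) splitLength;
    }

    public static void main(String[] args) {
        System.out.println(bufferSizeFor(1024L)); // prints 1024
        try {
            bufferSizeFor(3_000_000_000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the reader, the buffer allocation would then become `new byte[BufferSizeGuard.bufferSizeFor(split.getLength())]`, under the same assumption that the helper is on the classpath.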

Mapper:

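import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private final Text k = new Text();

    /** Runs once per split: use the path of the file behind this split as the output key. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().toString();
        k.set(name);
    }

    /** Emits (file path, file content); the reader delivers one record per file. */
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(k, value);
    }
}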

Reducer:

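import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    /** Each path is a unique key with a single value, so only the first value is written. */
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }
}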

Driver:

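import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SFileDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local test paths (input, output); remove this line to use real arguments.
        args = new String[]{"G:\\test\\date\\A\\order\\", "G:\\test\\date\\A\\order2\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SFileDriver.class);
        job.setMapperClass(SFileMapper.class);
        job.setReducerClass(SFileReducer.class);

        // Set the input and output format classes; the default input format is TextInputFormat.
        job.setInputFormatClass(SFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Report success or failure through the process exit code.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}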

A custom InputFormat must be registered on the job with job.setInputFormatClass(); otherwise the job falls back to the default, TextInputFormat.
