#yyds干货盘点# Hadoop之数据压缩

网友投稿 260 2022-11-22

#yyds干货盘点# Hadoop之数据压缩

Hadoop之数据压缩

1. 概述

1)压缩的好处和坏处压缩的优点:以减少磁盘IO、减少磁盘存储空间。压缩的缺点:增加CPU开销。2)压缩原则(1)运算密集型的Job,少用压缩,因为每次运算需要解压缩(2)IO密集型的J33ob,多用压缩,传输不需要解压缩

2. MR支持的压缩编码

Snappy是一个压缩/解压缩库。它不瞄准最大压缩,或与任何其他压缩库的兼容性;相反,它旨在非常高速和合理的压缩。例如,与Zlib的最快模式相比,Snappy是大多数输入的速度更快的数量级,但是由此产生的压缩文件从20%到100%更大。64位中的核心I7处理器的单个核心模式,SNAPPY压缩在约250 MB /秒或更高或更高,并在约500 MB /秒或更长时间减压。

3. 压缩方式选择

压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。

Gzip压缩优点:压缩率比较高; 缺点:不支持Split;压缩/解压速度一般; Bzip2压缩优点:压缩率高;支持Split; 缺点:压缩/解压速度慢。 Lzo压缩优点:压缩/解压速度比较快;支持Split;缺点:压缩率一般;想支持切片需要额外创建索引。 Snappy压缩优点:压缩和解压缩速度快; 缺点:不支持Split;压缩率一般; 压缩位置选择压缩可以在MapReduce作用的任意阶段启用。

压缩实操案例

即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置。

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // 开启map端输出压缩 conf.setBoolean("mapreduce.map.output.compress", true); // 设置map端输出压缩方式 conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class); Job job = Job.getInstance(conf); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

Mapper保持不变

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper{ Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 循环写出 for(String word:words){ k.set(word); context.write(k, v); } } }

Reducer保持不变

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer{ IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; // 1 汇总 for(IntWritable value:values){ sum += value.get(); } v.set(sum); // 2 输出 context.write(key, v); } }

修改驱动

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.Lz4Codec; import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置reduce端输出压缩开启 FileOutputFormat.setCompressOutput(job, true); // 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); boolean result = job.waitForCompletion(true); System.exit(result?0:1); } }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:添加OpenVX™ 1.1的视觉应用支持PowerVR GPU
下一篇:#yyds干货盘点# Hadoop之MapReduce内核源码解析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~