XII. MapReduce: Map Join and Reduce Join
I. Map Join
1. Applicable scenario: one table is very large and the other is very small.
2. Solution: cache the small table on the map side and finish the join logic there. This moves work onto the map tasks, relieves the data pressure on the reduce side, and minimizes data skew.
3. Concrete method: use the distributed cache.
(1) In the mapper's setup phase, read the cached file into an in-memory collection.
(2) In the driver, register the file with the cache: job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt")); // cache an ordinary file onto the task's node
4. Example
// order.txt: order id, product id, quantity
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6

// pd.txt: product id, product name
01 小米
02 华为
03 格力
The goal is to replace each product id in order.txt with the product name, by caching the small table pd.txt.
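Applied to the sample data above, the join should produce one line per order:

1001	小米	1
1002	华为	2
1003	格力	3
1004	小米	4
1005	华为	5
1006	格力	6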
mapper:
package MapJoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // (class body reconstructed: the type parameters and logic are inferred
    // from the driver below and the data format)
    // product id -> product name, loaded once per map task from the cached pd.txt
    private Map<String, String> pdMap = new HashMap<>();
    private Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // the cached file is symlinked into the task's working directory,
        // so it can be opened by its bare name
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                new FileInputStream("pd.txt"), "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split(" ");
            pdMap.put(fields[0], fields[1]); // 01 -> 小米, 02 -> 华为, ...
        }
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // order line: orderID productID amount
        String[] fields = value.toString().split(" ");
        String productName = pdMap.get(fields[1]);
        // emit the order line with the product id replaced by the product name
        k.set(fields[0] + "\t" + productName + "\t" + fields[2]);
        context.write(k, NullWritable.get());
    }
}
driver:
package MapJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // local test paths
        args = new String[]{"G:\\test\\A\\mapjoin\\order.txt", "G:\\test\\A\\mapjoin\\join2\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // load the small, repeatedly read file into the distributed cache
        job.addCacheFile(new URI("file:///G:/test/A/mapjoin/pd.txt"));

        // map-only job: the join happens entirely on the map side
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
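The mapper above opens pd.txt by its bare name, which works because the cached file is symlinked into each task's working directory. A more defensive variant resolves the path through the job context instead; this is a minimal sketch (assuming a single cached file with the same pd.txt layout), not part of the original code:

// drop-in replacement for the body of MapJoinMapper.setup(Context context);
// extra imports assumed: java.net.URI, java.nio.charset.StandardCharsets,
// org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path
URI[] cacheFiles = context.getCacheFiles();   // URIs registered via job.addCacheFile(...)
Path pdPath = new Path(cacheFiles[0]);
FileSystem fs = FileSystem.get(context.getConfiguration());
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(pdPath), StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null && !line.isEmpty()) {
        String[] fields = line.split(" ");
        pdMap.put(fields[0], fields[1]);
    }
}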
II. Reduce Join
1. Approach: use the join condition (the product ID) as the map output key, tag each record with the file it came from, and send the rows from both tables that share the same key to the same reduce task, where the records are stitched together.
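To make the shuffle concrete: with the sample data, the reduce call for key 01 receives three tagged beans and joins them. A trace (fields: orderID, productID, amount, productName, flag):

input:  (1001, 01, 1, "", order), (1004, 01, 4, "", order), ("", 01, 0, 小米, pd)
output: 1001 小米 1 order
        1004 小米 4 order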
bean:
package ReduceJoin;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class OrderBean implements Writable {
    private String orderID;
    private String productID;
    private int amount;
    private String productName;
    private String flag; // which table the record came from: "order" or "pd"

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.orderID);
        dataOutput.writeUTF(this.productID);
        dataOutput.writeInt(this.amount);
        dataOutput.writeUTF(this.productName);
        dataOutput.writeUTF(this.flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // must read the fields in exactly the order they were written
        this.orderID = dataInput.readUTF();
        this.productID = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.productName = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.orderID);
        sb.append("\t");
        sb.append(this.productName);
        sb.append("\t");
        sb.append(this.amount);
        sb.append("\t");
        sb.append(this.flag);
        return sb.toString();
    }
}
map:
package ReduceJoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class OrderMapper extends Mapper<LongWritable, Text, Text, OrderBean> {

    // (class body reconstructed: the type parameters and logic are inferred
    // from the driver below, the OrderBean fields, and the data format)
    private String fileName;
    private OrderBean bean = new OrderBean();
    private Text k = new Text();

    @Override
    protected void setup(Context context) {
        // record which input file this split comes from (order.txt or pd.txt)
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        if (fileName.startsWith("order")) {
            // order.txt line: orderID productID amount
            bean.setOrderID(fields[0]);
            bean.setProductID(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));
            bean.setProductName(""); // writeUTF cannot serialize null, so use ""
            bean.setFlag("order");
            k.set(fields[1]);
        } else {
            // pd.txt line: productID productName
            bean.setOrderID("");
            bean.setProductID(fields[0]);
            bean.setAmount(0);
            bean.setProductName(fields[1]);
            bean.setFlag("pd");
            k.set(fields[0]);
        }
        // key = product id, so matching rows from both files meet in one reduce call
        context.write(k, bean);
    }
}
reduce:
package ReduceJoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class OrderReducer extends Reducer<Text, OrderBean, OrderBean, NullWritable> {

    // (class body reconstructed: the type parameters and logic are inferred
    // from the driver below and the OrderBean flag field)
    @Override
    protected void reduce(Text key, Iterable<OrderBean> values, Context context)
            throws IOException, InterruptedException {
        ArrayList<OrderBean> orderBeans = new ArrayList<>();
        OrderBean pdBean = new OrderBean();

        for (OrderBean value : values) {
            if ("order".equals(value.getFlag())) {
                // Hadoop reuses the same value object across iterations, so copy it out
                OrderBean tmp = new OrderBean();
                try {
                    BeanUtils.copyProperties(tmp, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tmp);
            } else {
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // stitch the product name from pd.txt onto every order record with this key
        for (OrderBean orderBean : orderBeans) {
            orderBean.setProductName(pdBean.getProductName());
            context.write(orderBean, NullWritable.get());
        }
    }
}
driver:
package ReduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // local test paths; the input directory contains both order.txt and pd.txt
        args = new String[]{"G:\\test\\A\\mapjoin\\", "G:\\test\\A\\reducejoin12\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderBean.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
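On the sample data this job should emit one line per order in the OrderBean.toString() format (orderID, productName, amount, flag). Groups arrive in product-id order, but the order of records within a group is not guaranteed:

1001	小米	1	order
1004	小米	4	order
1002	华为	2	order
1005	华为	5	order
1003	格力	3	order
1006	格力	6	order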