Hadoop大数据——mapreduce的join算法

网友投稿 276 2022-11-24

Hadoop大数据——mapreduce的join算法

(1)Reduce side join 示例: 订单数据 商品信息 实现机制: 通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联 public class OrderJoin { static class OrderJoinMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 拿到一行数据,并且要分辨出这行数据所属的文件 String line = value.toString(); String[] fields = line.split("\t"); // 拿到itemid String itemid = fields[0]; // 获取到这一行所在的文件名(通过inpusplit) String name = "你拿到的文件名"; // 根据文件名,切分出各字段(如果是a,切分出两个字段,如果是b,切分出3个字段) OrderJoinBean bean = new OrderJoinBean(); bean.set(null, null, null, null, null); context.write(new Text(itemid), bean); } } static class OrderJoinReducer extends Reducer { @Override protected void reduce(Text key, Iterable beans, Context context) throws IOException, InterruptedException { //拿到的key是某一个itemid,比如1000 //拿到的beans是来自于两类文件的bean // {1000,amount} {1000,amount} {1000,amount} --- {1000,price,name} //将来自于b文件的bean里面的字段,跟来自于a的所有bean进行字段拼接并输出 } } } 缺点:reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高 容易产生数据倾斜 注:也可利用二次排序的逻辑来实现reduce端join (2)Map side join –原理阐述 适用于关联表中有小表的情形; 可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果 可以大大提高join操作的并发度,加快处理速度 –示例:先在mapper类中预先定义好小表,进行join –引入实际场景中的解决方案:一次加载数据库或者用distributedcache public class TestDistributedCache { static class TestDistributedCacheMapper extends Mapper{ FileReader in = null; BufferedReader reader = null; HashMap b_tab = new HashMap(); String localpath =null; String uirpath = null; //是在map任务初始化的时候调用一次 @Override protected void setup(Context context) throws IOException, InterruptedException { //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用 Path[] files = context.getLocalCacheFiles(); localpath = files[0].toString(); URI[] cacheFiles = context.getCacheFiles(); //缓存文件的用法——直接用本地IO来读取 //这里读的数据是map task所在机器本地工作目录中的一个小文件 in = new FileReader("b.txt"); reader =new BufferedReader(in); String line =null; while(null!=(line=reader.readLine())){ String[] fields = line.split(","); b_tab.put(fields[0],fields[1]); } IOUtils.closeStream(reader); IOUtils.closeStream(in); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //这里读的是这个map task所负责的那一个切片数据(在hdfs上) String[] fields = value.toString().split("\t"); String a_itemid = fields[0]; String a_amount = fields[1]; String b_name = b_tab.get(a_itemid); // 输出结果 1001 98.9 banan context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name )); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TestDistributedCache.class); job.setMapperClass(TestDistributedCacheMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //这里是我们正常的需要处理的数据所在路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //不需要reducer job.setNumReduceTasks(0); //分发一个文件到task进程的工作目录 job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt")); //分发一个归档文件到task进程的工作目录 // job.addArchiveToClassPath(archive); //分发jar包到task节点的classpath下 // job.addFileToClassPath(jarfile); job.waitForCompletion(true); } }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:如何自己设计一款温湿度计
下一篇:Hadoop大数据——mapreduce的Distributed cache
相关文章

 发表评论

暂时没有评论,来抢沙发吧~