Hadoop大数据——mapreduce的join算法
(1)Reduce side join 示例: 订单数据 商品信息 实现机制: 通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联
public class OrderJoin {
static class OrderJoinMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 拿到一行数据,并且要分辨出这行数据所属的文件
String line = value.toString();
String[] fields = line.split("\t");
// 拿到itemid
String itemid = fields[0];
// 获取到这一行所在的文件名(通过inpusplit)
String name = "你拿到的文件名";
// 根据文件名,切分出各字段(如果是a,切分出两个字段,如果是b,切分出3个字段)
OrderJoinBean bean = new OrderJoinBean();
bean.set(null, null, null, null, null);
context.write(new Text(itemid), bean);
}
}
static class OrderJoinReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable beans, Context context) throws IOException, InterruptedException {
//拿到的key是某一个itemid,比如1000
//拿到的beans是来自于两类文件的bean
// {1000,amount} {1000,amount} {1000,amount} --- {1000,price,name}
//将来自于b文件的bean里面的字段,跟来自于a的所有bean进行字段拼接并输出
}
}
}
缺点:reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高 容易产生数据倾斜 注:也可利用二次排序的逻辑来实现reduce端join (2)Map side join –原理阐述 适用于关联表中有小表的情形; 可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果 可以大大提高join操作的并发度,加快处理速度 –示例:先在mapper类中预先定义好小表,进行join –引入实际场景中的解决方案:一次加载数据库或者用distributedcache
public class TestDistributedCache {
static class TestDistributedCacheMapper extends Mapper{
FileReader in = null;
BufferedReader reader = null;
HashMap b_tab = new HashMap();
String localpath =null;
String uirpath = null;
//是在map任务初始化的时候调用一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//通过这几句代码可以获取到cache file的本地绝对路径,测试验证用
Path[] files = context.getLocalCacheFiles();
localpath = files[0].toString();
URI[] cacheFiles = context.getCacheFiles();
//缓存文件的用法——直接用本地IO来读取
//这里读的数据是map task所在机器本地工作目录中的一个小文件
in = new FileReader("b.txt");
reader =new BufferedReader(in);
String line =null;
while(null!=(line=reader.readLine())){
String[] fields = line.split(",");
b_tab.put(fields[0],fields[1]);
}
IOUtils.closeStream(reader);
IOUtils.closeStream(in);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//这里读的是这个map task所负责的那一个切片数据(在hdfs上)
String[] fields = value.toString().split("\t");
String a_itemid = fields[0];
String a_amount = fields[1];
String b_name = b_tab.get(a_itemid);
// 输出结果 1001 98.9 banan
context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(TestDistributedCache.class);
job.setMapperClass(TestDistributedCacheMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//这里是我们正常的需要处理的数据所在路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//不需要reducer
job.setNumReduceTasks(0);
//分发一个文件到task进程的工作目录
job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
//分发一个归档文件到task进程的工作目录
// job.addArchiveToClassPath(archive);
//分发jar包到task节点的classpath下
// job.addFileToClassPath(jarfile);
job.waitForCompletion(true);
}
}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~