Hadoop Integration with HBase: A Worked Example
Requirement: write a MapReduce program that copies the data of one HBase table into another table.
Specifically: read the f1:name and f1:age data from the user table in HBase and write it into the f1 column family of another table, user2.
Step 1: Create the tables
Note: the two tables must have the same column family.
create 'user','f1'
put 'user','rk001','f1:name','tony'
put 'user','rk001','f1:age','12'
put 'user','rk001','f1:address','beijing'
put 'user','rk002','f1:name','wangwu'
create 'user2','f1'
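Before running the job it is worth confirming the test rows landed in the source table. A quick check from the HBase shell should show something like the following (timestamps omitted, output approximate):

scan 'user'
ROW      COLUMN+CELL
 rk001   column=f1:address, value=beijing
 rk001   column=f1:age, value=12
 rk001   column=f1:name, value=tony
 rk002   column=f1:name, value=wangwu
2 row(s)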
Step 2: Create a Maven project and add the dependencies
pom.xml needs the Hadoop and HBase client dependencies; a minimal sketch follows.
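A minimal sketch, assuming HBase 1.2.x and Hadoop 2.7.x; in HBase 1.x the TableMapper/TableReducer and TableMapReduceUtil classes ship in the hbase-server artifact (on HBase 2.x they move to hbase-mapreduce). Adjust the versions to match your cluster:

<dependencies>
    <!-- Hadoop client APIs (MapReduce, HDFS) -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
    <!-- HBase client API (Put, Result, Scan, ...) -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0</version>
    </dependency>
    <!-- HBase 1.x: TableMapper/TableReducer/TableMapReduceUtil live here -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>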
Step 3: Develop the MapReduce program
(1) Custom Mapper class
package com.kaikeba.hbase.demo01;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Reads f1:name and f1:age from the source table user and emits them
 * as (rowkey, Put) pairs to be written into user2's f1 column family.
 */
public class HBaseReadMapper extends TableMapper<Text, Put> {

    /**
     * @param key     the rowkey of the current row
     * @param value   the Result holding every cell of that row
     * @param context used to emit (rowkey, Put) pairs to the reducer
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Rowkey as a byte array; also reused as the map output key
        byte[] rowkeyBytes = key.get();
        Text text = new Text(Bytes.toString(rowkeyBytes));

        // Build the Put that will carry the selected cells to the target table
        Put put = new Put(rowkeyBytes);

        // Keep only the f1:name and f1:age cells of this row
        for (Cell cell : value.rawCells()) {
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            if ("f1".equals(family)) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("name".equals(qualifier) || "age".equals(qualifier)) {
                    put.add(cell);
                }
            }
        }

        // Emit only if at least one matching cell was found
        if (!put.isEmpty()) {
            context.write(text, put);
        }
    }
}
(2) Custom Reducer class
package com.kaikeba.hbase.demo01;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
 * The third type parameter of TableReducer is the output key type,
 * which carries the rowkey of the row being written.
 */
public class HBaseWriteReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Re-wrap the rowkey as the output key expected by the table output format
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable(key.toString().getBytes());
        // Forward every Put collected for this rowkey to the target table
        for (Put put : values) {
            context.write(rowkey, put);
        }
    }
}
(3) Main entry class
package com.kaikeba.hbase.demo01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        // Point the job at the ZooKeeper quorum backing the HBase cluster
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        int run = ToolRunner.run(configuration, new HBaseMR(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf());
        job.setJarByClass(HBaseMR.class);

        // Mapper: scan the source table 'user' and emit (rowkey, Put) pairs
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("user"), new Scan(),
                HBaseReadMapper.class, Text.class, Put.class, job);

        // Reducer: write the Put objects into the target table 'user2'
        TableMapReduceUtil.initTableReducerJob("user2", HBaseWriteReducer.class, job);

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
Step 4: Build the jar and submit it to the cluster
hadoop jar HbaseTang-1.0-SNAPSHOT.jar com.kaikeba.hbase.demo01.HBaseMR
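If the job fails with missing HBase classes, one common fix (assuming a standard HBase installation) is to put the HBase jars on the Hadoop classpath before submitting, e.g. export HADOOP_CLASSPATH=$(hbase mapredcp). Once the job completes, the copy can be verified from the HBase shell; only f1:name and f1:age should appear in user2, not f1:address (timestamps omitted, output approximate):

scan 'user2'
ROW      COLUMN+CELL
 rk001   column=f1:age, value=12
 rk001   column=f1:name, value=tony
 rk002   column=f1:name, value=wangwu
2 row(s)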