c语言sscanf函数的用法是什么
269
2022-11-26
使用BulkLoad从HDFS批量导入数据到HBase
数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中。这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证。所以,介绍一种性能更高的写入方式BulkLoad。
实例代码pom依赖:
package com.yangshou;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class BulkLoadMapper extends Mapper
package com.yangshou; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class BulkLoadDriver { public static void main(String[] args) throws Exception { //获取Hbase配置 Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("BulkLoadDemo")); Admin admin = conn.getAdmin(); //设置job Job job = Job.getInstance(conf,"BulkLoad"); job.setJarByClass(BulkLoadDriver.class); job.setMapperClass(BulkLoadMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); //设置文件的输入输出路径 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat2.class); FileInputFormat.setInputPaths(job,new Path("hdfs://hadoopalone:9000/tmp/000000_0")); FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoopalone:9000/demo1")); //将数据加载到Hbase表中 HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo"))); if(job.waitForCompletion(true)){ LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf); load.doBulkLoad(new Path("hdfs://hadoopalone:9000/demo1"),admin,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo"))); } } }
实例数据
44979 100640791 134060896 1 5271 2014-12-09 天津市 44980 100640791 96243605 1 13729 2014-12-02 新疆
在Hbase shell 中创建表
create 'BulkLoadDemo','info'
打包后执行```hadoop jar BulkLoadDemo-1.0-SNAPSHOT.jar com.yangshou.BulkLoadDriver
注意:在执行hadoop jar之前应该先将Hbase中的相关包加载过来
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~