迁移数据到 HBase(批量导入)
需要先将数据源导出成文本文件,并且将文本文件上传到 HDFS 中。
迁移到 HBase 有两种方案:
方案一:利用 MapReduce 中封装好的方法。在 map 阶段,把数据封装成 Put 操作,直接将数据入库。方案二:利用 Bulkload,首先使用 MapReduce 直接生成 HFile 文件,然后再通过 Bulkload 将 HFile 文件直接加载到表中。
方案一(Map + Put)
现在 HBase 中预先创建好接收数据的表
create 'tableName','ColumnFamilyName'
Java 项目中添加依赖
org.apache.hadoop hadoop-client 3.2.0 org.apache.hbase hbase-mapreduce 2.2.7
Java 代码如下
package com.yx;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/** * 利用 MapReduce 中的封装好的方法,在 Map 阶段将数据封装成 HBase 的 Put 操作(不需要 Reduce 阶段),直接将数据入库。 * * @author Chandler * @date 2022/3/3 */public class BatchImportMR { public static class BatchImportMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // ------------------------------------ // TODO 这里的数据解析逻辑按照输入的数据格式来 String[] strs = value.toString().split("\t"); String rowKey = strs[0]; String columnFamily = strs[1]; String name = strs[2]; String val = strs[3]; // ------------------------------------ // 最终需要将数据封装成如下格式 // 指定 RowKey Put put = new Put(rowKey.getBytes()); // 指定 Column put.addColumn(columnFamily.getBytes(), name.getBytes(), val.getBytes()); context.write(NullWritable.get(), put); } } public static void main(String[] args) throws Exception { if (args.length != 2) { // 如果传递的参数不够,程序直接退出 System.exit(100); } // Map 阶段输入的数据的路径,e.g hdfs://.... String inPath = args[0]; // 保存到 HBase 上的哪张表 String outTableName = args[1]; // 设置属性对应参数 Configuration conf = new Configuration(); conf.set("hbase.table.name", outTableName); conf.set("hbase.zookeeper.quorum", "/* 注意这里请配置好 zookeeper 的访问地址 */"); // 封装 Job Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName); job.setJarByClass(BatchImportMR.class); // 指定输入路径 FileInputFormat.setInputPaths(job, new Path(inPath)); // 指定 Map 相关的代码 job.setMapperClass(BatchImportMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Put.class); // 这两行代码是关键, TableMapReduceUtil.initTableReducerJob(outTableName, null, job); TableMapReduceUtil.addDependencyJars(job); // 禁用 Reduce job.setNumReduceTasks(0); // 启动任务 job.waitForCompletion(true); }}
打包 Java 代码上传到 Hadoop 客户端节点执行
注意:在打 Jar 包之前,需要对pom.xml中的依赖添加provided配置,但是 hbase-client 和 hbase-mapreduce 不能设置 provided,这两个依赖需要打进 Jar 包里面,否则会提示找不到对应的类。
yarn jar xx.jar xx.xx.BatchImportMR "hdfs file path" "table name"
注意:提交 Job 之前还需要确保 Hadoop 集群、Zookeeper 集群、HBase 集群可以正常工作。
方案二(Map + Bulkload)
方案二大致和方案一一样,也是先在 HBase 中创建表,编写 Java 代码,但是 Map 阶段的输出 Key 需要指定为 ImmutableBytesWritable 格式,具体代码如下
package com.yx;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/** * 利用 MapReduce 中的封装好的方法,在 Map 阶段将数据封装成 HBase 的 Put 操作(不需要 Reduce 阶段),直接将数据入库。 * * @author Chandler * @date 2022/3/3 */public class BatchImportMR { public static class BatchImportMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // ------------------------------------ // TODO 这里的数据解析逻辑按照输入的数据格式来 String[] strs = value.toString().split("\t"); String rowKey = strs[0]; String columnFamily = strs[1]; String name = strs[2]; String val = strs[3]; // ------------------------------------ // 最终需要将数据封装成如下格式 // 指定 RowKey ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowkey.getBytes()); Put put = new Put(rowkey.getBytes()); put.addColumn(columnFamily.getBytes(),name.getBytes(),val.getBytes()); context.write(rowkeyWritable,put); } } public static void main(String[] args) throws Exception { if (args.length != 3) { // 如果传递的参数不够,程序直接退出 System.exit(100); } // Map 阶段输入的数据的路径,e.g hdfs://.... String inPath = args[0]; // Map 阶段输出的数据的路径,e.g hdfs://.... String outPath = args[1]; // 保存到 HBase 上的哪张表 String outTableName = args[2]; // 设置属性对应参数 Configuration conf = new Configuration(); conf.set("hbase.table.name", outTableName); conf.set("hbase.zookeeper.quorum", "/* 注意这里请配置好 zookeeper 的访问地址 */"); // 封装 Job Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName); job.setJarByClass(BatchImportMR.class); // 指定输入路径 FileInputFormat.setInputPaths(job, new Path(inPath)); //指定输出路径[如果输出路径存在,就将其删除] FileSystem fs = FileSystem.get(conf); Path output = new Path(outPath); if(fs.exists(output)){ fs.delete(output,true); } FileOutputFormat.setOutputPath(job, output); // 指定 Map 相关的代码 job.setMapperClass(BatchImportMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); // 这几行代码是关键, Connection connection = ConnectionFactory.createConnection(conf); TableName tableName = TableName.valueOf(outTableName); HFileOutputFormat2.configureIncrementalLoad( job, connection.getTable(tableName), connection.getRegionLocator(tableName) ); // 禁用 Reduce job.setNumReduceTasks(0); // 启动任务 job.waitForCompletion(true); }}
打包 Jar 上传到 Hadoop 客户端上执行,注意事项和方案一一样
在 HBase 客户端节点上执行导入命令
hbase org.apache.hadoop.hbase.tool.BulkLoadHFilesTool 'hdfs://...' 'table name'
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~