Java API如何实现向Hive批量导入数据

网友投稿 305 2022-12-26

Java API如何实现向Hive批量导入数据

java API实现向Hive批量导入数据

Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中。

package com.enn.idcard;

import java.io.IOException;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

/**

*

Description:

* @author kangkaia

* @date 2017年12月26日 下午1:42:24

*/

public class HiveJdbc {

public static void main(String[] args) throws IOException {

List argList = new ArrayList();

List arg = new ArrayList();

arg.add("12345");

arg.add("m");

argList.add(arg);

arg = new ArrayList();

arg.add("54321");

arg.add("f");

argList.add(arg);

// System.out.println(argList.toString());

String dst = "/test/kk.txt";

createFile(dst,argList);

loadData2Hive(dst);

}

/**

* 将数据插入hdfs中,用于load到hive表中,默认分隔符是"\001"

* @param dst

* @param contents

* @throws IOException

*/

public static void createFile(String dst , List argList) throws IOException{

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(conf);

Path dstPath = new Path(dst); //目标路径

//打开一个输出流

FSDataOutputStream outputStream = fs.create(dstPath);

StringBuffer sb = new StringBuffer();

for(List arg:argList){

for(String value:arg){

sb.append(value).append("\001");

}

sb.deleteCharAt(sb.length() - 4);//去掉最后一个分隔符

sb.append("\n");

}

sb.deleteCharAt(sb.length() - 2);//去掉最后一个换行符

byte[] contents = sb.toString().getBytes();

outputStream.write(contents);

outputStream.close();

fs.close();

System.out.println("文件创建成功!");

}

/**

* 将HDFS文件load到hive表中

* @param dst

*/

public static void loadData2Hive(String dst) {

String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";

String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl";

String username = "admin";

String password = "admin";

Connection con = null;

try {

Class.forName(JDBC_DRIVER);

con = (Connectihttp://on) DriverManager.getConnection(CONNECTION_URL,username,password);

Statement stmt = con.createStatement();

String sql = " load data inpath '"+dst+"' into table population.population_information ";

stmt.execute(sql);

System.out.println("loadData到Hive表成功!");

} catch (SQLException e) {

e.printStackTrace();

} catch (ClassNotFoundException e) {

e.printStackTrace();

}finally {

// 关闭rs、ps和con

if(con != null){

try {

con.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

}

}

}

注意:

本例使用mvn搭建,conf配置文件放在src/main/resources目录下。

Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。

在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。一般而言,对于OLTP而言,行表优势大于列表,对于OLAP而言,列表的优势大于行表,特别容易想到当做聚合操作时,列表的复杂度将会比行表小的多,虽然单独rcfile的列运算不一定总是存在的,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。

通过hdfs导入hive的表默认是textfile格式的,因此可以改变存储格式,具体方法是先创建sequencefile、rcfile等格式的空表,然后重新插入数据即可。

insert overwrite table seqfile_table select * from textfile_table;

……

insert overwrite table rcfile_table select * from textfile_table;

java 批量插入hive中转在HDFS

稍微修改了下,这文章是通过将数据存盘后,加载到HIVE.

模拟数据放到HDFS然后加载到HIVE,请大家记得添加HIVE JDBC依赖否则会报错。

加载前的数据表最好用外部表,否则会drop表的时候元数据会一起删除!

org.apache.hive

hive-jdbc

1.1.0

代码

import java.io.IOException;

import java.net.URI;

import java.net.URISyntaxException;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

public class Demo {

public static void main(String[] args) throws Exception {

List argList = new ArrayList();

List arg = new ArrayList();

arg.add("12345");

arg.add("m");

argList.add(arg);

arg = new ArrayList();

arg.add("54321");

arg.add("f");

argList.add(arg);

// System.out.println(argList.toString());

String dst = "/test/kk.txt";

createFile(dst,argList);

// loadData2Hive(dst);

}

/**

* 将数据插入hdfs中,用于load到hive表中,默认分隔符是"|"

* @param dst

* @param contents

* @throws IOException

* @throws Exception

* @throws InterruptedException

*/

public static void createFile(String dst , List argList) throws IOException, InterruptedException, Exception{

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"),conf,"root");

Path dstPath = new Path(dst); //目标路径

//打开一个输出流

FSDataOutputStream outputStream = fs.create(dstPath);

StringBuffer sb = new StringBuffer();

for(List arg:argList){

for(String value:arg){

sb.append(value).append("|");

}

sb.deleteCharAt(sb.length() - 1);//去掉最后一个分隔符

sb.append("\n");

}

byte[] contents = sb.toString().getBytes();

outputStream.write(contents);

outputStream.flush();;

outputStream.close();

fs.close();

System.out.println("文件创建成功!");

}

/**

* 将HDFS文件load到hive表中

* @param dst

*/

public static void loadData2Hive(String dst) {

String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";

String CONNECTION_URL = "jdbc:hive2://hadoop:10000/default";

String username = "root";

String password = "root";

Connection con = null;

try {

Class.forName(JDBC_DRIVER);

con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);

Statement stmt = con.createStatement();

String sql = " load data inpath '"+dst+"' into table test ";//test 为插入的表

stmt.execute(sql);

System.out.println("loadData到Hive表成功!");

} catch (SQLException e) {

e.printStackTrace();

} catch (ClassNotFoundException e) {

e.printStackTrace();

}finally {

// 关闭rs、ps和con

if(con != null){

try {

con.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

}

}

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:怎么用api接口做网站(网站api接口调用教程)
下一篇:Mybatis中使用万能的Map传参实现
相关文章

 发表评论

暂时没有评论,来抢沙发吧~