Java定时调用.ktr文件的示例代码(解决方案)

网友投稿 363 2023-01-21

Java定时调用.ktr文件的示例代码(解决方案)

1.Maven依赖

pentaho-kettle

kettle-core

7.1.0.0-12

pentaho-kettle

kettle-engine

7.1.0.0-12

pentaho-kettle

metastore

7.1.0.0-12

commons-io

commons-io

1.4

mysql

mysql-connector-java

5.1.49

注意:kettle的jar包依赖会拉不下来,需要将jar包install到本地,命令:

创建 0_install.bat 文件

:: 本地 install kettle-core.jar包

CALL mvn install:install-file -Dfile=kettle-core-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=7.1.0.0-12 -Dpackaging=jar

:: 本地 install kettle-engine.jar包

CALL mvn install:install-file -Dfile=kettle-engine-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=7.1.0.0-12 -Dpackaging=jar

:: 本地 install metastore.jar包

CALL mvn install:install-file -Dfile=metastore-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=7.1.0.0-12 -Dpackaging=jar

pause

或者deploy到内网私服上,命令:

创建 1_deploy.bat 文件

:: 私服 deploy kettle-core.jar包

CALL mvn deploy:deploy-file -Dfile=kettle-core-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

:: 私服 deploy kettle-engine.jar包

CALL mvn deploy:deploy-file -Dfile=kettle-engine-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

:: 私服 deploy metastore.jar包

CALL mvn deploy:deploy-file -Dfile=metastore-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

pause

(脚本创建在jar包目录下,创建好之后双击运行即可)

【jar包、脚本文件下载地址】

https://share.weiyun.com/eaOSjqP7

2.执行.ktr/.kjb工具类

KettleReadUtils.java

import org.pentaho.di.core.KettleEnvironment;

import org.pentaho.di.core.util.EnvUtil;

import org.pentaho.di.job.Job;

import org.pentaho.di.job.JobMeta;

import org.pentaho.di.trans.Trans;

import org.pentaho.di.trans.TransMeta;

import java.io.InputStream;

/**

*

@Title KettleReadUtils

*

@Description Kettle工具包

*

* @author zhj

* @date 2021/4/8 10:50

*/

public class KettleReadUtils {

/**

* 调用 kettle ktr

*

* @param path 文件路径

*/

public static void runKtr(String path) {

try {

KettleEnvironment.init();

EnvUtil.environmentInit();

TransMeta transMeta = new TransMeta(path);

Trans trans = new Trans(transMeta);

trans.execute(null);

trans.waitUntilFinished();

if (trans.getErrors() > 0) {

throw new Exception("Errors during transformation execution!");

}

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 以流的方式调用 kettle ktr

*

* @param in 文件流

*/

public static void runKtrByStream(InputStream in) {

try {

KettleEnvironment.init();

TransMeta transMeta = new TransMeta(in, null, true, null, null);

Trans trans = new Trans(transMeta);

trans.execute(null);

trans.waitUntilFinished();

if (trans.getErrors() > 0) {

throw new Exception("Errors during transformation execution!");

}

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 调用 kettle job

*

* @param paraNames 多个参数名

* @param paraValues 多个参数值

* @param jobPath 如: String fName= "D:\\kettle\\aaa.kjb";

*/

public static void runJob(String[] paraNames, String[] paraValues, String jobPath) {

try {

KettleEnvironment.init();

JobMeta jobMeta = new JobMeta(jobPath, null);

Job job = new Job(null, jobMeta);

// 向Job 脚本传递参数,脚本中获取参数值:${参数名}

if (paraNames != null && paraValues != null) {

for (int i = 0; i < paraNames.length && i < paraValues.length; i++) {

job.setVariable(paraNames[i], paraValues[i]);

}

}

job.start();

job.waitUntilFinished();

if (job.getErrors() > 0) {

throw new Exception("Errors during job execution!");

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

3.创建.ktr/.kjb工具类

(此处只是提供java创建途径,可以直接使用Spoon.bat创建好的文件)

import org.apache.commons.io.FileUtils;

import org.pentaho.di.core.KettleEnvironment;

import org.pentaho.di.core.database.DatabaseMeta;

import org.pentaho.di.core.exception.KettleXMLException;

import org.pentaho.di.core.plugins.PluginRegistry;

import org.pentaho.di.core.plugins.StepPluginType;

import org.pentaho.di.trans.TransHopMeta;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.StepMeta;

imphttp://ort org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;

import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

import java.ioATwuCf.File;

/**

*

@Title KettleReadUtils

*

@Description Kettle工具包

*

* @author zhj

* @date 2021/4/8 10:50

*/

public class KettleWriteUtils {

/**

* 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)

*/

private static final String DATABASE_XML_1 =

"" +

"" +

"db1" +

"<server>127.0.0.1" +

"MYSQL" +

"Native" +

"test" +

"3306" +

"root" +

"root" +

"";

private static final String DATABASE_XML_2 =

"" +

"" +

"db2" +

"127.0.0.1" +

"MYSQL" +

"Native" +

"test" +

"3306" +

"root" +

"root" +

"";

/**

* 创建ktr文件

*

* @param args args

*/

public static void main(String[] args) {

try {

KettleEnvironment.init();

KettleWriteUtils kettleWriteUtils = new KettleWriteUtils();

TransMeta transMeta = kettleWriteUtils.generateMyOwnTrans();

String transXml = transMeta.getXML();

String transName = "update_insert_Trans.ktr";

File file = new File(transName);

FileUtils.writeStringToFile(file, transXml, "UTF-8");

} catch (Exception e) {

e.printStackTrace();

return;

}

}

/**

* 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作

* @return 元数据

* @throws KettleXMLException 生成XML异常

*/

private TransMeta generateMyOwnTrans() throws KettleXMLException {

TransMeta transMeta = new TransMeta();

//设置转化的名称

transMeta.setName("insert_update");

//添加转换的数据库连接

DatabaseMeta databaseMeta1 = new DatabaseMeta(DATABASE_XML_1);

transMeta.addDatabase(databaseMeta1);

DatabaseMeta databaseMeta2 = new DatabaseMeta(DATABASE_XML_2);

transMeta.addDatabase(databaseMeta2);

//registry是给每个步骤生成一个标识Id用

PluginRegistry registry = PluginRegistry.getInstance();

//第一个表输入步骤(TableInputMeta)

TableInputMeta tableInput = new TableInputMeta();

String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);

//给表输入添加一个DatabaseMeta连接数据库

DatabaseMeta db1 = transMeta.findDatabase("db1");

tableInput.setDatabaseMeta(db1);

String sql = "SELECT USER_ID,USER_NAME FROM t_manager_user";

tableInput.setSQL(sql);

//添加TableInputMeta到转换中

StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId, "table input", tableInput);

//给步骤添加在spoon工具中的显示位置

tableInputMetaStep.setDraw(true);

tableInputMetaStep.setLocation(100, 100);

transMeta.addStep(tableInputMetaStep);

//第二个步骤插入与更新

InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();

String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);

//添加数据库连接

DatabaseMeta db2 = transMeta.findDatabase("db2");

insertUpdateMeta.setDatabaseMeta(db2);

//设置操作的表

insertUpdateMeta.setTableName("t_stat_user_info");

//设置用来查询的关键字

insertUpdateMeta.setKeyLookup(new String[]{"USER_ID"});

insertUpdateMeta.setKeyStream(new String[]{"USER_ID"});

insertUpdateMeta.setKeyStream2(new String[]{""});

insertUpdateMeta.setKeyCondition(new String[]{"="});

//设置要更新的字段

String[] updatelookup = {"USER_ID","USER_NAME"} ;

String[] updateStream = {"USER_ID","USER_NAME"} ;

Boolean[] updateOrNot = {false,true};

insertUpdateMeta.setUpdateLookup(updatelookup);

insertUpdateMeta.setUpdateStream(updateStream);

insertUpdateMeta.setUpdate(updateOrNot);

//添加步骤到转换中

StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update", insertUpdateMeta);

insertUpdateStep.setDraw(true);

insertUpdateStep.setLocation(250, 100);

transMeta.addStep(insertUpdateStep);

//添加hop把两个步骤关联起来

transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));

return transMeta;

}

}

4.测试执行.ktr文件

执行用例:

public static void main(String[] args) {

InputStream inputStream = KettleReadUtils.class.getResourceAsStream("/etl/test.ktr");

runKtrByStream(inputStream);

}

.ktr文件位置:

执行结果:

5.Kettle所使用的mysql-connector 5.1.49 和 8 版本不兼容问题

mysql-connector-java 5.1.49 版本中,支持连接驱动,org.gjt.mm.mysql.Driver;

mysql-connector-java 8.* 版本中,连接驱动,com.mysql.cj.jdbc.Driver;

如果直接使用 8.* 版本 去连接 MySQL 数据库的话会出现"错误连接数据库"问题:

Driver class ‘org.gjt.mm.mysql.Driver' could not be found, make sure the ‘MySQL' driver (jar file) is installed.

org.gjt.mm.mysql.Driver

解决方案:

1.关闭Kettle;

2.将/data-integration/lib/ 下面的 mysql-connector-java-5.1.49.jar 替换为 mysql-connector-java-8.*.jar;

3.打开Kettle,修改连接类型为 Generic database ,配置驱动名称为 com.mysql.cj.jdbc.Driver;

4.重新导出为.ktr/.kjb文件;

5.再用java调用即可解决问题。

整理完毕,完结撒花~

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:短信接口api接口哪个好(短信对接接口)
下一篇:疫苗开放api接口(疫苗开放api接口有哪些)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~