Kettle Beginner Tutorial
2.1 Adding a required driver JAR to Kettle

Symptom: the database connection test fails. Cause: the MySQL JDBC driver JAR is missing. Fix:
1. Download the JDBC driver and put it in Kettle's lib directory.
2. Fill in the database connection details correctly and run the connection test again.

2.2 The sub-mapping (sub-transformation) component

A mapping is to a transformation what a method is to a Java program: a reusable sub-flow. Example: look up the wanted fields by id in a database table and write them to an Excel file.

1. Table input.
2. Sub-mapping: the "Mapping input specification" plays the role of a Java method's formal parameters; name and vsxcd are the fields the mapping returns.
3. Excel output.

Run it; the result is written successfully.

2.3 Feeding a text file to the Java step

Kettle 9.4, two steps: Text file input and the Java class step. The file content is shown below, then the File tab and Content tab of Text file input.

Watch the separator setting: I overlooked it for a long time, and with a mismatched separator no data is read at all. It can be a newline or something else, depending on the file. That completes the Text file input step.

The Java code step is not a normal Java class: there is no class declaration, only methods. getRow() is Kettle's own method for fetching the next row.

```java
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.core.row.RowMetaInterface;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    // Fetch the next input row
    Object[] rowData = getRow();
    // No more input: signal that output is done and stop
    if (rowData == null) {
        setOutputDone();
        return false;
    }
    // The file content is assumed to be in the first field
    String fileContent = rowData[0].toString();
    logBasic("Data: " + fileContent.toUpperCase());
    // Row processed successfully
    return true;
}
```

Run it and check the result: the lowercase text is printed in uppercase.

2.4 The Select values step

This step renames, removes, or otherwise adjusts fields. Flow: Text file input → Select values (two tabs to configure) → run and check the result.

2.5 Concat fields

Concatenates strings. Two steps are used: Text file input and Concat fields. Text file input splits the file content on ";" into two parts, field a and field b. In Concat fields, "Fields" lists the field names output by the previous step, and "Target Field Name" is the name of the concatenated output field. Run it; a success screenshot follows.

2.6 Grouped aggregation over a MySQL query

1. Query the MySQL table.
2. Sort and group the result.

First look at the table structure, then at the final result.

1. The transformation: Table input → Sorted merge → Group by.
2. Table input: the database connection (the .17 server) is configured as shown.
3. Sorted merge.
4. Group by. Then just run it.

3.2 Table input and output

Requirement: copy the rows of table A into table B, clearing table B beforehand. Steps: Table input → Execute SQL script → Table output.

4.1 The Java code step: a quick start (must-read)

First, the Kettle-specific methods that differ from ordinary Java:
1. getRow() — fetches one row per call, looping over the input; returns an Object[].
2. get(Fields.In, "fieldName") — returns an accessor for the named field.
3. get(Fields.In, "fieldName").getString(r) — reads that field from row r as a String.
4. setOutputDone() — ends the output; nothing more is passed to later steps.
5. putRow(outputRowMeta, outputRow) — passes the row structure and values to the next step.
6. logBasic("Data: " + b) — writes a basic-level log line.

Task: read the data from a text file, process it simply, then output it. Two difficulties:
1. I do not want the processed value to overwrite the original one; it should go into a new field.
2. I also do not want to drag the old fields along — that would be redundant.

Text file input configuration, then the Java code:

```java
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    // 1. Fetch the input row
    Object[] r = getRow();
    // 2. Null means no more input: stop
    if (r == null) {
        setOutputDone();
        return false;
    }
    // 3. Read the value
    Object a = r[0];
    // 4. Log it
    logBasic("Got: " + a);
    // 5. Build an output row based on the input row, sized to the output metadata
    r = createOutputRow(r, data.outputRowMeta.size());
    // 6. Assign to the output field named "yy"
    get(Fields.Out, "yy").setValue(r, a + "--hello--");
    // 7. Pass the row on
    putRow(data.outputRowMeta, r);
    return true;
}
```

The result is shown below. If "Clear result fields" is not checked, the result also keeps the input fields.

Now a small exercise: read from and write to redis.
1. Put the redis client JAR into Kettle's lib directory.
2. Restart Kettle.
3. Write the code.

redis set:

```java
import redis.clients.jedis.Jedis;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    // 1. Read the input row
    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }
    // 2. Read the fields
    String kinfo = get(Fields.In, "kinfo").getString(r);
    String kname = get(Fields.In, "kname").getString(r);
    // 3. Put the redis key into the "key" field
    get(Fields.Out, "key").setValue(r, kinfo);
    // 4. Connect to redis
    Jedis jedis = new Jedis("10.20.1.17", 6379);
    // Set the key/value pair
    jedis.set(kinfo, kname);
    // Close the connection
    jedis.close();
    // 5. Build the output row
    r = createOutputRow(r, data.outputRowMeta.size());
    // 6. Pass it on
    putRow(data.outputRowMeta, r);
    return true;
}
```

redis get:

```java
import redis.clients.jedis.Jedis;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }
    String key = get(Fields.In, "key").getString(r);
    Jedis jedis = new Jedis("10.20.1.17", 6379);
    // Get the value for the key
    String name = jedis.get(key);
    // Close the connection
    jedis.close();
    logBasic("redis value: " + name);
    putRow(data.outputRowMeta, r);
    return true;
}
```

4. Test run.

4.2 CRUD against a redis cluster from the Java step

Environment: 1. Kettle 9.4; 2. JDK 8. Required JARs: jedis-2.9.0.jar and commons-pool2-2.4.2.jar (Kettle itself only ships commons-pool-1.5.7.jar). Without commons-pool2 you get:

ERROR (version 9.4.0.0-343, build 0.0 from 2022-11-08 07.50.27 by buildguy) : org.codehaus.commons.compiler.CompileException: org/apache/commons/pool2/impl/GenericObjectPoolConfig

The complete test demo:

```java
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }
    r = createOutputRow(r, data.outputRowMeta.size());
    // 1. Connect to the redis cluster
    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.setMaxTotal(5);
    poolConfig.setMaxIdle(1);
    poolConfig.setMaxWaitMillis(1000);
    Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>();
    nodes.add(new HostAndPort("11.11.1.40", 7001));
    nodes.add(new HostAndPort("11.11.1.41", 7001));
    nodes.add(new HostAndPort("11.11.1.42", 7001));
    JedisCluster jedis = new JedisCluster(nodes, poolConfig);
    // 2. Read the key
    String name = get(Fields.In, "name").getString(r);
    boolean exists = jedis.exists(name);
    logBasic("redis key " + name + " exists: " + exists);
    // 2.1 Previous machine-meter length
    String lastVsxzl = "0";
    if (!exists) {
        jedis.hset(name, "vsxzl", "0");
    } else {
        if (!jedis.hexists(name, "vsxzl")) {
            jedis.hset(name, "vsxzl", "0");
        }
        lastVsxzl = jedis.hget(name, "vsxzl");
    }
    // 2.2 Previous status
    String status = "0";
    if (!jedis.hexists(name, "status")) {
        jedis.hset(name, "status", "0");
    } else {
        status = jedis.hget(name, "status");
    }
    // 2.3 Previous vsxg
    String vsxg = "0";
    if (!jedis.hexists(name, "vsxg")) {
        jedis.hset(name, "vsxg", "0");
    } else {
        vsxg = jedis.hget(name, "vsxg");
    }
    logBasic("last length " + lastVsxzl + ", last status " + status + ", last vsxg " + vsxg);
    // 3. Write the values to the output fields
    get(Fields.Out, "lastVsxzl").setValue(r, lastVsxzl);
    get(Fields.Out, "lastStatus").setValue(r, status);
    get(Fields.Out, "lastVsxg").setValue(r, vsxg);
    // Close the connection
    try {
        jedis.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    putRow(data.outputRowMeta, r);
    return true;
}
```
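The demo above repeats one pattern three times: check a hash field with hexists, initialize it with hset when missing, otherwise read it with hget. A minimal stdlib sketch of that read-or-initialize pattern, with a HashMap standing in for the redis hash (the class and method names here are illustrative, not Jedis APIs):

```java
import java.util.HashMap;
import java.util.Map;

public class ReadOrInit {
    // Return the stored value for `field`, initializing it to "0" first when
    // absent -- mirroring the hexists/hset/hget sequence in the step above.
    static String readOrInit(Map<String, String> hash, String field) {
        if (!hash.containsKey(field)) {   // jedis.hexists(name, field)
            hash.put(field, "0");         // jedis.hset(name, field, "0")
        }
        return hash.get(field);           // jedis.hget(name, field)
    }

    public static void main(String[] args) {
        Map<String, String> hash = new HashMap<String, String>();
        hash.put("vsxzl", "120");
        System.out.println(readOrInit(hash, "vsxzl"));  // prints "120"
        System.out.println(readOrInit(hash, "status")); // prints "0"
    }
}
```

Collapsing the three near-identical if/else blocks into one such helper also keeps the step code short.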
(The full .ktr transformation XML, "redisCuster", was attached here: a Data Grid step supplying a constant name field feeds the UserDefinedJavaClass step above, which declares the String output fields lastVsxzl, lastStatus, and lastVsxg.)
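All the Java-step snippets in this tutorial share one shape: call getRow() until it returns null, then setOutputDone() and return false to stop; otherwise transform the row and putRow() it onward. A self-contained sketch of that contract (the class and its methods are stand-ins written for illustration, not Kettle APIs):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RowLoopSketch {
    private final Iterator<Object[]> input;
    final List<Object[]> output = new ArrayList<Object[]>();
    boolean done = false;

    RowLoopSketch(List<Object[]> rows) { this.input = rows.iterator(); }

    // Stand-in for Kettle's getRow(): null when input is exhausted
    Object[] getRow() { return input.hasNext() ? input.next() : null; }
    void setOutputDone() { done = true; }
    void putRow(Object[] row) { output.add(row); }

    // Mirrors the processRow() shape: stop on null, else transform and pass on
    boolean processRow() {
        Object[] r = getRow();
        if (r == null) { setOutputDone(); return false; }
        r[0] = r[0].toString().toUpperCase();  // same transform as section 2.3
        putRow(r);
        return true;
    }

    public static void main(String[] args) {
        RowLoopSketch step = new RowLoopSketch(Arrays.asList(
                new Object[] { "abc" }, new Object[] { "def" }));
        while (step.processRow()) { }  // Kettle calls processRow() until false
        for (Object[] row : step.output) System.out.println(row[0]);
        // prints "ABC" then "DEF"
    }
}
```

This is why forgetting the null check makes a step spin or crash: the engine keeps calling processRow() until it returns false.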
4.3 Using a dynamic variable name in Kettle

Both name and value are fields: I need to use name as the variable's name and value as the variable's value. In Kettle this is done in a JavaScript step; key and lastVsxzl below are fields coming in from the previous step:

```javascript
//Script here
setVariable(key, lastVsxzl, "r");
var r = getVariable(key, "r");
```

Demo:
1. Read the machine-head id (name) from a text file.
2. Fetch the matching vsxzl value from redis for that id.
3. Turn name and vsxzl into a variable: name is the variable name, vsxzl the variable value.

Screenshots of the successful run and of the Text file input step follow. The redis-reading Java code:

```java
import redis.clients.jedis.Jedis;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
    if (first) {
        first = false;
    }
    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }
    r = createOutputRow(r, data.outputRowMeta.size());
    // 1. Connect to redis
    Jedis jedis = new Jedis("10.20.1.17", 6379);
    // 2. Read the key
    String name = get(Fields.In, "name").getString(r);
    boolean exists = jedis.exists(name);
    // 2.1 Previous machine-meter length
    String lastVsxzl = "0";
    if (!exists) {
        jedis.hset(name, "vsxzl", "0");
    } else {
        if (!jedis.hexists(name, "vsxzl")) {
            jedis.hset(name, "vsxzl", "0");
        }
        lastVsxzl = jedis.hget(name, "vsxzl");
    }
    // 3. Write the output fields
    get(Fields.Out, "lastVsxzl").setValue(r, lastVsxzl);
    get(Fields.Out, "key").setValue(r, name + "vsxzl");
    // Close the connection
    jedis.close();
    // Send the row on to the next step.
    putRow(data.outputRowMeta, r);
    return true;
}
```

The JavaScript step shown above then turns the key/lastVsxzl pair into a variable.

5.1 Running pan.sh in the background

Requirement: the transformation must keep running after the current terminal is closed.

```shell
nohup ./pan.sh -file=/root/kettle/job/monitor.ktr > /root/kettle/job/log/log.txt 2>&1 &
```
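The same nohup pattern can be tried without Kettle by using a stand-in long-running command; sleep plays the role of pan.sh here, and the log path is only an example:

```shell
# nohup detaches the job from the terminal's hangup signal, > redirects
# stdout to a log file, 2>&1 folds stderr into the same file, and the
# trailing & puts the job in the background.
nohup sleep 3 > /tmp/pan_demo.log 2>&1 &
PID=$!
# kill -0 sends no signal; it only checks that the process still exists.
kill -0 "$PID" && echo "still running: PID $PID"
```

After closing the terminal, the process keeps running and the log file keeps collecting output.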