Kettle中通过Java对日期变量进行转换实现数据循环抽取
在对数据库中大量数据进入导出或抽取时,往往由于数据量太大不能一次全部导出或抽取出来,只能分多次运行导出,且每次运行抽取不同的数据。因此我们可以通过定义变量并在每次运行完后对变量进行转换(如日期变量可以增加相应的天数得到新的日期),然后对变量进行判断,如果没有达到停止的条件则循环运行,如果达到停止的条件则中止任务。
比如我们需要将数据库中的某个表(ADT.INPATIENT)的2024年06月01日到2024年07月01日前的数据抽取到另外一张表(ADT.TEST11),由于期间的数据量过大,不能一次全部抽取完,因此设定每次循环运行只能抽取2天的数据,直至将2024年06月30日的数据抽取完毕。
整个作业流程为:初始变量→检验字段的值→执行业务逻辑转换(带日期变量的业务转换)→日期变量累加n(对日期变量进行递增的转换)→检验字段的值.... 日期变量累加n(对日期变量进行递增的转换)→检验字段的值→中止作业(不满足循环执行条件),如图:
第一步:创建【业务逻辑】转换:
打开Spoon.bat,在弹出的Kettle界面中依次选择【文件】/【新建】/【转换】,创建一个转换,保存名称为“业务逻辑转换”,并添加以下组件:
1.【表输入】:在左侧【核心对象】中检索“表输入”组件,双击该组件并在弹出的窗口中选择数据库链接、编写带有start_time和end_time变量的SQL语句(这2个变量将在作业中定义),如图:
2.【表输出】:在左侧【核心对象】中选择“输出”组件,选择“表输出”组件并按住【Shift】键,将两个组件连接起来。双击该组件并在弹出的窗口中选择数据库链接、目标模式(用户)、目标表、查询的关键字、更新的字段等,如图:
第二步:创建【日期变量累加n】转换:
打开Spoon.bat,在弹出的Kettle界面中依次选择【文件】/【新建】/【转换】,创建一个转换,保存名称为“日期变量累加n”
1.【获取变量】:在左侧【核心对象】中检索“获取变量”组件,双击该组件并在弹出的窗口中填写的变量start_time、end_time、stop_time和days(该变量将在作业中进行定义),如图:
2.【日期转换】:在左侧【核心对象】中检索“Java 代码”组件,双击该组件并在弹出的窗口中编写日期变量转换逻辑,如图:
说明:Java代码组件获取变量有3种方式:
方式一:通过getVariable方法获取变量:String varValue = System.getenv("myVariable");
方式二:通过环境变量获取变量:String varValue = System.getenv("myVariable");
方式三:通过参数获取变量:String varValue = getParameter("myParameter");
详细代码如下:
import org.pentaho.di.core.database.*;
import java.util.Date;
import java.text.*;
import java.util.*;
private DatabaseMeta databaseMeta;
private Database database;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
if (first){
first = false;
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());
// 获取时间步长变量
String days = getVariable("days");
if (days == null || days.trim() == "") {
days = "0";
}
logBasic("获取时间步长====>"+days);
// 获取开始时间.
String startTime = getVariable("start_time");
startTime = getDayStartTime(startTime,Integer.parseInt(days));
// 获取endTime.
String endTime = getVariable("end_time");
endTime = getDayEndTime(endTime,Integer.parseInt(days));
// 获取stopTime.
String stopTime = getVariable("stop_time");
logBasic("转换后开始时间====>"+startTime);
logBasic("转换后结束时间====>"+endTime);
//判断结束日期变量和计算的结束日期
if(compareTime(stopTime,endTime)){
endTime = stopTime;
}
get(Fields.Out, "starttime").setValue(r, startTime);
get(Fields.Out, "endtime").setValue(r, endTime);
putRow(data.outputRowMeta, r);
return true;
}
/**
* 得到指定日期前后若干天日期的开始时间
* @param date 日期字符串yyyymmdd或yyyyMMddHHmmss
* @param days 加的天数
* @return 新日期 yyyyMMddHHmmss
*/
public static String getDayStartTime(String date, int days) {
java.util.Date ds;
try {
SimpleDateFormat sdf;
SimpleDateFormat dft;
if (date != null && date.trim().length() == 8) {
sdf = new SimpleDateFormat("yyyyMMdd");
dft = new SimpleDateFormat("yyyyMMdd");
} else {
sdf = new SimpleDateFormat("yyyyMMddHHmmss");
dft = new SimpleDateFormat("yyyyMMddHHmmss");
}
ds = sdf.parse(date);
Calendar calendar = Calendar.getInstance();
calendar.setTime(ds);
calendar.add(calendar.DAY_OF_YEAR, days);
return (date != null && date.trim().length() == 8) ? dft.format(calendar.getTime())+"000000" : dft.format(calendar.getTime());
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "";
}
/**
* 得到指定日期前后若干天日期的结束时间
* @param date 日期字符串yyyymmdd或yyyyMMddHHmmss
* @param days 加的天数
* @return 新日期 yyyyMMddHHmmss
*/
public static String getDayEndTime(String date, int days) {
java.util.Date ds;
try {
SimpleDateFormat sdf;
SimpleDateFormat dft;
if (date != null && date.trim().length() == 8) {
sdf = new SimpleDateFormat("yyyyMMdd");
dft = new SimpleDateFormat("yyyyMMdd");
} else {
sdf = new SimpleDateFormat("yyyyMMddHHmmss");
dft = new SimpleDateFormat("yyyyMMddHHmmss");
}
ds = sdf.parse(date);
Calendar calendar = Calendar.getInstance();
calendar.setTime(ds);
calendar.add(calendar.DAY_OF_YEAR, days);
return (date != null && date.trim().length() == 8) ? dft.format(calendar.getTime())+"235959" : dft.format(calendar.getTime());
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "";
}
/**
* 比较2个时间字符串大小
* @param dateTime1 日期字符串1,yyyymmdd或yyyyMMddHHmmss
* @param dateTime2 日期字符串1,yyyymmdd或yyyyMMddHHmmss
* @return ture表示小于等于,false表示大于
*/
public static Boolean compareTime(String dateTime1, String dateTime2) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
java.util.Date ds1 = sdf.parse(dateTime1);
Calendar calendar1 = Calendar.getInstance();
calendar1.setTime(ds1);
java.util.Date ds2 = sdf.parse(dateTime2);
Calendar calendar2 = Calendar.getInstance();
calendar2.setTime(ds2);
return calendar1.compareTo(calendar2) <= 0 ? true : false;
} catch (ParseException e) {
e.printStackTrace();
}
return false;
}
3.【设置变量】:在左侧【核心对象】中检索“设置变量”组件,双击该组件并在弹出的窗口中填写变量,如图:
第三步:创建作业:
打开Spoon.bat,在弹出的Kettle界面中依次选择【文件】/【新建】/【作业】,创建一个作业,保存名称为“变量日期自增作业”,并添加以下组件:
1.【启动】:在左侧【主对象树】中检索“Start”组件,双击该组件并在弹出的窗口中设置作业的调度频率,如图:
2.【初始变量】:在左侧【核心对象】中检索“设置变量”组件,双击该组件并在弹出的窗口中设置作业的变量名称、变量值,如图:
3.【校验字段的值】:在左侧【核心对象】中检索“检验字段的值”组件,双击该组件并在弹出的窗口中设置作业循环的判断条件,如图:
4【业务逻辑】:在左侧【核心对象】中检索“转换”组件,双击该组件并在弹出的窗口中设置作业对应的业务转换(数据的插入与抽取),如图:
5.【日期变量累加n】:在左侧【核心对象】中检索“转换”组件,双击该组件并在弹出的窗口中设置作业日期变量累加转换,如图:
6.【中止作业】:在左侧【核心对象】中检索“中止作业”组件,当变量不符合循环条件时执行该步骤,如图:
第四步:启动作业:
点击作业左侧的【启动】按钮执行作业,如图: