关于通过java调用datax,返回任务执行的方法

网友投稿 961 2022-12-29


关于通过java调用datax,返回任务执行的方法

DATAX

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 mysql、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

datax的详细介绍

请参考 DataX-Introduction

引言

因为业务需要,需要使用到datax把数据从文本写入到数据库,原来的做法都是使用python通过datax.py去调用脚本,阿文为了能更好的管控datax的任务,阿文要求我们对datax进行改造,使用java集成的方式去调用datax,并返回任务执行的详细信息。

datax源码跟踪

从github下完源码开始改造,datax的启动类在datax-core包下Engine类的entry方法,该方法是一个静态方法。

public static void entry(final String[] args) throws Throwable {

Options options = new Options();

options.addOption("job", true, "Job config.");

options.addOption("jobid", true, "Job unique id.");

options.addOption("mode", true, "Job runtime mode.");

BasicParser parser = new BasicParser();

CommandLine cl = parser.parse(options, args);

String jobPath = cl.getOptionValue("job");

// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1

String jobIdString = cl.getOptionValue("jobid");

RUNTIME_MODE = cl.getOptionValue("mode");

Configuration configuration = ConfigParser.parse(jobPath);

long jobId;

if (!"-1".equalsIgnoreCase(jobIdString)) {

jobId = Long.parseLong(jobIdString);

} else {

// only for dsc & ds & datax 3 update

String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";

String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";

String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";

List patternStringList = Arrays.asList(dscJobUrlPatternString,

dsJobUrlPatternString, dsTaskGroupUrlPatternString);

jobId = parseJobIdFromUrl(patternStringList, jobPath);

}

boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);

if (!isStandAloneMode && jobId == -1) {

// 如果不是 standalone 模式,那么 jobId 一定不能为-1

throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");

}

configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JZhfDBYOB_ID, jobId);

//打印vmInfo

VMInfo vmInfo = VMInfo.getVmInfo();

if (vmInfo != null) {

LOG.info(vmInfo.toString());

}

LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

LOG.debug(configuration.tojsON());

ConfigurationValidate.doValidate(configuration);

Engine engine = new Engine();

engine.start(configuration);

}

里面最后通过调用engine.start(configuration) 开始启动,我们点进去,最后会发现在里面是调用JobContainer 的start() 方法。

@Override

public void start() {

LOG.info("DataX jobContainer starts job.");

boolean hasException = false;

boolean isDryRun = false;

try {

this.startTimeStamp = System.currentTimeMillis();

isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);

if (isDryRun) {

LOG.info("jobContainer starts to do preCheck ...");

this.preCheck();

} else {

userConf = configuration.clone();

LOG.debug("jobContainer starts to do preHandle ...");

this.preHandle();

LOG.debug("jobContainer starts to do init ...");

this.init();

LOG.info("jobContainer starts to do prepare ...");

this.prepare();

LOG.info("jobContainer starts to do split ...");

this.totalStage = this.split();

LOG.info("jobContainer starts to do schedule ...");

this.schedule();

LOG.debug("jobContainer starts to do post ...");

this.post();

LOG.debug("jobContainer starts to do postHandle ...");

this.postHandle();

LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

this.invokeHooks();

}

} catch (Throwable e) {

LOG.error("Exception when job run", e);

hasException = true;

if (e instanceof OutOfMemoryError) {

this.destroy();

System.gc();

}

if (super.getContainerCommunicator() == null) {

// 由于 containerCollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containerCollector 进行初始化

AbstractContainerCommunicator tempContainerCollector;

// standalone

tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);

super.setContainerCommunicator(tempContainerCollector);

}

Communication communication = super.getContainerCommunicator().collect();

// 汇报前的状态,不需要手动进行设置

// communication.setState(State.FAILED);

communication.setThrowable(e);

communication.setTimestamp(this.endTimeStamp);

Communication tempComm = new Communication();

tempComm.setTimestamp(this.startTransferTimeStamp);

Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

super.getContainerCommunicator().report(reportCommunication);

throw DataXException.asDataXException(

FrameworkErrorCode.RUNTIME_ERROR, e);

} finally {

if (!isDryRun) {

this.destroy();

this.endTimeStamp = System.currentTimeMillis();

if (!hasException) {

//最后打印cpu的平均消耗,GC的统计

VMInfo vmInfo = VMInfo.getVmInfo();

if (vmInfo != null) {

vmInfo.getDelta(false);

LOG.info(vmInfo.totalString());

}

LOG.info(PerfTrace.getInstance().summarizeNoException());

this.logStatistics();

}

}

}

}

而我们需要的任务信息就在this.logStatistics() 中

private void logStatistics() {

long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;

long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;

if (0L == transferCosts) {

transferCosts = 1L;

}

if (super.getContainerCommunicator() == null) {

return;

}

Communication communication = super.getContainerCommunicator().collect();

communication.setTimestamp(this.endTimeStamp);

Communication tempComm = new Communication();

tempComm.setTimestamp(this.startTransferTimeStamp);

Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

// 字节速率

long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)

/ transferCosts;

long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)

/ transferCosts;

reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);

reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);

super.getContainerCommunicator().report(reportCommunication);

LOG.info(String.format(

"\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"

+ "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"

+ "%-26s: %19s\n",

"任务启动时刻",

dateFormat.format(startTimeStamp),

"任务结束时刻",

dateFormat.format(endTimeStamp),

"任务总计耗时",

String.valueOf(totalCosts) + "s",

"任务平均流量",

StrUtil.stringify(byteSpeedPerSecond)

+ "/s",

"记录写入速度",

String.valueOf(recordSpeedPerSecond)

+ "rec/s", "读出记录总数",

String.valueOf(CommunicationTool.getTotalReadRecords(communication)),

"读写失败总数",

String.valueOf(CommunicationTool.getTotalErrorRecords(communication))

));

LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +

dateFormat.format(endTimeStamp) + "|" +

String.valueOf(totalCosts) + "|" +

StrUtil.stringify(byteSpeedPerSecond) + "|" +

String.valueOf(recordSpeedPerSecond) + "|" +

String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|" +

String.valueOf(CommunicationTool.getTotalErrorRecords(communication))

);

if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0

|| communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0

|| communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {

LOG.info(String.format(

"\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",

"Transformer成功记录总数",

communication.getLongCounter(CommunicationTool.TRANSFORMER_SUhttp://CCEED_RECORDS),

"Transformer失败记录总数",

communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),

"Transformer过滤记录总数",

communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)

));

}

}

改造开始

新增返回实体DataxResult (get、set省略)

public class DataxResult {

//任务启动时刻

private long startTimeStamp;

//任务结束时刻

private long endTimeStamp;

//任务总时耗

private long totalCosts;

//任务平均流量

private long byteSpeedPerSecond;

//记录写入速度

private long recordSpeedPerSecond;

//读出记录总数

private long totalReadRecords;

//读写失败总数

private long totalErrorRecords;

//成功记录总数

private long transformerSucceedRecords;

// 失败记录总数

private long transformerFailedRecords;

// 过滤记录总数

private long transformerFilterRecords;

//字节数

private long readSucceedBytes;

//转换开始时间

private long endTransferTimeStamp;

//转换结束时间

private long startTransferTimeStamp;

//转换总耗时

private long transferCosts;

重写logStatistics方法,返回该实体。

private DataxResult logStatistics(DataxResult resultMsg) {

long totalCosts = (this.endTimeSthttp://amp - this.startTimeStamp) / 1000;

long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;

if (0L == transferCosts) {

transferCosts = 1L;

}

if (super.getContainerCommunicator() == null) {

return resultMsg;

}

Communication communication = super.getContainerCommunicator().collect();

long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)

/ transferCosts;

long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)

/ transferCosts;

return resultMsg.getResultMsg(startTimeStamp,

endTimeStamp,

totalCosts,

byteSpeedPerSecond,

recordSpeedPerSecond,

communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),

communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),

communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),

communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),

communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),

communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),

this.endTransferTimeStamp,

this.startTransferTimeStamp,

transferCosts

);

}

还需要重写JobContainer的**start()**方法。

@Override

public DataxResult start(DataxResult dataxResult) {

...

DataxResult result = new DataxResult();

result = logStatistics(dataxResult);

...

return result;

}

然后在Engine 类中添加模拟测试方法mockentry

public DataxResult mockstart(Configuration allConf) {

...

DataxResult dataxResult = new DataxResult();

return container.start(dataxResult);

}

开始测试

在com.alibaba.datax.core.util.container.CoreConstant里修改datax_home 为本地路径

该datax_home路径下有以下几个目录

public class test {

public static void main(String[] args) {

String[] datxArgs = {"-job", CoreConstant.DATAX_HOME + "\\job\\job2.json", "-mode", "standalone", "-jobid", "-1"};

try {

DataxResult dataxResult= Engine.mockentry(datxArgs);

} catch (Throwable e) {

e.printStackTrace();

}

}

}

执行结果为

3

大功告成!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微服务网关做灰度(为什么需要微服务网关)
下一篇:什么软件接口测试工具(什么软件接口测试工具好用)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~