Here’s the table of contents:
生成任务
// Configure the tasks.
// Each statement calls custom.task.deploy once; the trailing comment is the task hcode that
// accompanied the call in the original notes (the first one is the value used as `task_hcode`
// by the statements further down). NOTE(review): presumably this is the hcode the deploy call
// produces — confirm against the procedure's output.
CALL custom.task.deploy(NULL,NULL,'PRE中文全称','_set_PRE删除','`PRE中文全称`节点动态增加`PRE删除`标签','mayc01'); // -> HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除
CALL custom.task.deploy(NULL,NULL,'PRE中文全称','_remove_PRE删除','`PRE中文全称`节点动态移除`PRE删除`标签','mayc01'); // -> HGRAPHTASK()-[]->(PRE中文全称)_remove_PRE删除
定义入参
--- `task_cql`中定义一个变量`{fragment}`,检查点时间会自动替换该变量
// Build the inputs and hand them to custom.task.update.by.check_point, which returns the
// call string of the generated procedure (procedure_name) and its Cypher text (cypher_task).
//
// `{fragment}` inside task_cql is the placeholder that gets replaced with the check-point time.
//
// Quotes inside task_cql are escaped one level deeper than in a directly executed script:
// the procedure substitutes this text into a quoted '{task_cql}' literal, so single-level
// escaping would terminate that literal early and leave the generated Cypher unparseable.
//
// NOTE(review): `t.hisvaild` is spelled differently from `f.hisvalid` in the same predicate —
// confirm the real property name; a misspelled property compares as null and filters every row.
// NOTE(review): the JDBC URL carries user/password in plain text — consider external configuration.
WITH
'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
'CALL apoc.periodic.iterate( \\\'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\\\\\'PRE删除\\\\\\\') RETURN t \\\', \\\'WITH {t} AS t SET t:PRE删除 \\\', {parallel:false,batchSize:1000}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
CALL custom.task.update.by.check_point(jdbc_etl_url,task_hcode,task_cql) YIELD procedure_name,cypher_task RETURN procedure_name,cypher_task
编写一个测试脚本
// Stand-alone test script for one task hcode:
//   1. take the task lock, 2. read the stored check point, 3. substitute it into task_cql and run it,
//   4. write the check point back, 5. release the lock.
// task_cql is executed directly via apoc.cypher.doIt here, so its inner quotes need only one level of escaping.
// NOTE(review): `t.hisvaild` is spelled differently from `f.hisvalid` — confirm the property name.
// NOTE(review): if task_cql yields no rows (or raises), the clauses after it never run and the
// lock taken in step 1 stays set; the task must return at least one row for the lock to be released.
WITH
'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
'CALL apoc.periodic.iterate( \'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\'PRE删除\\\') RETURN t \', \'WITH {t} AS t SET t:PRE删除 \', {parallel:false,batchSize:1000}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
// =========================== Acquire the lock and run the TASK ===========================
// Take the task lock: set task_lock from 0 to 1 for this hcode; continue only when a row was updated.
CALL apoc.load.jdbcUpdate(jdbc_etl_url,'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql
// Read the check-point time together with the database's current time.
// [For a full load, set the stored CHECK_POINT to the earliest timestamp.]
// [If the data volume exceeds the heap limit, a data-chunking scheme must be used.]
CALL apoc.load.jdbc(jdbc_etl_url,'SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',[task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql
// Replace {fragment} in task_cql with the check point reformatted as yyyyMMddHHmmss.
WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,'{fragment}',TOSTRING(apoc.date.convertFormat(rawCheckPoint,'yyyy-MM-dd HH:mm:ss','yyyyMMddHHmmss'))) AS task_cql,currentTime,rawCheckPoint
// Run the batched task.
CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint
// batchFailedSize > 0 : roll back — rollbackSql writes rawCheckPoint back to node_check_point
//                       (this statement does not touch rel_check_point).
// batchFailedSize <= 0: updateSql sets node_check_point and rel_check_point to currentTime.
WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint
WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint,
'CALL apoc.load.jdbcUpdate(\''+jdbc_etl_url+'\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\''+task_hcode+'\']) YIELD row RETURN row' AS rollbackSql,
'CALL apoc.load.jdbcUpdate(\''+jdbc_etl_url+'\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$currentTime,\''+task_hcode+'\']) YIELD row RETURN row' AS updateSql
CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint})
YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,rawCheckPoint
// Release the lock once the TASK has finished.
CALL apoc.load.jdbcUpdate(jdbc_etl_url,'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',[task_hcode]) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
将测试脚本配置为一个任务
// Turn the test script into a named custom procedure for one task hcode:
// build the procedure name, fill the script template, register it with apoc.custom.asProcedure.
WITH
'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
// Quotes in task_cql are escaped one level deeper than in a directly executed script because the
// text is substituted into the quoted '{task_cql}' placeholder of the cypher_task template below.
// NOTE(review): `t.hisvaild` is spelled differently from `f.hisvalid` — confirm the property name.
'CALL apoc.periodic.iterate( \\\'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\\\\\'PRE删除\\\\\\\') RETURN t \\\', \\\'WITH {t} AS t SET t:PRE删除 \\\', {parallel:false,batchSize:1000}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
// taskName = 'task.update.by.check_point.' followed by every regex match found in task_hcode,
// each with a trailing '_'.
// NOTE(review): REPLACE('','-','_')+'' always evaluates to the empty string — looks like a leftover; confirm before removing.
// cypher_task is the script template with {jdbc_etl_url}, {task_hcode} and {task_cql} placeholders;
// its string literal continues across the line break in the middle of the embedded SQL.
WITH jdbc_etl_url,task_hcode,task_cql,'task.update.by.check_point.'+REPLACE('','-','_')+''+REDUCE(hcode='',hcodeStrs IN apoc.text.regexGroups(task_hcode,'((?!=,)([A-Za-z0-9_\u4e00-\u9fa5]+))+') | hcode+hcodeStrs[0]+'_') AS taskName
,'WITH \'{jdbc_etl_url}\' AS jdbc_etl_url, \'{task_hcode}\' AS task_hcode, \'{task_cql}\' AS task_cql CALL apoc.load.jdbcUpdate(jdbc_etl_url,\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1\',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql CALL apoc.load.jdbc(jdbc_etl_url,\'SELECT DATE_FORMAT(node_check_point,\\\'%Y-%m-%d %H:%i:%s\\\') AS check_point,DATE_FORMAT(NOW(),\\\'%Y-%m-%d %H:%i:%s\\\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?\',[task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH rawCheckPoint AS fragment,currentTime,rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,\'{fragment}\',TOSTRING(apoc.date.convertFormat(rawCheckPoint,\'yyyy-MM-dd HH:mm:ss\',\'yyyyMMddHHmmss\'))) AS task_cql,currentTime,rawCheckPoint CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint, \'CALL apoc.load.jdbcUpdate(\\\'\'+jdbc_etl_url+\'\\\',\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\\\',[$rawCheckPoint,\\\'\'+task_hcode+\'\\\']) YIELD row RETURN row\' AS rollbackSql, \'CALL apoc.load.jdbcUpdate(\\\'\'+jdbc_etl_url+\'\\\',\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? 
WHERE hcode=?\\\',[$currentTime,$currentTime,\\\'\'+task_hcode+\'\\\']) YIELD row RETURN row\' AS updateSql CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,currentTime,rawCheckPoint CALL apoc.load.jdbcUpdate(jdbc_etl_url,\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?\',[task_hcode]) YIELD row AS releaseLock RETURN {releaseLock:releaseLock,value:value,batchFailedSize:batchFailedSize,currentTime:currentTime,rawCheckPoint:rawCheckPoint} AS result;' AS cypher_task
// Fill the three placeholders with the values bound above.
// NOTE(review): olab.replace is not defined here — presumably it replaces each `raw` with its `rep`; confirm.
WITH taskName,olab.replace(cypher_task,[{raw:'{jdbc_etl_url}',rep:jdbc_etl_url},{raw:'{task_hcode}',rep:task_hcode},{raw:'{task_cql}',rep:task_cql}]) AS cypher_task
// Register cypher_task under taskName in WRITE mode, with one MAP output column `result` and no inputs.
// NOTE(review): the description text says {jdbc_task_url} while the placeholder is {jdbc_etl_url}.
CALL apoc.custom.asProcedure(
taskName,
cypher_task,
'WRITE',
[['result','MAP']],
[],
'自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL'
)
RETURN 'CALL custom.'+taskName AS procedure_name,cypher_task
// Separate statement: call the procedure generated above (the name follows the taskName pattern for this hcode).
CALL custom.task.update.by.check_point.HGRAPHTASK_PRE中文全称__set_PRE删除_
将测试脚本使用参数化方式封装为过程
@param {jdbc_etl_url}-支持MySQL、Oracle、SqlServer
@param {task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel)
@param {task_cql}-任务CQL:需要获取检查点并定期执行的CQL
// Register custom.task.update.by.check_point(jdbc_etl_url, task_hcode, task_cql) as a WRITE procedure.
// Its body is the task-generation script in parameterised form: it derives taskName from $task_hcode,
// fills the script template with the three inputs, registers the result via apoc.custom.asProcedure,
// and returns procedure_name ('CALL custom.' + taskName) and cypher_task (the generated Cypher).
// The body is nested three string levels deep, hence the long backslash runs; the literal also
// continues across the line break inside the embedded SQL.
// Callers must pass task_cql with its inner quotes already escaped for embedding in a quoted literal,
// because the body substitutes it into '{task_cql}'.
// NOTE(review): the embedded SQL uses DATE_FORMAT/NOW() and UPDATE ... INNER JOIN, which looks MySQL-specific — confirm before using another JDBC source.
// NOTE(review): both description strings say {jdbc_task_url} while the parameter is named jdbc_etl_url.
CALL apoc.custom.asProcedure(
'task.update.by.check_point',
'WITH $jdbc_etl_url AS jdbc_etl_url, $task_hcode AS task_hcode, $task_cql AS task_cql WITH jdbc_etl_url,task_hcode,task_cql,\'task.update.by.check_point.\'+REPLACE(\'\',\'-\',\'_\')+\'\'+REDUCE(hcode=\'\',hcodeStrs IN apoc.text.regexGroups(task_hcode,\'((?!=,)([A-Za-z0-9_\u4e00-\u9fa5]+))+\') | hcode+hcodeStrs[0]+\'_\') AS taskName ,\'WITH \\\'{jdbc_etl_url}\\\' AS jdbc_etl_url, \\\'{task_hcode}\\\' AS task_hcode, \\\'{task_cql}\\\' AS task_cql CALL apoc.load.jdbcUpdate(jdbc_etl_url,\\\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1\\\',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql CALL apoc.load.jdbc(jdbc_etl_url,\\\'SELECT DATE_FORMAT(node_check_point,\\\\\\\'%Y-%m-%d %H:%i:%s\\\\\\\') AS check_point,DATE_FORMAT(NOW(),\\\\\\\'%Y-%m-%d %H:%i:%s\\\\\\\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?\\\',[task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH rawCheckPoint AS fragment,currentTime,rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,\\\'{fragment}\\\',TOSTRING(apoc.date.convertFormat(rawCheckPoint,\\\'yyyy-MM-dd HH:mm:ss\\\',\\\'yyyyMMddHHmmss\\\'))) AS task_cql,currentTime,rawCheckPoint CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint, \\\'CALL apoc.load.jdbcUpdate(\\\\\\\'\\\'+jdbc_etl_url+\\\'\\\\\\\',\\\\\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? 
WHERE hcode=?\\\\\\\',[$rawCheckPoint,\\\\\\\'\\\'+task_hcode+\\\'\\\\\\\']) YIELD row RETURN row\\\' AS rollbackSql, \\\'CALL apoc.load.jdbcUpdate(\\\\\\\'\\\'+jdbc_etl_url+\\\'\\\\\\\',\\\\\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\\\\\\\',[$currentTime,$currentTime,\\\\\\\'\\\'+task_hcode+\\\'\\\\\\\']) YIELD row RETURN row\\\' AS updateSql CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,currentTime,rawCheckPoint CALL apoc.load.jdbcUpdate(jdbc_etl_url,\\\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?\\\',[task_hcode]) YIELD row AS releaseLock RETURN {releaseLock:releaseLock,value:value,batchFailedSize:batchFailedSize,currentTime:currentTime,rawCheckPoint:rawCheckPoint} AS result;\' AS cypher_task WITH taskName,olab.replace(cypher_task,[{raw:\'{jdbc_etl_url}\',rep:jdbc_etl_url},{raw:\'{task_hcode}\',rep:task_hcode},{raw:\'{task_cql}\',rep:task_cql}]) AS cypher_task CALL apoc.custom.asProcedure( taskName, cypher_task, \'WRITE\', [[\'result\',\'MAP\']], [], \'自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL\' ) RETURN \'CALL custom.\'+taskName AS procedure_name,cypher_task',
'WRITE',
[['procedure_name','STRING'],['cypher_task','STRING']],
[['jdbc_etl_url','STRING'],['task_hcode','STRING'],['task_cql','STRING']],
'自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL'
);
生产中使用时需要注意:1、需要按照检查点时间进行更新时,必须设置`{fragment}`参数;2、如果需要任务锁正常释放,则必须使task_cql有返回值
- 可以正常释放锁并替换检查点时间【生产可用】
// Example call whose task_cql both contains the {fragment} placeholder and returns rows.
// Inputs are bound one per line; the procedure call, its YIELD and the final RETURN follow.
WITH
    'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
    'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
    'CALL apoc.periodic.iterate( \\\'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\\\\\'PRE删除\\\\\\\') RETURN t \\\', \\\'WITH {t} AS t SET t:PRE删除 \\\', {parallel:false,batchSize:1000}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
CALL custom.task.update.by.check_point(jdbc_etl_url, task_hcode, task_cql)
    YIELD procedure_name, cypher_task
RETURN
    procedure_name,
    cypher_task
- 不可以正常释放锁【因为无返回值】,不替换检查点时间【因为没有设置`{fragment}`参数】【生产不可用】
WITH 'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url, 'HGRAPHTASK()-[]->()_测试任务ID' AS task_hcode, 'MATCH (n:Test) SET n.mark=1' AS task_cql CALL custom.task.update.by.check_point(jdbc_etl_url,task_hcode,task_cql) YIELD procedure_name,cypher_task RETURN procedure_name,cypher_task
- 可以正常释放锁,不替换检查点时间【因为没有设置`{fragment}`参数】【生产可用】
WITH 'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url, 'HGRAPHTASK()-[]->()_测试任务ID' AS task_hcode, 'CALL apoc.periodic.iterate( \\\'MATCH (n:Test) RETURN n \\\', \\\'WITH {n} AS n SET n.mark=1 \\\', {parallel:false,batchSize:1000}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql CALL custom.task.update.by.check_point(jdbc_etl_url,task_hcode,task_cql) YIELD procedure_name,cypher_task RETURN procedure_name,cypher_task
PREVIOUS: MySQL远程导出文件