自动生成更新任务脚本

 

Here’s the table of contents:

  1. 生成任务
  2. 定义入参
  3. 编写一个测试脚本
  4. 将测试脚本配置为一个任务
  5. 将测试脚本使用参数化方式封装为过程

生成任务

-- 配置任务
CALL custom.task.deploy(NULL,NULL,'PRE中文全称','_set_PRE删除','`PRE中文全称`节点动态增加`PRE删除`标签','mayc01') HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除
CALL custom.task.deploy(NULL,NULL,'PRE中文全称','_remove_PRE删除','`PRE中文全称`节点动态移除`PRE删除`标签','mayc01') HGRAPHTASK()-[]->(PRE中文全称)_remove_PRE删除

定义入参

--- `task_cql`中定义一个变量`{fragment}`,检查点时间会自动替换该变量
WITH
'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
'CALL apoc.periodic.iterate( \'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\'PRE删除\\\') RETURN t \', \'WITH {t} AS t SET t:PRE删除 \', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
CALL custom.task.update.by.check_point(jdbc_etl_url,task_hcode,task_cql) YIELD procedure_name,cypher_task RETURN procedure_name,cypher_task

编写一个测试脚本

WITH
'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
'CALL apoc.periodic.iterate( \'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\'PRE删除\\\') RETURN t \', \'WITH {t} AS t SET t:PRE删除 \', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
// ===========================获取锁并执行TASK===========================
// 获取任务锁并锁定任务
CALL apoc.load.jdbcUpdate(jdbc_etl_url,'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbc(jdbc_etl_url,'SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',[task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql
// 批量迭代执行节点构建
WITH rawCheckPoint AS fragment,currentTime,rawCheckPoint,jdbc_etl_url,task_hcode,task_cql
WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,'{fragment}',TOSTRING(apoc.date.convertFormat(rawCheckPoint,'yyyy-MM-dd HH:mm:ss','yyyyMMddHHmmss'))) AS task_cql,currentTime,rawCheckPoint
CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint
// batchFailedSize>0则任务状态回滚【设置node_check_point、rel_check_point等于rawCheckPoint】
// batchFailedSize<=0【更新node_check_point、rel_check_point为系统时间】
WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint
WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint,
    'CALL apoc.load.jdbcUpdate(\''+jdbc_etl_url+'\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\''+task_hcode+'\']) YIELD row RETURN row' AS rollbackSql,
    'CALL apoc.load.jdbcUpdate(\''+jdbc_etl_url+'\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$currentTime,\''+task_hcode+'\']) YIELD row RETURN row' AS updateSql
CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint})
    YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,rawCheckPoint
// 释放锁【TASK结束运行释放锁操作】
CALL apoc.load.jdbcUpdate(jdbc_etl_url,'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',[task_hcode]) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;

将测试脚本配置为一个任务

WITH
'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
'CALL apoc.periodic.iterate( \\\'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\\\\\'PRE删除\\\\\\\') RETURN t \\\', \\\'WITH {t} AS t SET t:PRE删除 \\\', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
WITH jdbc_etl_url,task_hcode,task_cql,'task.update.by.check_point.'+REPLACE('','-','_')+''+REDUCE(hcode='',hcodeStrs IN apoc.text.regexGroups(task_hcode,'((?!=,)([A-Za-z0-9_\u4e00-\u9fa5]+))+') | hcode+hcodeStrs[0]+'_') AS taskName
    ,'WITH \'{jdbc_etl_url}\' AS jdbc_etl_url, \'{task_hcode}\' AS task_hcode, \'{task_cql}\' AS task_cql CALL apoc.load.jdbcUpdate(jdbc_etl_url,\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1\',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql CALL apoc.load.jdbc(jdbc_etl_url,\'SELECT DATE_FORMAT(node_check_point,\\\'%Y-%m-%d %H:%i:%s\\\') AS check_point,DATE_FORMAT(NOW(),\\\'%Y-%m-%d %H:%i:%s\\\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?\',[task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH rawCheckPoint AS fragment,currentTime,rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,\'{fragment}\',TOSTRING(apoc.date.convertFormat(rawCheckPoint,\'yyyy-MM-dd HH:mm:ss\',\'yyyyMMddHHmmss\'))) AS task_cql,currentTime,rawCheckPoint CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint,     \'CALL apoc.load.jdbcUpdate(\\\'\'+jdbc_etl_url+\'\\\',\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\\\',[$rawCheckPoint,\\\'\'+task_hcode+\'\\\']) YIELD row RETURN row\' AS rollbackSql,     \'CALL apoc.load.jdbcUpdate(\\\'\'+jdbc_etl_url+\'\\\',\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\\\',[$currentTime,$currentTime,\\\'\'+task_hcode+\'\\\']) YIELD row RETURN row\' AS updateSql CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint})     YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,currentTime,rawCheckPoint CALL apoc.load.jdbcUpdate(jdbc_etl_url,\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?\',[task_hcode]) YIELD row AS releaseLock RETURN {releaseLock:releaseLock,value:value,batchFailedSize:batchFailedSize,currentTime:currentTime,rawCheckPoint:rawCheckPoint} AS result;' AS cypher_task
WITH taskName,olab.replace(cypher_task,[{raw:'{jdbc_etl_url}',rep:jdbc_etl_url},{raw:'{task_hcode}',rep:task_hcode},{raw:'{task_cql}',rep:task_cql}]) AS cypher_task
CALL apoc.custom.asProcedure(
  taskName,
  cypher_task,
  'WRITE',
  [['result','MAP']],
  [],
  '自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL'
)
RETURN 'CALL custom.'+taskName AS procedure_name,cypher_task
CALL custom.task.update.by.check_point.HGRAPHTASK_PRE中文全称__set_PRE删除_

将测试脚本使用参数化方式封装为过程

@param {jdbc_etl_url}-支持MySQL、Oracle、SqlServer
@param {task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel)
@param {task_cql}-任务CQL:需要获取检查点并定期执行的CQL
CALL apoc.custom.asProcedure(
  'task.update.by.check_point',
  'WITH $jdbc_etl_url AS jdbc_etl_url, $task_hcode AS task_hcode, $task_cql AS task_cql WITH jdbc_etl_url,task_hcode,task_cql,\'task.update.by.check_point.\'+REPLACE(\'\',\'-\',\'_\')+\'\'+REDUCE(hcode=\'\',hcodeStrs IN apoc.text.regexGroups(task_hcode,\'((?!=,)([A-Za-z0-9_\u4e00-\u9fa5]+))+\') | hcode+hcodeStrs[0]+\'_\') AS taskName     ,\'WITH \\\'{jdbc_etl_url}\\\' AS jdbc_etl_url, \\\'{task_hcode}\\\' AS task_hcode, \\\'{task_cql}\\\' AS task_cql CALL apoc.load.jdbcUpdate(jdbc_etl_url,\\\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1\\\',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql CALL apoc.load.jdbc(jdbc_etl_url,\\\'SELECT DATE_FORMAT(node_check_point,\\\\\\\'%Y-%m-%d %H:%i:%s\\\\\\\') AS check_point,DATE_FORMAT(NOW(),\\\\\\\'%Y-%m-%d %H:%i:%s\\\\\\\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?\\\',[task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH rawCheckPoint AS fragment,currentTime,rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,\\\'{fragment}\\\',TOSTRING(apoc.date.convertFormat(rawCheckPoint,\\\'yyyy-MM-dd HH:mm:ss\\\',\\\'yyyyMMddHHmmss\\\'))) AS task_cql,currentTime,rawCheckPoint CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint,     \\\'CALL apoc.load.jdbcUpdate(\\\\\\\'\\\'+jdbc_etl_url+\\\'\\\\\\\',\\\\\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\\\\\\\',[$rawCheckPoint,\\\\\\\'\\\'+task_hcode+\\\'\\\\\\\']) YIELD row RETURN row\\\' AS rollbackSql,     \\\'CALL apoc.load.jdbcUpdate(\\\\\\\'\\\'+jdbc_etl_url+\\\'\\\\\\\',\\\\\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\\\\\\\',[$currentTime,$currentTime,\\\\\\\'\\\'+task_hcode+\\\'\\\\\\\']) YIELD row RETURN row\\\' AS updateSql CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint})     YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,currentTime,rawCheckPoint CALL apoc.load.jdbcUpdate(jdbc_etl_url,\\\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?\\\',[task_hcode]) YIELD row AS releaseLock RETURN {releaseLock:releaseLock,value:value,batchFailedSize:batchFailedSize,currentTime:currentTime,rawCheckPoint:rawCheckPoint} AS result;\' AS cypher_task WITH taskName,olab.replace(cypher_task,[{raw:\'{jdbc_etl_url}\',rep:jdbc_etl_url},{raw:\'{task_hcode}\',rep:task_hcode},{raw:\'{task_cql}\',rep:task_cql}]) AS cypher_task CALL apoc.custom.asProcedure(   taskName,   cypher_task,   \'WRITE\',   [[\'result\',\'MAP\']],   [],   \'自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL\' ) RETURN \'CALL custom.\'+taskName AS procedure_name,cypher_task',
  'WRITE',
  [['procedure_name','STRING'],['cypher_task','STRING']],
  [['jdbc_etl_url','STRING'],['task_hcode','STRING'],['task_cql','STRING']],
  '自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL'
);

生产中使用时需要注意:1、需要按照检查点时间进行更新时,必须设置{fragment}参数;2、如果需要任务锁正常释放,则必须使task_cql有返回值

  • 可以正常释放锁并替换检查点时间【生产可用】
    WITH
    'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
    'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
    'CALL apoc.periodic.iterate( \\\'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\\\\\'PRE删除\\\\\\\') RETURN t \\\', \\\'WITH {t} AS t SET t:PRE删除 \\\', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
    CALL custom.task.update.by.check_point(jdbc_etl_url,task_hcode,task_cql) YIELD procedure_name,cypher_task RETURN procedure_name,cypher_task
    
  • 不可以正常释放锁【因为无返回值】,不替换检查点时间【因为没有设置{fragment}参数】【生产不可用】
    WITH
    'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
    'HGRAPHTASK()-[]->()_测试任务ID' AS task_hcode,
    'MATCH (n:Test) SET n.mark=1' AS task_cql
    CALL custom.task.update.by.check_point(jdbc_etl_url,task_hcode,task_cql) YIELD procedure_name,cypher_task RETURN procedure_name,cypher_task
    
  • 可以正常释放锁,不替换检查点时间【因为没有设置{fragment}参数】【生产可用】
    WITH
    'jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/analytics_graph_data?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
    'HGRAPHTASK()-[]->()_测试任务ID' AS task_hcode,
    'CALL apoc.periodic.iterate( \\\'MATCH (n:Test) RETURN n \\\', \\\'WITH {n} AS n SET n.mark=1 \\\', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql
    CALL custom.task.update.by.check_point(jdbc_etl_url,task_hcode,task_cql) YIELD procedure_name,cypher_task RETURN procedure_name,cypher_task