数据血缘和算子引擎图数据模型

 

Here’s the table of contents:

  1. 数据血缘图数据模型
    1. Label
    2. Relationship
    3. 模拟数据
  2. 算子引擎图数据模型
    1. Label
    2. Relationship
    3. 模拟数据
  3. 加载算子引擎和数据血缘全景图
  4. 分页加载算子列表
  5. 加载算子引擎全景图
  6. 加载数据血缘全景图
  7. 获取算子的API地址和样例入参
  8. 执行算子API并获取output输入给下一步等待执行的算子
  9. 存储指标ID【样例存储方式】【使用JSON串存储多个指标ID】
  10. 通过指标ID获取关联的数据血缘图谱
  11. 通过指标ID获取关联的指标算子图谱
  12. 数据血缘全景图相关的指标
  13. 算子引擎全景图相关的指标
  14. 将算子的指标ID列表整合到一个列表

数据血缘图数据模型

构建数据库->表->字段的血缘关系

Label

  • 数据血缘
  • MySQLDB(name,hupdatetime)
  • MySQLDBTB(name,hupdatetime)
  • MySQLDBTBField(name,hupdatetime)

    Relationship

  • 包含
  • 关联
  • 生成
  • 导入

    模拟数据

  • 索引与约束
    CREATE CONSTRAINT ON (n:MySQLDB) ASSERT n.name IS UNIQUE
    CREATE CONSTRAINT ON (n:MySQLDBTB) ASSERT n.name IS UNIQUE
    CREATE CONSTRAINT ON (n:MySQLDBTBField) ASSERT n.name IS UNIQUE
    
  • 数据库表与字段
    merge (db:MySQLDB {name:'HackathonDB1'}) set db +={hupdatetime:'20200702121021'}
    merge (tb1:MySQLDBTB {name:'table1'}) set tb1 +={hupdatetime:'20200702121021'}
    merge (tb2:MySQLDBTB {name:'table2'}) set tb2 +={hupdatetime:'20200702121021'}
    merge (tb3:MySQLDBTB {name:'table3'}) set tb3 +={hupdatetime:'20200702121021'}
    merge (field1:MySQLDBTBField {name:'field1'}) set field1 +={hupdatetime:'20200702121021'}
    merge (field2:MySQLDBTBField {name:'field2'}) set field2 +={hupdatetime:'20200702121021'}
    merge (field3:MySQLDBTBField {name:'field3'}) set field3 +={hupdatetime:'20200702121021'}
    merge (field4:MySQLDBTBField {name:'field4'}) set field4 +={hupdatetime:'20200702121021'}
    merge (db)-[:包含]->(tb1)
    merge (db)-[:包含]->(tb2)
    merge (db)-[:包含]->(tb3)
    merge (tb1)-[:包含]->(field1)
    merge (tb1)-[:包含]->(field2)
    merge (tb2)-[:包含]->(field3)
    merge (tb3)-[:包含]->(field4)
    
  • 数据库表与字段
    merge (db:MySQLDB {name:'HackathonDB2'}) set db +={hupdatetime:'20200702121021'}
    merge (tb1:MySQLDBTB {name:'dev_table1'}) set tb1 +={hupdatetime:'20200702121021'}
    merge (tb2:MySQLDBTB {name:'dev_table2'}) set tb2 +={hupdatetime:'20200702121021'}
    merge (tb3:MySQLDBTB {name:'dev_table3'}) set tb3 +={hupdatetime:'20200702121021'}
    merge (field1:MySQLDBTBField {name:'dev_field1'}) set field1 +={hupdatetime:'20200702121021'}
    merge (field2:MySQLDBTBField {name:'dev_field2'}) set field2 +={hupdatetime:'20200702121021'}
    merge (field3:MySQLDBTBField {name:'dev_field3'}) set field3 +={hupdatetime:'20200702121021'}
    merge (field4:MySQLDBTBField {name:'dev_field4'}) set field4 +={hupdatetime:'20200702121021'}
    merge (db)-[:包含]->(tb1)
    merge (db)-[:包含]->(tb2)
    merge (db)-[:包含]->(tb3)
    merge (tb1)-[:包含]->(field1)
    merge (tb1)-[:包含]->(field2)
    merge (tb2)-[:包含]->(field3)
    merge (tb3)-[:包含]->(field4)
    
  • 增加数据血缘标签
    MATCH p=(n:MySQLDB)-[*..6]-() WITH NODES(p) AS nodeList
    UNWIND nodeList AS node
    MATCH (node) SET node:数据血缘
    

算子引擎图数据模型

构建算子模型之间的计算引擎模型

Label

  • 算子(interface,name,hupdatetime,calculation_engine:{api:,input:,})
  • 数据源

Relationship

  • 输入(hupdatetime,operator_combine_list:[[算子1,算子2,算子3],[算子1,算子4,算子5]])

模拟数据

  • 索引与约束
    CREATE CONSTRAINT ON (n:算子) ASSERT n.interface IS UNIQUE
    
  • 算子
    merge (c1:算子 {interface:'/read/d/transaction/c1'}) set c1 +={name:'c1',hupdatetime:'20200702121021',calculation_engine:'{api:\'http://localhost:7424/ongdb/read/d/transaction/c1\',input:\'?\'}'}
    merge (c2:算子 {interface:'/read/d/transaction/c1'}) set c2 +={name:'c2',hupdatetime:'20200702121021',calculation_engine:'{api:\'http://localhost:7424/ongdb/read/d/transaction/c2\',input:\'?\'}'}
    merge (c3:算子 {interface:'/read/d/transaction/c3'}) set c3 +={name:'c3',hupdatetime:'20200702121021',calculation_engine:'{api:\'http://localhost:7424/ongdb/read/d/transaction/c3\',input:\'?\'}'}
    merge (c4:算子 {interface:'/read/d/transaction/c4'}) set c4 +={name:'c4',hupdatetime:'20200702121021',calculation_engine:'{api:\'http://localhost:7424/ongdb/read/d/transaction/c4\',input:\'?\'}'}
    merge (c5:算子 {interface:'/read/d/transaction/c5'}) set c5 +={name:'c5',hupdatetime:'20200702121021',calculation_engine:'{api:\'http://localhost:7424/ongdb/read/d/transaction/c5\',input:\'?\'}'}
    merge (c6:算子 {interface:'/read/d/transaction/c6'}) set c6 +={name:'c6',hupdatetime:'20200702121021',calculation_engine:'{api:\'http://localhost:7424/ongdb/read/d/transaction/c6\',input:\'?\'}'}
    merge (c1)-[r1:输入]->(c3) set r1 +={hupdatetime:'20200702121021',operator_combine_list:'[[\'/read/d/transaction/c1\']]'}
    merge (c2)-[r2:输入]->(c3) set r2 +={hupdatetime:'20200702121021',operator_combine_list:'[[\'/read/d/transaction/c2\']]'}
    merge (c3)-[r3:输入]->(c5) set r3 +={hupdatetime:'20200702121021',operator_combine_list:'[[\'/read/d/transaction/c3\',\'/read/d/transaction/c4\']]'}
    merge (c4)-[r4:输入]->(c5) set r4 +={hupdatetime:'20200702121021',operator_combine_list:'[[\'/read/d/transaction/c3\',\'/read/d/transaction/c4\']]'}
    merge (c5)-[r5:输入]->(c6) set r5 +={hupdatetime:'20200702121021',operator_combine_list:'[[\'/read/d/transaction/c5\']]'}
    
  • 算子关联上表字段
    match (c1:算子 {interface:'/read/d/transaction/c1'})
    match (c2:算子 {interface:'/read/d/transaction/c1'})
    match (c3:算子 {interface:'/read/d/transaction/c3'})
    match (c4:算子 {interface:'/read/d/transaction/c4'})
    match (c5:算子 {interface:'/read/d/transaction/c5'})
    match (c6:算子 {interface:'/read/d/transaction/c6'})
    match (field1:MySQLDBTBField {name:'field1'})
    match (field2:MySQLDBTBField {name:'field2'})
    match (field3:MySQLDBTBField {name:'field3'})
    match (field4:MySQLDBTBField {name:'field4'})
    match (dev_field1:MySQLDBTBField {name:'dev_field1'})
    match (dev_field2:MySQLDBTBField {name:'dev_field2'})
    match (dev_field3:MySQLDBTBField {name:'dev_field3'})
    match (dev_field4:MySQLDBTBField {name:'dev_field4'})
    merge (c1)<-[:数据源]-(field1)
    merge (c2)<-[:数据源]-(field2)
    merge (c2)<-[:数据源]-(dev_field1)
    merge (c2)<-[:数据源]-(dev_field2)
    merge (c3)<-[:数据源]-(field3)
    merge (c3)<-[:数据源]-(dev_field3)
    merge (c4)<-[:数据源]-(field4)
    merge (c5)<-[:数据源]-(dev_field4)
    
  • 增加算子引擎标签
    MATCH p=(n:算子)-[*..6]-() WITH NODES(p) AS nodeList
    UNWIND nodeList AS node
    MATCH (node) SET node:算子引擎
    

加载算子引擎和数据血缘全景图

MATCH (n:算子引擎) OPTIONAL MATCH p=(n)-[]->() RETURN p
UNION ALL
MATCH (n:数据血缘) OPTIONAL MATCH p=(n)-[]->() RETURN p

数据血缘和图算子

分页加载算子列表

MATCH (n:算子) RETURN n SKIP 0 LIMIT 10

加载算子引擎全景图

MATCH (n:算子引擎) OPTIONAL MATCH p=(n)-[]->(:算子引擎) RETURN p

加载数据血缘全景图

MATCH (n:数据血缘) OPTIONAL MATCH p=(n)-[]->(:数据血缘) RETURN p

获取算子的API地址和样例入参

MATCH (n:算子) WITH apoc.convert.fromJsonMap(n.calculation_engine) AS calculation_engine RETURN calculation_engine.api AS api,calculation_engine.input AS input

执行算子API并获取output输入给下一步等待执行的算子

  • 创建两个测试算子
    merge (c1:算子 {interface:'test1-/ongdb/read/d/transaction/commit'})
    set c1 +={name:'test1算子',hupdatetime:'20200702121021',calculation_engine:'{api:\'http://localhost:7424/ongdb/read/d/transaction/commit\',input:\'{"statements": [{"statement": "match (n:算子) return n limit 10;"}],"user": "ongdb","password": "ongdb%dev"}\'}'}
    merge (c2:算子 {interface:'test2-/ongdb/read/d/transaction/commit'})
    set c2 +={name:'test2算子',hupdatetime:'20200702121021',calculation_engine:'{api:\'http://localhost:7424/ongdb/read/d/transaction/commit\',input:\'{"statements": [{"statement": "match (n:算子) return n limit 10;"}],"user": "ongdb","password": "ongdb%dev"}\'}'}
    merge (c1)-[r1:输入]->(c2) set r1 +={hupdatetime:'20200702121021',operator_combine_list:'[[\'test1-/ongdb/read/d/transaction/commit\']]'}
    match (n:算子) where n.name IN ['test1算子','test2算子'] return n
    
  • 获取‘test1算子’的output之后解析出节点ID,输入到下一步算子继续执行【在第二步算子中拼接一个多节点匹配的查询然后执行】,并返回最终算子的执行结果
    MATCH (n:算子) WHERE ID(n)=37675072 WITH apoc.convert.fromJsonMap(n.calculation_engine) AS calculation_engine,n
    WITH calculation_engine.api AS api,calculation_engine.input AS input,n
    WITH olab.http.post(api,input) AS resultString,n
    WITH apoc.convert.fromJsonMap(resultString).result.results[0].data[0].graph.nodes AS nodeList,n
    UNWIND nodeList AS node
    WITH COLLECT(node.id) AS ids,n
    MATCH (n)-[:输入]->(m) WITH apoc.convert.fromJsonMap(m.calculation_engine).input AS cypher,ids
    WITH apoc.convert.fromJsonMap(cypher).statements[0].statement AS cypher,ids
    WITH REPLACE(cypher,'return n limit 10;','WHERE ID(n) IN '+toString(apoc.convert.toJson(ids))+' return n') AS cypher
    CALL apoc.cypher.run(cypher,null) yield value return value
    
  • 多个算子按照入参的顺序执行 【需要拆分成多步】 【按1步传导执行】 【传入参之前判断‘输入’关系中operator参数数量 || 或者将一个完整的算子计算流程在数据层完全隔离,即不需要判断operator组合情况】 【多个入参需要先执行入参拿到output再执行下一步】
    MATCH (n:算子) WHERE ID(n)=37674500
    MATCH (n)-[r:输入]->(m) WITH SIZE(apoc.convert.fromJsonList(r.operator_combine_list))>1 RETURN m
    ...
    

    存储指标ID【样例存储方式】【使用JSON串存储多个指标ID】

    MATCH (n:MySQLDBTBField) SET n.indicator_ids='["HIDSC1","HIDSC2"]'
    MATCH (n:算子) SET n.indicator_ids='["HIDSC1","HIDSC2"]'
    

    通过指标ID获取关联的数据血缘图谱

    MATCH (n:MySQLDBTBField) WHERE ANY(indicator IN apoc.convert.fromJsonList(n.indicator_ids) WHERE indicator.code='HIND00000000008')
    CALL apoc.path.expand(n,NULL,'数据血缘',0,3) YIELD path RETURN path
    

    通过指标ID获取关联的指标算子图谱

    MATCH (n:算子) WHERE ANY(indicator IN apoc.convert.fromJsonList(n.indicator_ids) WHERE indicator.code='HIND00000000012')
    MATCH p=(n)<-[*..6]-(:算子) RETURN p
    

数据血缘全景图相关的指标

1、数据库数量统计【MATCH (n:MySQLDB) RETURN COUNT(n) AS count】
2、数据表数量统计【MATCH (n:MySQLDBTB) RETURN COUNT(n) AS count】
3、数据字段数量统计【MATCH (n:MySQLDBTBField) RETURN COUNT(n) AS count】
4、被其它表使用最多的表前三个【MATCH (n:MySQLDBTBField) WHERE SIZE((n)<--(:MySQLDBTBField))=0 WITH n MATCH (n)<-[:包含]-(m:MySQLDBTB) WITH m MATCH p=(m)-[:包含]->(k:MySQLDBTBField)-->(f:MySQLDBTBField) RETURN m,COUNT(p) AS count ORDER BY count DESC LIMIT 3】
5、衍生字段数量【MATCH (n:MySQLDBTBField) WHERE SIZE((n)<--(:MySQLDBTBField))>1 RETURN COUNT(n)】
6、原始字段数量【MATCH (n:MySQLDBTBField) WHERE SIZE((n)<--(:MySQLDBTBField))=0 RETURN COUNT(n】
7、衍生表数量【MATCH (n:MySQLDBTBField) WHERE SIZE((n)<--(:MySQLDBTBField))>1 WITH n MATCH (n)<-[:包含]-(m:MySQLDBTB) RETURN COUNT(m)】
8、原始表数量【MATCH (n:MySQLDBTBField) WHERE SIZE((n)<--(:MySQLDBTBField))=0 WITH n MATCH (n)<-[:包含]-(m:MySQLDBTB) RETURN COUNT(m)】

算子引擎全景图相关的指标

1、获取数据的算子【MATCH (n:算子) WHERE n.is_get_data=true RETURN COUNT(n)】
2、计算算子【MATCH (n:算子) WHERE n.is_get_data=false RETURN COUNT(n)】
3、指标类型相关的算子统计【给SQL】
4、依赖算子最多的节点前三个【MATCH (n:算子)-[r]->(m:算子) RETURN m,SIZE(apoc.convert.fromJsonList(r.operator_combine_list)) AS size ORDER BY size DESC LIMIT 3】
5、被算子API使用的字段【MATCH (n:算子)<-[:获取数据]-(m:MySQLDBTBField) WHERE n.is_get_data=true RETURN COUNT(m)】

将算子的指标ID列表整合到一个列表

MATCH (n:`算子`) WITH apoc.convert.fromJsonList(n.indicator_ids) AS indicatorList
WHERE ANY(indicator IN indicatorList WHERE indicator.code<>'null') WITH indicatorList
UNWIND indicatorList AS indicator
RETURN COLLECT(indicator) AS indicatorList