常用CYPHER查询(一)

 

Here’s the table of contents:

  1. 常用查询
    1. 从关系数据库MySQL加载数据导入到ongdb
    2. 从关系数据库Oracle加载数据导入到ongdb
    3. 从关系数据库SQL-SERVER加载数据导入到ongdb
    4. 两两计算list的交集
    5. 合并list
    6. 迭代执行
    7. 比较某个节点和一组节点的最短路径
    8. 分组查询
    9. 加索引
    10. 删除索引
    11. 加约束 - 字段约束
    12. 加约束 - 删除字段约束
    13. 加约束 - 存在约束
    14. 加约束 - 删除存在约束
    15. 加约束 - 删除唯一约束
    16. 迭代批量删除
    17. 批量迭代生成MD5
    18. 获取路径中节点
    19. 获取路径中关系
    20. 节点的关系路径排序
    21. 两个节点的关系路径排序-计算相似性之后再排序
    22. 查看schema
    23. 返回数组中第一个元素
    24. 判断标签是否存在
    25. 查看正在运行的查询
    26. 全文索引
    27. 查看索引统计信息
    28. 通过节点ID查询公司名称
    29. 通过节点ID查询公司名称【设置需要返回的字段和不需要返回的字段】
    30. 使用_search接口搜索文档【随机返回一些doc】
    31. 通过name参数搜索/_search?q=name:?
    32. 通过name参数搜索【设置分页参数】/_search?q=name:get
    33. 使用elasticsearch-dsl查询数据
    34. 解析查询返回doc【返回hists中的节点数据】
    35. 解析查询返回doc【返回name】
    36. 通过节点ID查询查询与当前节点名称相同的节点
    37. 统计name字段并返回前10个结果
    38. 统计name字段并返回前1个结果【并在图中展示这些节点】【使用节点ID到图中检索节点不要用名称,因为涉及到检索的大小写转换问题】
    39. 返回前一百个公司聚簇
    40. 统计指定标签下节点的数量
    41. 统计标签下节点数量
    42. 排序输出最大最小ID
    43. 检查节点无效数据
    44. 通过pcode获取聚簇数据
    45. 超级节点重构
    46. 过滤JSON属性
    47. 查看配置
    48. 去除非数字字符
    49. ‘HORGGuaranteeV001’与50家以上‘上市公司’、‘发债公司’有关联的主体
    50. 返回集合的交集
    51. 返回集合中重复的项
    52. 返回集合中出现次数等于1的元素
    53. 获取API数据
    54. 返回集合的并集
    55. CASE条件语句
  2. 字符串截取
  3. 重构-合并节点
  4. 使用正则提取标签
  5. 过滤出有效标签
  6. 并行搜索
  7. 除法四舍五入
  8. 执行导出数据脚本
  9. [1]分析属性KEY
  10. [2]分析属性KEY
  11. [3]分析属性KEY
  12. 导出全库数据到JSON文件
  13. 查询旧库的关系类型
  14. 导出的全库数据写入新库
  15. 提取中文英文和数字并将字符串转为大写
  16. 提取中文英文和数字并将字符串转为大写并执行繁体转为简体的操作
  17. 实控人网络数据导出【导出CSV】

常用查询

cypher-refcard cypher-manual

从关系数据库MySQL加载数据导入到ongdb

CALL apoc.load.jdbc('jdbc:mysql://testlab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:3306/master_dev?user=dev&password=testlabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC', 'SELECT * FROM MSTR_ORG_PRE_0 LIMIT 10')
call apoc.cypher.run(“create (a:”+row.OSTYPE+"{csaid:"+row.CSAID+"})",null) yield value
return value.count as count
# apoc.cypher.run只能执行读查询,使用apoc.cypher.doIt执行更新操作。

从关系数据库Oracle加载数据导入到ongdb

CALL apoc.load.driver("oracle.jdbc.driver.OracleDriver");
CALL apoc.load.jdbc('jdbc:oracle:thin:nfdp/testlabgogo@nfdpdb-sync-prod.crkldnwly6ki.rds.cn-north-1.alibaba.com.cn:1521/ORCL', 'SELECT * FROM (SELECT rownum rm, a.* FROM ( SELECT * FROM ODSWIND2.ASHAREDIRECTOR ) a WHERE rownum <= 10 ) b WHERE b.rm > 0')

从关系数据库SQL-SERVER加载数据导入到ongdb

CALL apoc.load.driver("com.microsoft.sqlserver.jdbc.SQLServerDriver")
CALL apoc.load.jdbc('jdbc:sqlserver://10.0.185.138:1433;username=testlab;password=tm0BQ4_wi8lB4;DatabaseName=jydb;characterEncoding=utf-8','SELECT TOP 10 * FROM Bond_Default')

两两计算list的交集

apoc.coll.intersection(first, second)

合并list

apoc.coll.union(first, second)

迭代执行

apoc.periodic.commit()

比较某个节点和一组节点的最短路径

MATCH (n),(m) WHERE id(n)=14856  AND id(m) IN [16821,0]
MATCH p = shortestPath((n)-[*..6]-(m)) WITH p
WITH LENGTH(p) AS length,p
RETURN length,p ORDER BY length DESC LIMIT 1

分组查询

with [1,2,3,4,5,6] as list
unwind list as li
call apoc.case(
[li=1 or li=2,"MATCH (n:Partner) RETURN n LIMIT 10"
,li=3,"MATCH (n:Name) RETURN n LIMIT 10"]
,"MATCH (n:ShareHolder) RETURN n LIMIT 10",null) yield value
return value
MATCH … …
WITH group1, group2, collect(data1)[0…9] AS col1, collect(data2)[0…9] AS col2
ORDER BY group1, group2
WITH group1, group2, col1, col2, range(0,9) AS iterator
UNWIND iterator AS i
RETURH group1, group2, col1[i], col2[i]
match (from)-[r]->(to)
where from.id in [1,2,3]
WITH from,r,to
ORDER BY r.age DESC
WITH from, collect( r )[0…5] AS rel
RETURN from, rel

加索引

CREATE INDEX ON :Person(name, age)

删除索引

DROP INDEX ON :Person(name)

加约束 - 字段约束

CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE

加约束 - 删除字段约束

DROP CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE

加约束 - 存在约束

CREATE CONSTRAINT ON (n:Person) ASSERT EXISTS(n.name)
CREATE CONSTRAINT ON ()-[r:LIKED]->() ASSERT EXISTS(r.type)

加约束 - 删除存在约束

DROP CONSTRAINT ON (n:Person) ASSERT EXISTS(n.name)
DROP CONSTRAINT ON ()-[r:LIKED]->() ASSERT EXISTS(r.type)

加约束 - 删除唯一约束

DROP CONSTRAINT ON (n:算子) ASSERT n.interface IS UNIQUE

迭代批量删除

CALL apoc.periodic.iterate('MATCH (n) OPTIONAL MATCH (n)-[r]-() RETURN n,r','WITH {n} AS n,{r} AS r DELETE n,r', {parallel:true,batchSize:10000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations

批量迭代生成MD5

CALL apoc.periodic.iterate('MATCH (n:`PRE公司中文名称`) WHERE NOT EXISTS(n.hcode) RETURN n','WITH {n} AS n SET n.hcode=\'HORG\'+apoc.util.md5([n.pcode])', {parallel:true,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations

获取路径中节点

match (n:`组织机构`:`中文名称`),(m:`组织机构`:`中文名称`)
match p=(n)-[*..2]-(m) return extract(node IN nodes(p) | node.name);

获取路径中关系

match (n:`组织机构`:`中文名称`),(m:`组织机构`:`中文名称`)
match p=(n)-[*..2]-(m) return extract(r IN relationships(p) | TYPE(r))

节点的关系路径排序

MATCH (n:`组织机构`:`中文名称`),(m:`组织机构`:`中文名称`)
MATCH p=(n)-[*..2]-(m) WHERE n<>m
WITH extract(r IN relationships(p) | TYPE(r)) AS relList,n,m
WITH collect(relList) AS collectList,n,m
RETURN apoc.convert.toJson(collectList) AS jsonList,SIZE(collectList) AS pathSize,id(n),id(m) ORDER BY pathSize DESC

两个节点的关系路径排序-计算相似性之后再排序

MATCH (n:`组织机构`:`中文名称`),(m:`组织机构`:`中文名称`)
MATCH p=(n)-[*..2]-(m) WHERE n<>m
WITH extract(r IN relationships(p) | TYPE(r)) AS relList,n,m
WITH collect(relList) AS collectList,n,m
WITH apoc.convert.toJson(collectList) AS jsonList,SIZE(collectList) AS pathSize,n,m AS idM ORDER BY pathSize DESC
WITH jsonList,pathSize,n,m
CALL olab.cluster() YIELD similarity,start,end RETURN similarity,idN,idM ORDER BY similarity DESC LIMIT 1;

查看schema

CALL db.schema.visualization

返回数组中第一个元素

MATCH (n) WHERE id(n) IN [14860856,14860856] RETURN [label IN LABELS(n)][0]

判断标签是否存在

MATCH (n) WHERE id(n) IN [14860856,14860856,14860854,14860857] AND ANY(label IN LABELS(n) WHERE label='PREClusterHeart公司') RETURN n
MATCH (n) WHERE id(n) IN [14860856,14860854,14860857] AND ANY(label IN LABELS(n) WHERE label='PRE公司中文名称') RETURN n

查看正在运行的查询

CALL dbms.listQueries YIELD queryId,username,metaData,query,parameters,planner,runtime,indexes,startTime,protocol,clientAddress,requestUri,status,resourceInformation,activeLockCount,elapsedTimeMillis,cpuTimeMillis,waitTimeMillis,idleTimeMillis,allocatedBytes,pageHits,pageFaults,connectionId

全文索引

## 查看可用的分析器
CALL db.index.fulltext.listAvailableAnalyzers()
## 为给定标签和属性创建节点全文索引
## eventually_consistent=true
## analyzer=cjk
## analyzer=standard
CALL db.index.fulltext.createNodeIndex
CALL db.index.fulltext.queryNodes

查看索引统计信息

CALL apoc.es.stats('localhost:9200 ') YIELD value RETURN value
CALL apoc.es.stats('10.20.0.157') YIELD value RETURN value

通过节点ID查询公司名称

CALL apoc.es.get('10.20.0.157','pre_org_cn_node','_all','15097731',null,null) yield value

通过节点ID查询公司名称【设置需要返回的字段和不需要返回的字段】

CALL apoc.es.get('10.20.0.157','pre_org_cn_node','_all','15097731',{_source_include:'name,pcode',_source_exclude:'description'},null) yield value
CALL apoc.es.get('10.20.0.157','pre_org_cn_node','_all','15097731','_source_include=name&_source_exclude=description',null) yield value
CALL apoc.es.get('10.20.0.157','pre_org_cn_node','_all','15097731',{_source_include:'name'},null) yield value
CALL apoc.es.get('10.20.0.157:9200','pre_org_cn_node','_all','15097731',{_source_include:'name'},null) yield value

使用_search接口搜索文档【随机返回一些doc】

CALL apoc.es.query('10.20.0.157','pre_org_cn_node','',null,null) yield value

通过name参数搜索/_search?q=name:?

// 检索多个属性:q=+pcode:{pcode}+name:{name}
CALL apoc.es.query('10.20.0.157','pre_org_cn_node','','q=name:吉林白山航空发展股份有限公司',null) yield value

通过name参数搜索【设置分页参数】/_search?q=name:get

CALL apoc.es.query('10.20.0.157','pre_org_cn_node','','size=1&scroll=1m&_source=true&q=name:吉林白山航空发展股份有限公司',null) yield value

使用elasticsearch-dsl查询数据

CALL apoc.es.query('10.20.0.157','pre_org_cn_node','',null,{query: {match: {name: '吉林白山航空发展股份有限公司'}}}) yield value

解析查询返回doc【返回hists中的节点数据】

CALL apoc.es.query('10.20.0.157','pre_org_cn_node','',null,{query: {match: {name: '吉林白山航空发展股份有限公司'}}}) yield value with value.hits.hits AS hits WITH hits
UNWIND hits AS hit
RETURN hit._source AS nodeData

解析查询返回doc【返回name】

CALL apoc.es.get('10.20.0.157','pre_org_cn_node','_all','15097731',{_source_include:'name'},null) yield value with value._source.name AS name return name

通过节点ID查询查询与当前节点名称相同的节点

// 这个查询看起来复杂度不高,但是当数据库资源占用较多时也非常耗时基本都在秒级以上
MATCH (n:PRE公司中文名称) WHERE ID(n)=" + idNNodeId + " WITH n
MATCH (m:PRE公司中文名称) WHERE n<>m AND m.name=n.name RETURN ID(m) AS idM
WITH 'https://localhost:9200 ' AS esUrl
WITH 'pre_org_cn_node' AS indexName,esUrl
CALL apoc.es.get(esUrl,indexName,'_all','10232180',null,null) YIELD value WITH value._source.name AS name,indexName,esUrl
CALL apoc.es.query(esUrl,indexName,'',null,{query: {term: {name: name}}}) yield value with value.hits.hits AS hits
UNWIND hits AS hit
WITH hit._source.id AS nodeId
MATCH (n:PRE公司中文名称) WHERE ID(n)<>10232180 AND ID(n)=TOINT(nodeId) RETURN n

统计name字段并返回前10个结果

CALL apoc.es.query('10.20.0.157','pre_org_cn_node','',null,{aggs: {field: {terms: {field: 'name',size: 10}}}}) yield value WITH value.aggregations.field.buckets AS aggregations RETURN aggregations

统计name字段并返回前1个结果【并在图中展示这些节点】【使用节点ID到图中检索节点不要用名称,因为涉及到检索的大小写转换问题】

WITH '10.20.0.157' AS esUrl
WITH 'pre_org_cn_node' AS indexName,esUrl
CALL apoc.es.query(esUrl,indexName,'',null,{aggs: {field: {terms: {field: 'name',size: 1}}}}) yield value
WITH value.aggregations.field.buckets AS aggregations,esUrl,indexName
UNWIND aggregations AS keyCount
WITH keyCount.key AS name,esUrl,indexName
CALL apoc.es.query(esUrl,indexName,'','q=name:'+name,null) yield value WITH value.hits.hits AS hits
UNWIND hits AS hit
WITH hit._source.id AS nodeId
MATCH (n:`PRE公司中文名称`) WHERE ID(n)=TOINT(nodeId) RETURN n

使用with设置配置

WITH 'https://localhost:9200 ' AS esUrl
WITH 'pre_org_cn_node' AS indexName,esUrl
CALL apoc.es.query(esUrl,indexName,'',null,{aggs: {field: {terms: {field: 'name',size: 1}}}}) yield value
WITH value.aggregations.field.buckets AS aggregations,esUrl,indexName
UNWIND aggregations AS keyCount
WITH keyCount.key AS name,esUrl,indexName
CALL apoc.es.query(esUrl,indexName,'','q=name:'+name,null) yield value WITH value.hits.hits AS hits
UNWIND hits AS hit
WITH hit._source.id AS nodeId
MATCH (n:`PRE公司中文名称`) WHERE ID(n)=TOINT(nodeId) RETURN n

使用unwind设置配置

UNWIND [{esUrl:'https://localhost:9200 ',indexName:'pre_org_cn_node'}] AS configuration
WITH configuration.esUrl AS esUrl,configuration.indexName AS indexName
CALL apoc.es.query(esUrl,indexName,'',null,{aggs: {field: {terms: {field: 'name',size: 1}}}}) yield value
WITH value.aggregations.field.buckets AS aggregations,esUrl,indexName
UNWIND aggregations AS keyCount
WITH keyCount.key AS name,esUrl,indexName
CALL apoc.es.query(esUrl,indexName,'','q=name:'+name,null) yield value WITH value.hits.hits AS hits
UNWIND hits AS hit
WITH hit._source.id AS nodeId
MATCH (n:`PRE公司中文名称`) WHERE ID(n)=TOINT(nodeId) RETURN n

返回前一百个公司聚簇

CALL apoc.es.query('https://localhost:9200 ','pre_org_cn_node','',null,{size:0,query:{bool:{}},aggs:{cluster_id:{terms:{field:'cluster_id',size:10,shard_size:100000,order:{_count:'DESC'}},aggs:{topHitsData:{top_hits:{size:100,_source:{includes:['name']}}}}},field_count:{cardinality:{precision_threshold:100000,field:'cluster_id'}}}}) yield value
WITH value.aggregations.cluster_id.buckets AS buckets
UNWIND buckets AS topHitsData
RETURN topHitsData.key AS master,topHitsData.doc_count AS slaveCount,topHitsData.topHitsData.hits.hits AS slaves

统计指定标签下节点的数量

MATCH (n:`PRE公司中文名称`) where exists(n.cluster_id) RETURN count(*)

统计标签下节点数量

MATCH (n:`PREClusterHeart公司`) RETURN count(*)

排序输出最大最小ID

MATCH (n:`PRE公司中文名称`) where exists(n.cluster_id) RETURN ID(n) ORDER BY ID(n) ASC LIMIT 1
MATCH (n:`PRE公司中文名称`) where exists(n.cluster_id) RETURN ID(n) ORDER BY ID(n) DESC LIMIT 1

检查节点无效数据

MATCH (n) WHERE n.name='无' OR n.name='空' OR n.name='null' OR n.name='NULL' WITH n
OPTIONAL MATCH (n)<-[r]-() delete r,n

通过pcode获取聚簇数据

// 原始查询
MATCH (m:PRE公司中文名称) WHERE m.pcode='caihui110000484' WITH m.cluster_id AS clusterId
MATCH (n:PRE公司中文名称) WHERE n.cluster_id=clusterId RETURN n.pcode AS pcode,n.hcode AS hcode
// 使用ES优化上述查询
WITH 'https://localhost:9200 ' AS esUrl
WITH 'pre_org_cn_node' AS indexName,esUrl
CALL apoc.es.query(esUrl,'pre_org_cn_node','','q=pcode:caihui110000484',null) yield value WITH value.hits.hits AS hits,indexName,esUrl
UNWIND hits AS hit
WITH hit._source.cluster_id AS clusterId,indexName,esUrl
CALL apoc.es.query(esUrl,indexName,'',null,{query: {term: {cluster_id: clusterId}}}) yield value with value.hits.hits AS hits
UNWIND hits AS hit
RETURN hit._source.pcode AS pcode,hit._source.hcode AS hcode

超级节点重构

MATCH (n:PRE日期) WHERE size((n)<--())>=1000 RETURN n ORDER BY size((n)<--()) DESC LIMIT 1
MATCH (n:PRE地理位置) WHERE size((n)<--())>=1000 RETURN n ORDER BY size((n)<--()) DESC LIMIT 1
...

过滤JSON属性

TYPE(r) DEMO
{
  "name": "test",
  "json": "[{"amount":2600000,"start_date":20180101122235}]"
}
MATCH (n),(m) WHERE ID(n)=14860838 AND ID(m)=14860851 WITH n,m
MATCH p=(n)-[r]-(m) WHERE ANY(proMap IN apoc.convert.fromJsonList(r.json) WHERE proMap.amount>=2600000) RETURN r

查看配置

CALL dbms.listConfig('.logs.') YIELD name, description, value RETURN name, description, value

去除非数字字符

RETURN REPLACE(olab.replace.regexp('2009-07-27 00:00:00.0','[^0-9]'),' ','')

‘HORGGuaranteeV001’与50家以上‘上市公司’、‘发债公司’有关联的主体

MATCH (n:HORGGuaranteeV001)--(m:HORGGuaranteeV001) WHERE ANY(label IN LABELS(m) WHERE label='上市公司' OR label='发债公司')
WITH n.name AS name,SIZE(apoc.coll.frequencies(COLLECT(m.name))) AS size ORDER BY size DESC
WHERE size>=50
RETURN name,size

返回集合的交集

RETURN apoc.coll.intersection(['PREPCODE','PRE公司中文名称','PRE人物','PRE网址','PRE地理位置','PRE电子邮箱','PRE电话','PRE传真','PRE中文简称','PRE英文名称','PRE英文简称','PRE统一社会信用代码','PRE日期','PRE公司注册号','PRE地税登记号码','PRE交易代码','PRE其它关联组织机构','PRE纳税人识别号','PRE营业执照号码','PRE国税登记号码','PRE中文拼音简称'],['PREPCODE','PRE公司中文名称','PRE中文简称','PRE中文拼音简称','PRE英文名称','PRE英文简称','PRE统一社会信用代码','PRE日期','PRE地理位置','PRE邮编','PRE电子邮箱','PRE网址','PRE人物','PRE电话','PRE传真','PRE上市公司代码','PRE组织机构代码','PRE营业执照号码','PRE国税登记号码','PRE地税登记号码','PRE公司注册号','PRE纳税人识别号','PRE交易代码','PRE其它关联组织机构'])

返回集合中重复的项

RETURN apoc.coll.duplicates(['PREPCODE','PRE公司中文名称','PRE人物','PRE网址','PRE地理位置','PRE电子邮箱','PRE电话','PRE传真','PRE中文简称','PRE英文名称','PRE英文简称','PRE统一社会信用代码','PRE日期','PRE公司注册号','PRE地税登记号码','PRE交易代码','PRE其它关联组织机构','PRE纳税人识别号','PRE营业执照号码','PRE国税登记号码','PRE中文拼音简称','PREPCODE','PRE公司中文名称','PRE中文简称','PRE中文拼音简称','PRE英文名称','PRE英文简称','PRE统一社会信用代码','PRE日期','PRE地理位置','PRE邮编','PRE电子邮箱','PRE网址','PRE人物','PRE电话','PRE传真','PRE上市公司代码','PRE组织机构代码','PRE营业执照号码','PRE国税登记号码','PRE地税登记号码','PRE公司注册号','PRE纳税人识别号','PRE交易代码','PRE其它关联组织机构'])

返回集合中出现次数等于1的元素

WITH apoc.coll.frequencies(['PREPCODE','PRE公司中文名称','PRE人物','PRE网址','PRE地理位置','PRE电子邮箱','PRE电话','PRE传真','PRE中文简称','PRE英文名称','PRE英文简称','PRE统一社会信用代码','PRE日期','PRE公司注册号','PRE地税登记号码','PRE交易代码','PRE其它关联组织机构','PRE纳税人识别号','PRE营业执照号码','PRE国税登记号码','PRE中文拼音简称','PREPCODE','PRE公司中文名称','PRE中文简称','PRE中文拼音简称','PRE英文名称','PRE英文简称','PRE统一社会信用代码','PRE日期','PRE地理位置','PRE邮编','PRE电子邮箱','PRE网址','PRE人物','PRE电话','PRE传真','PRE上市公司代码','PRE组织机构代码','PRE营业执照号码','PRE国税登记号码','PRE地税登记号码','PRE公司注册号','PRE纳税人识别号','PRE交易代码','PRE其它关联组织机构']) AS countMapList
UNWIND countMapList AS countMap
WITH countMap
WHERE countMap.count=1 RETURN countMap

获取API数据

WITH apoc.convert.fromJsonMap(olab.http.post('http://api.data.cn','{"api_name":"wind_news","params":{"article_id":"113008559"},"token":"***"}')) AS result
WITH result.data.fields AS fields,result.data.items AS items,result.data AS data
WITH fields,items
UNWIND items AS item
RETURN fields,item

返回集合的并集

RETURN apoc.coll.union(['testlab','Test'],['Test'])

CASE条件语句

WITH [1,2,3,4,5] AS QList
UNWIND QList AS Q
WITH Q
WITH CASE
     WHEN Q=4 THEN 'Q4'
     WHEN Q=3 THEN 'Q3'
     WHEN Q=2 THEN 'Q2'
     WHEN Q=1 THEN 'Q1'
     ELSE 'Q'
     END AS name
RETURN name

字符串截取

WITH ['20140329000000','20140630000000','20140931000000','20141231000000'] AS timeList
UNWIND timeList AS time
RETURN time,TOINTEGER(SUBSTRING(time,4,4)) AS monthDate
WITH ['20140329000000','20140630000000','20140931000000','20141231000000'] AS timeList
UNWIND timeList AS time
WITH TOINTEGER(SUBSTRING(time,4,4)) AS monthDate
WITH CASE
     WHEN monthDate>101 AND monthDate<=331 THEN 'Q1'
     WHEN monthDate>330 AND monthDate<=630 THEN 'Q2'
     WHEN monthDate>630 AND monthDate<=930 THEN 'Q3'
     WHEN monthDate>931 AND monthDate<=1231 THEN 'Q4'
     ELSE 'quarter'
     END AS quarter,monthDate
RETURN quarter,monthDate

重构-合并节点

MATCH (n) WHERE ID(n) IN [25055336,29938186] WITH COLLECT(n) AS nodes
CALL apoc.refactor.mergeNodes(nodes) YIELD node RETURN node

使用正则提取标签

RETURN REDUCE(label='',char IN REDUCE(label=[],char IN apoc.text.regexGroups('证监会行业分类(2012版)', '[\u4e00-\u9fa5a-zA-Z0-9]') | label+char) | label+char)

过滤出有效标签

FILTER(label IN ['','.','label'] WHERE label<>'' AND label<>'.')

并行搜索

Cypher的执行在缺省情况下都是单线程的。而有时,我们需要并行执行查询,例如对一个名称列表,查询它们在图中的邻居。这里,可以用APOC的Cypher相关过程runParallel()。

  • 并行初始化并执行查询。
  • 缺省情况下,最大分区数/并行数为CPU内核数 x 100;
  • 最多批次数为10000。例如,如果图数据库被分配了4个内核,
  • 那么并行的最多进程数为400。
    CALL apoc.cypher.parallel(
    'MATCH (from:HORGShareHoldV002 {name:$name}) -[:持股]-> (to:HORGShareHoldV002) RETURN from.name AS fName,to.name AS tName',
    {name:['中鑫同洲融资担保有限公司','枝江市马家店供销合作社','麻城市银源棉业有限责任公司','麻城市新合商贸有限公司','麻城市大别山电子商务公共服务中心','王五常']},
    'name'
    )
    

除法四舍五入

// 有效位保留六位,六位后四舍五入
RETURN apoc.number.exact.div('27','28',6,'HALF_DOWN')
// 保留有效位
RETURN apoc.number.exact.div('1256324','8472136723',4)
RETURN apoc.number.exact.div(TOSTRING(7.5*100),TOSTRING(110),3,'HALF_DOWN')

执行导出数据脚本

cat export.cql | bin/cypher-shell -a bolt://10.20.13.200:7687 -u neo4j -p testlab%pro
# 后台执行导出脚本
nohup cat export.cql | bin/cypher-shell -a bolt://10.20.13.200:7687 -u neo4j -p testlab%pro >>logs/export.log 2>&1 &
echo 'MATCH (n) RETURN n LIMIT 10;' | /home/ongdb/ongdb-enterprise-3.5.22/bin/cypher-shell -a bolt+routing://localhost:7687 -u neo4j -p testlab%pro

[1]分析属性KEY

MATCH (n:Category) WITH KEYS(n) AS keys
// 收集所有KEYS
WITH COLLECT(keys) AS keysList
// 统计keys-list生成map-list
WITH apoc.coll.frequencies(keysList) AS frequenciesList
// 设置itemCount属性,计算出的keys大小
WITH REDUCE(list=[],map IN frequenciesList | list+apoc.map.setKey(map,'itemCount',SIZE(map.item))) AS itemCountList
// 排序输出-降序
//RETURN apoc.coll.sortMaps(itemCountList,'itemCount')
// 排序输出-升序
RETURN apoc.coll.sortMaps(itemCountList,'^itemCount')
  • OUTPUT
    [
    {
    "count": 10,
    "item": [
      "CLASS",
      "DATA_SOURCE",
      "ENG_NAME",
      "NAME",
      "PUBLISHER",
      "START_DATE",
      "UPDATE_DATE",
      "node_id"
    ],
    "itemCount": 8
    }
    ]
    

[2]分析属性KEY

MATCH (n:Category) WITH KEYS(n) AS keys
// 收集所有KEYS
WITH COLLECT(keys) AS keysList
// 合并到一个列表
WITH REDUCE(mergeList=[],list IN keysList | mergeList+list) AS mergeList
// 统计每个属性出现的频率
WITH apoc.coll.frequencies(mergeList) AS frequenciesList
// 抽取所有KEY
RETURN EXTRACT(map IN frequenciesList | map.item) AS keys
  • OUTPUT
    # 指定标签排重之后的所有属性KEYS
    ["CLASS", "DATA_SOURCE", "ENG_NAME", "NAME", "PUBLISHER", "START_DATE", "UPDATE_DATE", "node_id", "CITY", "COUNTY", "PROVINCE", "CODE", "LEVEL", "STATUS"]
    

[3]分析属性KEY

// What kind of nodes exist
// Sample some nodes, reporting on property and relationship counts per node.
MATCH (n) WHERE rand() <= 0.1
RETURN
DISTINCT labels(n),
count(*) AS SampleSize,
avg(size(keys(n))) as Avg_PropertyCount,
min(size(keys(n))) as Min_PropertyCount,
max(size(keys(n))) as Max_PropertyCount,
avg(size( (n)-[]-() ) ) as Avg_RelationshipCount,
min(size( (n)-[]-() ) ) as Min_RelationshipCount,
max(size( (n)-[]-() ) ) as Max_RelationshipCount
  • OUTPUT
    labels(n)	SampleSize	Avg_PropertyCount	Min_PropertyCount	Max_PropertyCount	Avg_RelationshipCount	Min_RelationshipCount	Max_RelationshipCount
    ["Person"]	121920	5.51455052493442	2	10	2.5753280839895725	0	5794
    ["Category"]	510	5.854901960784309	5	8	258.07450980392116	0	3768
    ["Keyword"]	69131	1.9999855347094488	1	2	23.31085909360472	1	34421
    ["Entity"]	905092	10.509107361461911	2	26	6.159585986838973	0	118336
    ["Event"]	163052	10.526936192135055	6	12	1.0078870544366236	0	603
    ["Product"]	117704	13.57018453068722	5	14	1.9037585808468893	1	2366
    ["Content"]	104774	5.602267738179318	4	6	16.447887834768103	0	87
    ["Security"]	33639	7.573471268468187	6	9	54.16005232022342	0	40377
    

导出全库数据到JSON文件

CALL apoc.export.json.all('all.json')

查询旧库的关系类型

CALL db.labels() YIELD label WITH COLLECT(label) AS labelList
UNWIND labelList AS labelK
UNWIND labelList AS labelG
WITH labelK,labelG
CALL apoc.cypher.doIt('OPTIONAL MATCH p=(from:`'+labelK+'`)-[r]->(to:`'+labelG+'`) RETURN r LIMIT 1',{}) YIELD value WITH value.r AS r
WITH DISTINCT LABELS(STARTNODE(r)) AS startNodeLabel,TYPE(r) AS relname,LABELS(ENDNODE(r)) AS endNodeLabel
WITH startNodeLabel[0]+'_'+relname+'_'+endNodeLabel[0]+'.file' AS fileName
RETURN fileName
# OUTPUT
"Category_INVOLVE_Product.file"
"Category_INVOLVE_Security.file"
"Person_SERVE_Entity.file"
"Person_ISSUE_Content.file"
"Person_SERVE_Security.file"
"Entity_SERVE_Person.file"

导出的全库数据写入新库

这种方式在处理大文件时必须切割成多个小文件处理

  • 导出数据
    CALL apoc.export.json.all('all.json')
    #  24G -rw-r--r--  1 root   root    24G Jan 22 05:44 all.json
    
  • 切割导出的all.json文件【4G为一个文件】
    split -b 4096m all.json
    
  • 节点数据样例
    {"type":"node","id":"215","labels":["PRE地理位置"],"properties":{"score":2.60445,"name":"北京市","hupdatetime":20200529113448}}
    
  • 关系数据样例
    {"id":"9259","type":"relationship","label":"NEXT","properties":{"hupdatetime":20200811135439},"start":{"id":"5829","labels":["Region","国家"]},"end":{"id":"9460","labels":["Region","省洲或直辖市"]}}
    
  • 建索引【用旧库的自增ID做为节点唯一判断】
    CREATE CONSTRAINT ON (n:DUMP_TEMP) ASSERT n.dump_id IS UNIQUE;
    
  • 写入节点
    CALL apoc.load.json('all.json') YIELD value WHERE value.type='node' WITH value AS row
    CALL apoc.cypher.doIt('MERGE (n:DUMP_TEMP {dump_id: {dump_id}}) SET n += {properties} WITH n CALL apoc.create.addLabels(n,{labels}) YIELD node RETURN node', {properties: row.properties, dump_id:'raw_'+row.id,labels:row.labels}) YIELD value RETURN COUNT(*) AS count
    
  • 写入关系
    CALL apoc.load.json('all.json') YIELD value WHERE value.type='relationship' WITH value AS row,value.label AS type
    CALL apoc.cypher.doIt('MATCH (from:DUMP_TEMP {dump_id: {f_dump_id}}),(to:DUMP_TEMP {dump_id: {t_dump_id}}) MERGE p=(from)-[r:`'+type+'`]->(to) SET r += {properties} RETURN p', {properties:apoc.map.merge({},row.properties),f_dump_id:'raw_'+row.start.id,t_dump_id:'raw_'+row.end.id}) YIELD value RETURN COUNT(*) AS count
    
  • 对切割的文件执行节点构建
    WITH ['xaa','xad','xab','xae','xac','xaf'] AS fileList
    UNWIND fileList AS file
    CALL apoc.load.json(file) YIELD value WHERE value.type='node' WITH value AS row
    CALL apoc.cypher.doIt('MERGE (n:DUMP_TEMP {dump_id: {dump_id}}) SET n += {properties} WITH n CALL apoc.create.addLabels(n,{labels}) YIELD node RETURN node', {properties: row.properties, dump_id:'raw_'+row.id,labels:row.labels}) YIELD value RETURN COUNT(*) AS count
    
  • 对切割的文件执行关系构建
    WITH ['xaa','xad','xab','xae','xac','xaf'] AS fileList
    UNWIND fileList AS file
    CALL apoc.load.json(file) YIELD value WHERE value.type='relationship' WITH value AS row,value.label AS type
    CALL apoc.cypher.doIt('MATCH (from:DUMP_TEMP {dump_id: {f_dump_id}}),(to:DUMP_TEMP {dump_id: {t_dump_id}}) MERGE p=(from)-[r:`'+type+'`]->(to) SET r += {properties} RETURN p', {properties:apoc.map.merge({},row.properties),f_dump_id:'raw_'+row.start.id,t_dump_id:'raw_'+row.end.id}) YIELD value RETURN COUNT(*) AS count
    

提取中文英文和数字并将字符串转为大写

RETURN TOLOWER(apoc.text.regreplace('1290 Avenue of the Americas, 11th Floor New York, New York 10104','[^a-zA-Z0-9\u4e00-\u9fa5]',''))

提取中文英文和数字并将字符串转为大写并执行繁体转为简体的操作

RETURN olab.string.toSimple(TOLOWER(apoc.text.regreplace('1290 Avenue of the Americas, 11th Floor New York, New York 10104','[^a-zA-Z0-9\u4e00-\u9fa5]','')))
WITH '1290 Avenue of the Americas, 11th Floor说话 New York, New York 10104' AS name
RETURN olab.string.toSimple(TOLOWER(apoc.text.regreplace(name,'[^a-zA-Z0-9\u4e00-\u9fa5]',''))) AS name

实控人网络数据导出【导出CSV】

CALL apoc.export.csv.query('MATCH (n:HORGShareHoldActualControl)-[r:实控]->(m:HORGShareHoldActualControl) WITH apoc.convert.fromJsonList(r.shareholding_detail) AS shareHoldList,n,m UNWIND shareHoldList AS shareHold RETURN n.name AS fromName,n.hcode AS fromHcode,m.name AS toName,m.hcode AS toHcode,shareHold.amount AS amount,shareHold.currency AS currency,shareHold.holdAmount AS holdAmount,shareHold.holderType AS holderType,shareHold.ratio AS ratio,shareHold.votingRights AS votingRights,shareHold.releaseDate AS releaseDate,shareHold.updateDate AS updateDate,shareHold.investDate AS investDate,shareHold.investType AS investType,shareHold.src AS src,shareHold.dataSource AS dataSource','HORGShareHoldActualControl.csv',{stream:true})