# kg-etl **Repository Path**: delphi_xk/ps-kg-etl ## Basic Information - **Project Name**: kg-etl - **Description**: 知识图谱,ETL,Neo4J,OrientDB - **Primary Language**: Java - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 16 - **Forks**: 11 - **Created**: 2019-07-05 - **Last Updated**: 2025-05-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # ps-kg-etl 知识图谱数据处理 ## Neo4J测试环境 1. 测试服务器机配置:32G内存,250G硬盘,8 cores;测试客户端机器i5-6400,内存8G 2. 软件版本:CentOS 7.3,Neo4J 3.5.5 Community,OrientDB 3.0.22, Docker 1.13 3. 常见检索语句参考[图数据库基本操作] ## 主要工作 1. 编写**Sqoop**脚本,集成MySQL与HBase数据库(离线、批量)。 2. 编写**JAVA**数据驱动程序,提取画像数据库内节点(Node)及关系(Relationship)数据,导入Neo4J。 3. 编写**JAVA**ETL程序,构建图模型(Neo4J、OrientDB)。 ## Benchmark *以下测试均在单节点下性能测试,暂不包括集群部署下相关性能测试:* ### 1.单次插入&大规模插入(Batch模式)性能对比测试 |测试场景|单次插入耗时|批量模式耗时| |----|----|----| |500节点插入|26s|**3s**| |1000节点插入|50s|**4s**| |2000节点插入|101s|**6s**| |28758节点插入|20mins以上|**1min**| |500关系插入|27s|**3s**| |1000关系插入|51s|**3s**| |2000关系插入|101s|**5s**| |27142关系插入|20mins以上|**35s**| * 以上测试可见,单次插入耗时线性增长,算法效率为O(n),批量模式对数增长,算法效率为O(log_n)。 * 受限于程序运行机器8G内存,设置批大小为2000,未开启并行计算,实际中并行处理加更大的批设置能进一步提升处理效率。 ### 2.查询性能测试:随机节点全部相邻节点查询、关系查询 *由于是随机挑选的节点,因此返回相邻节点数每次运行结果不同,这里取均值。* *以下查询性能测试的场景中,查询耗时指查询语句命中不同数量的记录条数返回时间* #### 相邻节点查询 |测试场景|返回条数|查询耗时| |----|----|----| |500节点|2000+|1s| |1000节点|1万+|1.5s| |2000节点|2万+|2.5s| |10000节点|4万+|3s| |20000节点|5万+|5s| #### 关系查询 |测试场景|查询耗时| |----|----| |500关系|0.1s| |1000关系|0.2s| |5000关系|0.3s| |10000关系|1s| |20000关系|2s| ### 3 算法测试 #### 3.1 最短路径(Shortest Path) 最短路径算法除了应用于地理上的寻路,还可以用于定义网络中的分离度(Degrees of Separation),常用算法Dijkstra。 |测试场景|查询耗时| |----|----| |10对节点|1.7s| |50对节点|8.5s| |100对节点|18s| |150对节点|30s| 算法查询没有批量方式,只能逐条记录查询,每次查询耗时在0.2s左右 例:两个人之间最短关系发现 ```cypher MATCH (p1), (p2) WHERE id(p1) = 173  AND id(p2) = 178 CALL algo.shortestPath.stream(p1, p2)  YIELD nodeId, cost RETURN algo.asNode(nodeId) as node, cost ``` #### 3.2 相似度计算(Similarity) Neo4J支持如欧式距离(Euclidean),余弦相似度(Cosine),皮尔森相似度(Pearson)等计算方式。 这里以Jaccard相似度计算为例,通过节点的相邻节点组成的集合,计算两个集合的交集与并集之商。 |测试场景|查询耗时| |----|----| |100对节点|1.5s| |200对节点|3.5s| |500对节点|6s| |1000对节点|15s| 相似度计算每次耗时在0.01s左右 #### 3.3 社区发现(Community Detection) * **The Louvain algorithm** 以企业间投资关系组成的图为例(2.8万节点,2.7万关系),调用louvain算法: ```cypher CALL algo.louvain.stream('Enterprise', 'INVEST', {includeIntermediateCommunities: true} ) YIELD nodeId, community RETURN algo.asNode(nodeId).enterpriseName AS name, community ORDER BY community ``` ![388ae6489cf99df944b38d5a9d78c35e.jpeg](en-resource://database/3373:1) 总耗时1.5s,划分社区1780个左右(最终收敛结果不一定相同)。 * **Label Propogation Algorithm**(标签传播算法) 同样以企业间投资关系组成的图为例(2.8万节点,2.7万关系),调用LP算法: ```cypher CALL algo.labelPropagation.stream('Enterprise', 'INVEST', {direction: 'BOTH'} )  YIELD nodeId, label RETURN algo.asNode(nodeId).enterpriseName AS name, label ORDER BY label ``` ![a2f24835a3c1eae7bfab7ab3b1440664.jpeg](en-resource://database/3375:1) 总耗时1s,划分社区2300个左右(最终收敛结果不一定相同)。 * **Connected Components Algorithm**(连通分量算法) 该算法验证(无向)图的连通性,结果为图的连通分量(Connected Component): ```cypher CALL algo.unionFind.stream('Enterprise', 'INVEST', {}) YIELD nodeId, setId RETURN algo.asNode(nodeId).enterpriseName AS name, setId ORDER BY label CALL algo.unionFind( 'MATCH (p) WHERE p:Judgement or p:Law RETURN id(p) AS id', 'MATCH (p1) -[:BASED_ON|:BELONGS]-> (p2) RETURN id(p1) AS source, id(p2) AS target', {graph: 'cypher'} ) YIELD nodes, communityCount, p1, p5, p10, p50, p90, p100 ``` ![c4896b3aafe8aff43ad644640f9b7c23.jpeg](en-resource://database/3377:1) 总耗时1s,划分1742个连通分量,该算法稳定收敛,在图结构不变情况下,连通分量结果不变。 #### 3.4 中心节点(Centrality) * **Page Rank Algorithm**,PageRank算法是Google搜索引擎的排序算法,除了应用于信息检索领域,还可以用于: * 社交平台的用户推荐 * 地图中的流量预测 * 异常、欺诈检测 测试脚本耗时1s左右。 ```cypher CALL algo.pageRank.stream('Enterprise', 'INVEST', {direction: 'BOTH'}) YIELD nodeId, score RETURN algo.asNode(nodeId).enterpriseName, score ORDER BY score DESC ``` > [*PageRank Beyond Web*](https://arxiv.org/pdf/1407.5107.pdf) * **Degree Centrality algorithm**,测量节点进出关系数量的算法,代表着节点在图中的重要程度。 该算法也可以对图的整体结构进行分析,如计算图的最小度(degree),最大度和平均度。测试脚本计算耗时1s左右。 ```cypher CALL algo.degree.stream("Enterprise", "INVEST") YIELD nodeId, score RETURN algo.asNode(nodeId).enterpriseName, score ORDER BY score DESC ``` * **Harmonic Centrality algorithm** #### 3.5 关联预测(Link Prediction) * **Common Neighbors algorithm** CommonNeighbors算法基于两个节点共有邻居节点的集合来判断两个节点是否有可能产生新联系,类似于协同过滤的思想。测试脚本运行耗时20s左右。 ```cypher MATCH (e1:Enterprise{enterpriseName:'苏宁易购集团股份有限公司'} ), (e2:Enterprise) RETURN e2.enterpriseName, algo.linkprediction.commonNeighbors(e1, e2) AS score ORDER BY score DESC LIMIT 100 ``` * **Adamic Adar algorithm** AdamicAdar算法根据两个节点间的共有邻居节点来判断两个节点的接近程度。测试脚本耗时15s。 ```cypher MATCH (e1:Enterprise{enterpriseName:'苏宁易购集团股份有限公司'} ), (e2:Enterprise) RETURN e2.enterpriseName, algo.linkprediction.adamicAdar(e1, e2) AS score ORDER BY score DESC LIMIT 100 ``` * **Total Neighbors algorithm** TotalNeighbors算法根据两个节点邻居节点的并集大小来判断节点的接近程度。测试脚本耗时1s ```cypher MATCH (e1:Enterprise{enterpriseName:'苏宁易购集团股份有限公司'} ), (e2:Enterprise) RETURN e2.enterpriseName, algo.linkprediction.totalNeighbors(e1, e2) AS score ORDER BY score DESC LIMIT 100 ``` ### 4.运行时内存、CPU占用测试,硬盘占用测试 * 节点(Node)、关系(Relationship)都是定长格式的byte结构,一个Node占用15B,Relationship占用34B,Property占用41B。 * 测试数据集内存储13万个节点占用1.8MB,13万关系占用4.5MB,63万条属性占用24.7MB,字符串存储占用76MB,总共占用120MB。 * neo4j主要使用页内存(page cache)和堆内存(heap),其中,页内存用于缓存数据和索引,堆内存用于执行查询。 * neo4j需要尽量配置足够的内存用于缓存尽量多的数据和执行结果,以优化查询效率。 * 根据[官方文档](https://neo4j.com/docs/operations-manual/current/tools/neo4j-admin-memrec/)内存配置建议,**以一台32G内存机器为例**,配置最大堆内存为12G,页内存为12G,预留8G给系统后,neo4j单机最大容量为**1千万节点、1千万关系及6千万属性**。 * 在执行查询过程中,前期会有较高CPU占用率,后期会根据查询模板优化计算过程,优化后CPU占用不到峰值的一半。 > [*Benchmarking graph databases on the problem of community detection*](http://mklab.iti.gr/files/beis_adbis2014_corrected.pdf) * 导入31w节点,114w关系,91w属性,耗时5min,占用内存6.7G,存储850M ### 5.图与关系型数据库对比测试 * [官方介绍](https://neo4j.com/docs/operations-manual/current/introduction/#editions)社区版图的最大规模为340亿节点及关系,算法中对于巨大图(huge)的定义规模为20亿以上节点及关系。 * 本测试使用随机生成的数据集,生成图数据库规模为2千万节点,2千万关系的图,关系型数据库中为2张表,每张表中有2千万条记录。 * 图数据库为Neo4J 3.5社区版,关系型数据库为MySQL 5.7。 * 测试服务器为4 cpu,32G内存。 #### 5.0 导入数据 准备导入数据header文件,为node_id创建索引 ``` node_header.csv node_id:ID(Node),name:string rel_header.csv :START_ID(Node),:END_ID(Node) ``` ```bash bin/neo4j-admin import --database=graph3.db --id-type=STRING --nodes:Node="import/node_header.csv,import/nodes.csv" --relationships:REL="import/rel_header.csv,import/rels.csv" ``` #### 5.1 随机访问 **测试随机读取性能,查询部分数据** * 图,耗时**5 ~ 50ms** ```cql MATCH (n:Node) RETURN n LIMIT 50; MATCH (n1) -[r:REL]- (n2) RETURN n1,r,n2 LIMIT 50; ``` * 关系型,耗时**10 ~ 20ms** ```sql SELECT * FROM node LIMIT 50; ``` #### 5.2. 节点检索 **根据条件检索数据** * 图,耗时**10 ~ 100ms** ```cql MATCH (n:Node) WHERE n.node_id = '12345' return n ``` * 关系型,耗时**10ms** ```sql SELECT * FROM node WHERE node_id = '12345'; SELECT * FROM node_rel where start_id = '10000'; ``` #### 5.3. 直接关系检索 **根据条件,检索节点及其关系节点** * 图,耗时**10 ~ 96ms** ```cql MATCH (n1:Node) -- (n2:Node) where n1.node_id = '12345' return n1, n2; ``` * 关系型,耗时**20ms** ```sql select n.node_id, rel.end_id from node n join node_rel rel on n.node_id = rel.start_id where n.node_id = '10050' ``` #### 5.4. 多层、间接关系检索 **根据条件,检索节点的多层关系节点** * 图,耗时**20ms** ```cql MATCH (n1:Node) --> (n2:Node) --> (n3:Node) --> (n4:Node) WHERE n1.node_id = '10050' RETURN n1, n2, n3 ,n4; ``` * 关系型,耗时**20ms** ```sql select n.*, rel.* , rel2.*, rel3.* from node n join node_rel rel on n.node_id = rel.start_id join node_rel rel2 on rel.end_id = rel2.start_id join node_rel rel3 on rel2.end_id = rel3.start_id where n.node_id = '10050'; ``` #### 5.5 相邻节点统计 **相邻节点个数统计** * 图,耗时**20ms** ```cql MATCH (n1:Node) RETURN n1, size( (n1) -[:REL]-> ()) LIMIT 20 ``` * 关系型,耗时**20ms** ```sql select n.node_id, count(1) as count_n from node n join node_rel rel on n.node_id = rel.start_id group by n.node_id limit 20 ``` **多层关系的相邻节点个数统计** * 图,耗时**20ms** ```cql MATCH (n1:Node) RETURN n1, size( (n1) -[:REL*4]-> ()) LIMIT 20 ``` * 关系型,耗时**NA** ```sql select n.node_id, count(1) as count_n from node n join node_rel rel on n.node_id = rel.start_id join node_rel rel2 on rel.end_id = rel2.start_id join node_rel rel3 on rel2.end_id = rel3.start_id group by n.node_id limit 20 ``` #### 5.6 结论 * Neo4J图数据库单节点能够支持千万级数据的查询操作,如随机访问、条件检索等(1s内响应)。 * 图数据库与关系型数据库在常规的查询操作上相比,效率没有明显优势。 * 关系型数据库在对多层相邻节点的操作需要多次JOIN,统计其多层相邻节点时无法得到结果(响应时间大于10min);而图数据库可以在1秒内通过关系的遍历得到结果。 * 图由于索引的缓存加载机制,相同查询有一定波动。 ### 6 一些优化 **使用索引是优化查询的关键** * 不使用索引的排序,需要27s;使用索引的情况,仅50ms ```cql CALL apoc.periodic.iterate( "MATCH (n:Node) RETURN n", "SET n.rel1_count = size( (n) -[:REL]-> ())", {batchSize:10000, parallel:true} ) 264s CALL apoc.periodic.iterate( "MATCH (n:Node) RETURN n", "SET n.rel3_count = size( (n) -[:REL*3]-> ())", {batchSize:10000, parallel:true} ) 797s CREATE INDEX ON :Node(rel1_count) MATCH (n:Node) RETURN n ORDER BY n.rel1_count DESC LIMIT 10 MATCH (n:Node) WHERE n.rel1_count > 0 RETURN n ORDER BY n.rel1_count DESC LIMIT 10 ``` * ~~先匹配起始节点,然后匹配关系模式~~ neo4j 2.1版本,新版本已优化 ```cql MATCH (n:User { login: 'nash99' }) WITH n MATCH (n)-[:KNOWS]->(friend)-[:BUYS]->(c:Car) RETURN friend ``` ### 7.其他测试语句 ``` MATCH (n:Node) RETURN n limit 20 1ms MATCH ()-[r:REL]-() return r limit 20 2ms MATCH (n:Node) WHERE n.node_id = '12345' return n 15ms match (n1:Node)--(n2:Node) where n1.node_id = '123006' return n1, n2; 10ms match (n1:Node)--(n2)--(n3)--(n4:Node) where n1.node_id = '10010000' return n1, n2, n3, n4; 20ms CALL algo.triangleCount('Node', 'REL', {concurrency:4, write:false,clusteringCoefficientProperty:'coefficient'}) YIELD loadMillis, computeMillis, writeMillis, nodeCount, triangleCount, averageClusteringCoefficient; +-----------------------------------------------------------------------------------------------------+ | loadMillis | computeMillis | writeMillis | nodeCount | triangleCount | averageClusteringCoefficient | +-----------------------------------------------------------------------------------------------------+ | 23676 | 13329 | 379 | 20000000 | 2 | 1.05E-7 | +-----------------------------------------------------------------------------------------------------+ 37s CALL algo.triangleCount( 'MATCH (p) WHERE p:Judgement or p:Law RETURN id(p) AS id', 'MATCH (p1) -[:BASED_ON|:BELONGS]- (p2) RETURN id(p1) AS source, id(p2) AS target', {concurrency:4, write:false, graph:'cypher', clusteringCoefficientProperty:'coefficient'}) YIELD loadMillis, computeMillis, writeMillis, nodeCount, triangleCount, averageClusteringCoefficient +-----------------------------------------------------------------------------------------------------+ | loadMillis | computeMillis | writeMillis | nodeCount | triangleCount | averageClusteringCoefficient | +-----------------------------------------------------------------------------------------------------+ | 12538 | 36827 | 27 | 520654 | 0 | 0.0 | +-----------------------------------------------------------------------------------------------------+ 49s ``` --- ## Lucene相关 ### Lucene评分机制的概念模型 * Lucene的默认评分机制是基于VSM(Vector Space Model)的TFIDF算法 * Lucene的评分是计算在向量空间中文档与查询之间的余弦相似度(Cosine Similarity) * 原始的cosine-similarity计算方法: \\[ \text{cos-sim}(q,d) = \frac{V(q) \cdot V(d)}{|V(q)| |V(d)|} \\] * \\(V(q) \cdot V(d)\\)是向量的内积,\\( |V(q)| |V(d)| \\) 是向量的欧几里得范数 ### Lucene的一些优化,比如: - 规范化(Normalizing)V(d)成单位向量会去掉全部的文档长度信息。 `doc-len-norm(d)`会比单位向量更长。 - 建立索引时,用户可以使用参数` doc-boost(d)`指定某些更重要的文档。 - Lucene是基于域的(field based),也可以根据不同域来指定加权(field boost)。 - Lucene可以指定查询的加权,设置参数`query-boost(q)`。 ### Lucene的概念分数计算公式: \\[ score(q,d) = \text{query-boost}(q) \cdot \frac{V(q) \cdot V(d)}{|V(q)|} \cdot \text{doc-len-norm}(d) \cdot \text{doc-boost}(d) \\] lucene在实际实现中为了效率上的优化,提前进行了一些计算: - Query-boost是在查询开始之前就已知的。 - Query的范数`|V(q)|`也是提前计算的,分数都需要与其相乘,因此是不会影响检索排序结果的。为了在相同文档的不同查询的分数直接进行比较,因此乘以查询向量的范数是有必要的。 ### Lucene的实际分数计算公式: \\[ score(q,d) = coord(q,d) \cdot queryNorm(q) \cdot \sum (tf(\text{t in d}) \cdot idf(t)^2 \cdot t.getBoost() \cdot norm(t,d)) \\] 1. *tf(t in d)*指的是词项的频率, \\[ \text{tf(t in d)} = freq^\frac{1}{2} \\] 2. idf(t)是逆文档频率, \\[ idf(t) = 1 + log( \frac{numDocs}{docFreq+1} ) \\] 3. coord(q,d)是查询与文档的相关系数,如`query = a AND b AND c`,但文档中只出现a词项,则相关系数为1/3。 4. queryNorm(q)计算: \\[ queryNorm(q) = queryNorm(sumOfSqueryWeights) = \frac{1}{sumOfSquaredWeight^\frac{1}{2}} \\] 5. sumOfSquaredWeights: \\[ sumOfSquaredWeights = q.getBoost()^2 \cdot \sum_{\text{t in q}}(idf(t) \cdot t.getBoost)^2 \\] 6. `norm(t,d)`需要考虑域的加权和长度范数,词项短的域会贡献更高的分数。当一篇文档被索引时,这些参数都会被计算好。查询时已不能修改这些值,如需修改,考虑使用其他*Similarity*计算方法。 \\[ norm(t,d) = lengthNorm \cdot \prod_\text{f in d as t} f.boost() \\]