# TuGraphAnalyticsReproduction **Repository Path**: shbone/TuGraphAnalyticsReproduction ## Basic Information - **Project Name**: TuGraphAnalyticsReproduction - **Description**: 队伍名:FourthDimenison 队伍ID:316696 基于TuGraph Analytics的⾼性能图模式匹配算法设计复现代码仓库 - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2023-11-29 - **Last Updated**: 2024-08-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # TuGraphAnalyticsCompetition 比赛队伍名:FourthDimension 比赛队伍ID:316696 > [官网地址](https://www.datafountain.cn/competitions/975) ## 说明 ### 程序入参 1. 数据集路径,eg: `D:\Development\GeaFlowCompetition\sf1\snapshot`,与原数据集保持一致 2. 输出文件路径,eg: `D:\Development\GeaFlowCompetition\output`,结果文件生成在此目录下,如 `result3.csv` ### 作品提交 1. 打包 `mvn clean package -DskipTests` 2. 压缩启动脚本和 JAR 包 `zip Contest.zip run.sh TuGraphAnalyticsCompetition-1.0-SNAPSHOT-jar-with-dependencies.jar` 3. 提交 Zip 文件到`https://www.datafountain.cn/competitions/975/submits` ## 题目 ### Case1:个人贷款统计 ![Case1 示意图](imgs/case1.png) #### 目标 匹配满足以下条件的所有person: 1. 找到person拥有的账户account(edge1:own)。 2. 账户account从其他账户other转入资金(edge2:transfer)。 3. 账户other从贷款loan中存⼊资金(edge3:deposit),至少⼀条。 输出: - id:满足目标条件的persion.id。 - value:贷款金额(loan.loanAmount)的总和,单位(亿),小数点后保留2位有效数字。 #### 思路 1. 构图 按列需要读取顶点和边的CSV文件,共3个顶点文件,3个边文件 1.1 顶点 > 顶点值类型为 Double, > `Loan` 类型顶点存储 `loanAmount`,`Person` 类型顶点存储答案,`Account` 类型顶点没用到置为 `null`。 | 文件名 | 列 | 顶点id | 顶点值 | |:--------------------:|:-----------------:|:---------:|:----------:| | Account.csv | accountId | accountId | null | | Loan.csv | loanId,loanAmount | -loanId | loanAmount | | Person.csv | personId | personId | 0.0 | 1.2 边 | 文件名 | 列 | 起点Id | 终点Id | 边的值 | 边的方向 | |:--------------------------:|:-----------------------:|:---------:|:---------:|:------:|:----:| | LoanDepositAccount.csv | loanId,accountId,amount | -loanId | accountId | amount | OUT | | PersonOwnAccount.csv | personId,accountId | accountId | personId | null | IN | | AccountTransferAccount.csv | fromId,toId,amount | fromId | toId | amount | OUT | > `loanId`和`accountId`有重复 id 值,为方便区分,loanId 值取反 2. 匹配 以点为中心,经过4轮消息的传递,消息体为`loanId`,`loanAmount` 组成的元组`Tuple` - 第1轮:将 `Loan` 的 `loanAmount` 通过edge3(`loanDepositAccount`)将消息体`Tuple.of(vertexId, vertexValue)` 传递给 `other` (`Account`)。通过 `vertexValue != null && vertexValue > 0` 来判断此节点是 `Loan` 类型的节点,收到消息的 `Account` 节点会在下一轮保持激活状态。 - 第2轮:收到消息的 `other` (`Account`) 顶点通过 `edge2`(`accountTransferAccount`) 边将消息传递给 `account` (`Account`) 顶点 - 第3轮:`Account`收到的消息通过edge1(`PersonOwnAccount`)边,反向转发给`Person`顶点 - 第4轮:`Person`顶点,将收到的消息对`loanId`进行去重,对应的`loanAmount`进行求和计算 3. 输出 要求:以`Person`顶点`personId` 为键,`loanAmount`为值,对`personId`进行排序后输出 1. 图计算完成之后获取所有顶点,在 `filter` 算子中通过 `record.getId() > 0 && record.getValue() != null && record.getValue() > 0` 条件过滤顶点 2. 通过 `map` 算子映射答案 3. 通过 `sink` 算子将答案写入文件 注:步骤2、3下同,不再赘述 ### Case2:交易闭环搜索 ![Case2 示意图](imgs/case2.png) #### 目标 匹配满⾜以下条件的所有账户 src: 1. src 向账户 dst 发起过转账(edge1:transfer)。 2. dst 向另⼀个账户 other 发起过转账(edge2:transfer)。 3. other 向 src 发起过转账(edge3:transfer)。 输出 - id:满足目标条件的 src.id。 - value:交易闭环(序列)中的数量。 #### 思路 1. 构图 按列需要读取顶点和边的csv文件,共1个顶点文件,1个边文件 顶点 | 文件名 | 列 | 顶点Id | 顶点值 | |:-----------:|:---------:|:---------:|:---:| | Account.csv | accountId | accountId | 0.0 | 边 | 文件名 | 列 | 起点Id | 终点Id | 边的值 | 边的方向 | |:--------------------------:|:-----------------------:|-----------|:---------:|:------:|:----:| | AccountTransferAccount.csv | fromId,toId,amount | fromId | toId | amount | OUT | 2. 匹配 与case3 共用一个图,以点为中心,经过3轮消息的传递,消息体为Long类型的`AccountId`。 - 第1轮:将当前节点`Account`的Id通过edge1(accountTransferAccount)出边将Long类型的`AccountId`传递给`dst`(`Account`) - 第2轮:收到消息的`dst`(`Account`)顶点,通过edge2(`accountTransferAccount`)边将消息传递给`other1`(`Account`)顶点,即每个顶点将收到的所有消息转发给出边的所有顶点。 - 第3轮:收到消息的`other`(`Account`)顶点,针对出边edge3(`accountTransferAccount`)边,使用HashMap结构[`targetId`(`AccountId`)为键,`edge3`(`accountTransferAccount`)数量为值]存储相关信息(存储当前节点出边对应的id和数量),遍历所有收到的消息,判断消息中的顶点 `id` 在出边对应的 `map` 中是否出现,如果出现,则当前节点对应环的数量增加 `map[id]` 个。 3. 输出 要求:以`Account`顶点`accountId` 为键,累加的个数为值,对`accountId`进行排序后输出 1. 图计算完成之后获取所有顶点,在 `filter` 算子中通过 `record -> record.getValue() > 0` 条件过滤顶点 #### Case2 方法二 需要建双边,两轮迭代。 第一轮迭代所有顶点知道自己的入边和出边,并且向自己的所有出边发送自己的所有入边id。这样第二轮迭代首先获取所有出边,用一个哈希表记录出边的id和对应的数量,然后遍历所有消息,哈希表累计计数即可。 详情见代码,不再赘述。 #### Case2 方法三 在二的基础上减少一轮遍历,将消息累计之后以 `Map` 的格式发送消息。 详情见代码,不再赘述。 ### Case3:资金快进快出 ![Case3 示意图](imgs/case3.png) #### 目标 匹配满⾜以下条件的所有账户mid: 1. 找到所有向mid发起的转账边(edge1:transfer),至少⼀条。 2. 找到所有mid发起的转账边(edge2:transfer),至少⼀条。 输出 - id:满⾜⽬标条件的mid.id。 - value:交易流⼊流出⽐,SUM(edge1.amount) / SUM(edge2.amount),⼩数点后保留2位有效数 字 #### 思路 1. 构图 按列需要读取顶点和边的CSV文件,共1个顶点文件,1个边文件(与case2的图相同) 顶点 | 文件名 | 列 | 顶点Id | 顶点值 | |:-----------:|:---------:|:---------:|:---:| | Account.csv | accountId | accountId | 0.0 | 边 | 文件名 | 列 | 起点Id | 终点Id | 边的值 | 边的方向 | |:--------------------------:|:-----------------------:|-----------|:---------:|:------:|:----:| | AccountTransferAccount.csv | fromId,toId,amount | fromId | toId | amount | OUT | 2. 匹配 与 Case2 共用一个图,以点为中心,经过2轮消息的传递,消息体为Double类型的`amount` - 第1轮:对于每个顶点,获取其所有出边(只可能是`accountTransferAccount`类型的),如果出边不为空(满足至少一条的条件),则遍历所有出边,获取边上的值并累加到`outSum`变量中,同时将出边上的值发送给所有出边的顶点(这样下一次迭代时每个顶点就可以变相接收所有入边的消息值)。遍历完之后将当前节点的值设置为 `-outSum` (设置为负数便于过滤判断)。 - 第2轮:对于每个顶点,遍历接收到的所有消息并将值累加到 `inSum` 变量中,获取顶点存储的值 `outSum`,如果顶点存储的值小于零,则更新当前顶点的值为 `-inSum / outSum`。 核心 **`Account`顶点的值通过0,正数,负数三个范围区别`Account`的4种状态** | Account的值 | 出边 | 入边 | |:---------:|:---:|:---:| | 负数 | 存在 | 不存在 | | 0 | 不存在 | 不存在 | | 0 | 不存在 | 存在 | | 正数 | 存在 | 存在 | 3. 输出 要求:以`Account`顶点`accountId` 为键,`Account`的值交易流入与流出的比值为值,对`accountId`进行排序后输出 1. 图计算完成之后获取所有顶点,在 `filter` 算子中通过 `record -> record.getValue() > 0` 条件过滤掉值为0和负数的`Account`,获得满足要求的`Account` 除了上述方法,Case3也有其他的一些方法,代码见 `single` 包下。 #### Case3 方法二 之前的方法一使用了两次迭代,因为问题只涉及到当前一个顶点,能够只通过一次迭代解决问题呢?答案是可以的,但是需要建图的时候同时建 `OUT` 和 `IN` 边。这样在计算的时候对于当前节点同时处理入边和出边即可。详细实现见代码,不再赘述。 ### Case4:担保金额汇总 ![Case4 示意图](imgs/case4.png) #### 目标 匹配满⾜以下条件的所有⼈ p1: 1. 找到 p1 下游担保链上的所有人 p,至少 1 级,至多 3 级。 2. 找到 p 申请的贷款信息,汇总贷款金额。 #### 思路 1. 构图 按列需要读取顶点和边的csv文件,共1个顶点文件,1个边文件 顶点 | 文件名 | 列 | 顶点Id | 顶点值 | |:--------------------:|:-----------------:|:---------:|:----------:| | Loan.csv | loanId,loanAmount | loanId | Tuple.of(loanAmount, 0.0) | | Person.csv | personId | personId | Tuple.of(0.0, 0.0) | 边 | 文件名 | 列 | 头Id | 尾Id | 边的值 | 边的方向 | |:--------------------------:|:-----------------------:|-----------|:---------:|:------:|:----:| | PersonApplyLoan.csv | personId,loanId,0 | personId | loanId | 0 | IN | | PersonGuaranteePerson.csv | fromId,toId,1 | fromId | toId | 1 | OUT | > 消息设计 > 类型: `double` > 含义:同时表示两种含义,即 `id` 或者 `appliedLoanSum`。如果 `Person` 收到一个负数,表示是一个id,表示当前节点要给对应id发送自己缓存的值,否则表示是`loanSum` 的值,直接累加到答案中即可。 > Person类型顶点存储的值 > 类型:Tuple.of(0.0, 0.0) > 其中 `f0` 表示每个人申请到的贷款总和,`f1` 用于记录答案,表示下游链上三度以内的人申请到的贷款总和。 > Loan类型顶点存储的值 > 类型:Tuple.of(0.0, 0.0) > 其中 `f0` 表示 `loanSum`,`f1` 未使用。 2. 匹配 > 注:此题数据较弱,不存在一个人的下游链上的一个人`loanSum`重复和环的情况 以点为中心,经过6轮消息的传递,消息体为Double类型 - 第1轮:所有的 `Loan` 顶点 (通过`vertexValue.f0 > 0` 判断) 通过 `apply` 边将自身的 `loanAmount` 发送给的`Person`顶点,同时把自身值的 `f0` 更新为`-loanAmount`,以便退出后续判断;所有的`Person`顶点如果存在出边,则给自己发送一个空消息,以处于激活状态(因为可能出现顶点自己没有申请贷款但是下游链上有申请贷款记录的情况) - 第2轮:收到消息的所有`Person`顶点 (P1),对发送给自己的`loanAmount`求和,更新为自身的顶点值 `vertexValue.f0 = loanAmountSum`,即每个`Person` 在自己这里缓存收到的贷款总和值,并且通过 `guarantee` 边发送给所有下游的 (P2),值为 `-vertexId` - 第3-5轮:获取当前节点的出边,遍历所有消息。如果消息的值不小于0,说明收到的消息是申请贷款的值,直接累计到答案中,即 `vertexValue.f1 += msg`;否则收到的是一个我要给它发送我自己缓存的申请贷款值的信号,因此给顶点 `id` 是 `(long) (-msg)` 发送我自己缓存的贷款值 `vertexValue.f0`。然后遍历所有出边,当消息值小于0并且是 `guarantee` 边时,继续给下游发送信号,这样下游收到信号之后就会给上游的 P1 发送自己的值了。 - 第6轮:累加P1收到的所有下游担保数据,并更新值 3. 输出 要求:以`Person`顶点`personId` 为键,累加的`loanAmount`为值,对`personId`进行排序后输出 1. 图计算完成之后获取所有顶点,在 `filter` 算子中通过 `record.getValue().f0 >= 0 && record.getValue().f1 > 0` 条件过滤`Person`,获得满足要求的`Person` #### Case4 方法二 顶点 | 文件名 | 列 | 顶点Id | 顶点值 | |:--------------------:|:-----------------:|:---------:|:----------:| | Loan.csv | loanId,loanAmount | loanId | loanAmount | | Person.csv | personId | personId | 0.0 | 边 | 文件名 | 列 | 头Id | 尾Id | 边的值 | 边的方向 | |:--------------------------:|:-----------------------:|-----------|:---------:|:------:|:----:| | PersonApplyLoan.csv | personId,loanId,0 | personId | loanId | 0 | IN | | PersonGuaranteePerson.csv | fromId,toId,1 | fromId | toId | 1 | OUT | 上面的方法复杂本质上是因为是从前往后的,这就导致了下游链上的节点必须收到上游起始节点的id,并且望上游起始节点发送自己缓存的贷款和。另一种方法是换一个方向,从后往前看,对于每个节点,只需要将自己缓存的贷款和往前发送至多三跳即可。 以点为中心,经过5轮消息的传递,消息体为Double类型 - 第1轮:所有的`Loan`顶点将自身的`loanAmount`发送给`apply`的`Person`顶点,同时把自身值更新为`-loanAmount`,以便退出后续迭代;所有的`Person`顶点如果存在入边,则给自己发送一个空消息,以便进入下一轮迭代 - 第2轮:收到消息的`Person`顶点,对发送给自己的`loanAmount`求和,并且更新韦自身的顶点值(P4),并且发送给所有的该顶点的担保人(P3) - 第3轮和第4轮:重复获取下游发送给自己的消息,统计并更新,value=value+sum,其中sum为下游发送给自己的消息值求和,value为自身的顶点值。至此,已更新完所有的P2、P3、P4 - 第5轮:对P1收到的所有下游担保数据进行去重,并更新值 代码见 `1127-merge234`分支,不再赘述 ### 优化点 1. 输入。读写CSV文件使用了 `Apache CommonCSV`,`univocity-parsers`,`FastCSV` 等库,`univocity-parsers`读取速度最快,并且支持将查询列下推,效果进一步提升。 2. 输出。输出分为两部分,排序+写入,因为发现数据的 `id` 都是 `long` 范围内的,因此最后将结果转换为 `` 的对象进行暂存,排序速度较快,写入时直接将每个对象转换成 `String` 使用原生Java的方式写入。 3. 并行度。Filter,Map,Union以及迭代等算子并行度设置。 4. 算法优化。主要是压缩消息体设计,比如 `map` 转 `tuple` 等,实际上不同算法在这个数据下感觉差别不大。 5. 将Case2和Case3放到一个图中进行计算,这样只需要建图一次,跑两次计算,但是需要组合与消息/节点值的取舍。 6. 将不同的计算任务添加到一个 `pipeline` 中。 ...