# Pump **Repository Path**: gouhongshen/pump ## Basic Information - **Project Name**: Pump - **Description**: 一个用C写的数据共享中间件 - **Primary Language**: C - **License**: LGPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-11-06 - **Last Updated**: 2023-04-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README **这是一个基于发布订阅模式的分布式消息中间件(类似ROS2)**
### 目录结构 ![输入图片说明](figurestree.png)
### 系统架构 ![输入图片说明](figuressystem-architecture.png)

**消息发布**
所以发布者共用一个发布者助手,将发布动作作为一个具有截至时间的事件,在循环中,发布助手每次查询最快到期的事件的剩余时间,然后睡眠等待同样长的时间,然后醒来发布。发布助手将每个发布者的消息写入该发布者的消息队列中,然后唤醒writer,writer遍历所有发布者队列,只要队列中有数据,就将其注册到事件队列中。

消息被放入事件队列,eventProcessor被唤醒。它从事件队列中提取一个消息,查找其所属的topic,然后将该消息发送给该topic下的所有订阅者:
1)若订阅者在本地,直接将消息写入订阅者的消息队列中
2)若订阅者在网络中,将该消息注册到网络消息队列中,同时netProcessor被唤醒,根据网络订阅者套接字信息,然后发送。

**消息订阅**
所有订阅者共用一个reader,reader一一定频率扫描每个订阅者的消息队列,发现有消息,就调用订阅者注册的消息回调函数处理该消息。
netProcessor有两个线程,第一个负责监听套接字(处理连接、数据发送等),第二个负责将网络消息发送出去。当收到网络数据后,netProcessor将其注册到事件队列中。
**服务发现**
不同机器的节点启动后,会向该网络中广播一条Detect消息,收到该消息的其他节点会主动与该节点建立TCP连接,随后一直保持,消息发送,同步信息等通过这条连接。
**网络订阅**
当某节点本地创建、删除Agent、发布者、订阅者时,该节点会将这些信息发送给已发现的其他节点,这样,一个节点中的订阅者就同时存在网络订阅者和本地订阅者。<
**消息持久化**
eventProcessor在发送消息时,会将本地消息写日志。
#### 日志系统实现 ![输入图片说明](figureslog-architecture.png) 生成的数据分为两个流:一个实时流(作为上一小节架构的输入);一个历史缓存流,采用日志存储。
无论时日志段还是按天存储的数据文本,为了加快查找(减少IO),应该给它们维护一个索引,指出需要的数据的偏移量。但如果数据太多,索引就会变得很大。如果一组记录一个索引,就可以大大减少索引数量,但需要记录有序。所以在内存中维护一个红黑树,将数据暂时存储在该树中。待数据达到预设的阈值后就将他们写入一个日志段。同时,可以将多个日志段合并,将相同的topic按天存储,并以小时为组维护索引。

为了防止程序崩溃,内存(红黑树)数据消失,可以每过30秒记录一次红黑树的快照,以此作为恢复的蓝本。
当历史查询来临时,查询的数据可能有以下几种情况:1)红黑树中;2)日志段中;3)合并的按天存储的文本中;4)不存在。 因此查询就可能变得很慢,可以借助布隆过滤器,判断数据是否存在。
#### 网络服务实现