# xconsumer **Repository Path**: pipiking/xconsumer ## Basic Information - **Project Name**: xconsumer - **Description**: 基于zookeeper与go实现的分布式消费系统 - **Primary Language**: Go - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 4 - **Created**: 2022-10-12 - **Last Updated**: 2022-10-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # xconsumer 基于zookeeper与go实现的分布式系统 ## 基础功能 - 基于zk的集群 - leader选举 - 任务分发 - 事件监听 - http管理 ## 开发版本 - zookeeper 3.7.0 - go 1.16 ## 如何使用 基于zookeeper进行leader选举,需要安装zookeeper,推荐使用docker进行安装。 ```zookeeper安装 docker run --name zookeeper-name -p 2181:2181 -it -d zookeeper ``` 需要其他特性请查看[docker hub官方文档](https://hub.docker.com/_/zookeeper) ``` 生成运行文件 go build -o bin main.go 执行 cd bin ./main.go ``` > 如果你想测试多个客户端的选举情况和worker任务分配情况,只需要执行多个bin下面的编译文件即可。 ## 通过docker使用 **dockerfile方式** 自行安装好zookeeper,修改app.toml中的zookeeper的host ``` 构建镜像 docker build -t xconsumer:0.1 . 运行 docker run -it --name xconsumer --rm xconsumer:0.1 ``` docker-compose 修改app.toml中的zookeeper的host为"xconsumer-zookeeper" ``` 构建容器 docker-compose up ``` ## 功能开发 默认系统添加了两个测试任务,`test01`,`test02`,代码位于work目录下 我们以添加一个`test01`任务来举例 1. 修改app.toml配置文件,新增一个`[[task]]` ``` name = "test01" # 任务名称,我们就叫做test01 workName = "test01" # 任务执行的work,程序会回调此方法 workerNum = 2 # 任务数量,这里需要启动2个任务处理 ``` 2. 在work目录下新建文件夹test01,新建main ``` type Test01 struct { } // Run 非阻塞调用 // 程序执行完以后,又会重新调起 // 程序每隔10秒钟打印一句话 func (t *Test01) Run() { log.Info("i am test 01,10s一次") time.Sleep(time.Second * 10) } ``` 这一步不是必须的,只是为了让各个work更加分离开,建议这样做 3. 新建一个`test01.go`文件 方法名命名规范为Run+work名字,work名字为配置中的workName 方法必须是Work中的一个方法集,因为程序实现使用了反射 ``` // 必须是Work的方法 func (w *Work) RunTest01() { t01 := test01.Test01{} t01.Run() } ``` 到此为止,我们的一个任务就编写完了,可以重启项目查看运行效果了 ## 功能架构 > 图片放在github,加载可能会较慢,请耐心等待 整体xconsumer项目代码量非常少,大致流程分为以下几步: 1. 集群初始化(检查) 2. leader选举 3. leader集群任务分配 4. 分发任务 5. 开始工作 ![功能架构](https://raw.githubusercontent.com/y80x86ol/img/main/2021/20210718155802.png) ## 功能流程 ![功能流程](https://raw.githubusercontent.com/y80x86ol/img/main/2021/20210718145911.png) 注:只有leader节点操作,并且leader节点和其他follower节点一样,需要进行任务工作处理。 ## 节点通信 节点连接上zk后,会根据集群名字直接加入指定集群,节点和节点直接通过zk事件监听通信,不直接进行通信操作 ## 节点选举 zookeeper的节点是原子增加的,所以不会存在重复,相当于有一个原子锁。 xconsumer的选举算法就特别简单,选取最小值为leader,即先进先出原则。 如果leader掉线,会触发zk监听,其他node节点服务收到通知后重新判断自己是不是leader。 ![节点选举算法](https://raw.githubusercontent.com/y80x86ol/img/main/2021/20210718150332.png) ## worker计算 ![worker计算算法](https://raw.githubusercontent.com/y80x86ol/img/main/2021/20210718151829.png) 假设 - task1任务,需要3个worker - task2任务,需要2个worker - task3任务,需要1个worker worker的任务分配,采用的是节点轮询任务分配算法,这样比较好的保证每台服务器都能正常的分发到任务,不会造成任务拥挤或者任务缺失。 **存在问题** 但还是存在一个问题是"头部问题",因为leader节点是按照最小原子选取的,而轮询也是按照最小开始,这样leader节点服务器的工作任务就更重。 有两个可优化点 1. leader不参与worker工作 2. 任务计算从follower开始 ## 动态work分配 任务分配是动态计算的,当加入一个节点,或者关闭一个节点,任务由leader动态分配 ![动态work分配](https://raw.githubusercontent.com/y80x86ol/img/main/2021/20210718153507.png) ## 注意事项 ### 1. node节点变动任务变化问题 因为实现采用的是go协程,内部采用通道来进行的work开启关闭,如果出现阻塞程序,此通知需要等待阻塞完成才能进行通知操作。 最新的版本V2.1.0已经将关闭控制交由work来实现,每个work需要根据阻塞情况实现work关闭。 V2.1.0还增加了对一次性任务的实现支持 ### 2. 延时反应问题 程序采用的是leader任务分发,所以当有节点加入的时候,不会瞬时反应,需要等待leader收到节点变动通知,重新分配任务,然后通知所有follower节点。 ### 3. leader服务问题 选举算法采用的最小节点法,所以最好保证每台服务器配置都一致,不然容易造成work任务分配不均匀问题。 ### 4. 脑裂问题 **网络波动** leader的zk连接有默认20s掉线检查时间,在20s内是不会认为离线的,如果在20s内重新连接上服务器,依然还是leader。(此请求仅仅只针对网络波动) 如果超过20s重新链接的情况未做测试 **服务器宕机** 如果是直接leader服务器宕机,然后重启,那么自然就会成为follower节点。 ### 5. 生产使用 此项目编写时间短暂,功能不够完善,测试覆盖面不足,请不要直接用于生产环境使用。 ## todo - worker稳定性测试 - 增加web管理界面 - 日志代码优化 ## 其他 开发过程中,为了方便本地zookeeper管理,推荐使用 [PrettyZoo](https://github.com/vran-dev/PrettyZoo) 客户端工具 ## 贡献代码 欢迎大家fork和提pull requests。 ## 意见&建议 如果你有任何的想法或是建议那就给我留个言吧!