1 Star 0 Fork 0

张璐月/channel_and_queue

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
.idea
cond
message_queue
go 消息队列.assets
message_queue.go
message_queue_test.go
readme.md
设计模式源码.excalidraw
other_complicated_task_pool
queue
ring_buffer
task_pool
go.mod
go.sum
main.go
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

go 消息队列 涉及channel、ring buffer

前言

文章主要是针对消息队列的思考,消息队列核心实现使用了channel、ring buffer和map结构。

文章开篇介绍了channel的实现原理,然后是对消息队列的分析与实现,其中消息缓冲使用ring buffer存储,所以在文章第三部分介绍了ring buffer。

channel原理

chansend

chansend-17483332180963

chanrecv

img

消息队列

消息队列是一种应用间异步通讯方式,解耦了消息发送者和接收者,提供一定的缓冲。

消息订阅实现思考

  1. 没有消费者订阅消息时,是否提供消息缓冲区?
  2. 缓冲区信息是否需要群发所有消费者,还是只需要新订阅的消费者取走?
  3. 缓冲区未发送数据保留还是丢弃?

我的设计选择:整体上没取走的就丢弃;但对于没有消费者时,提供一定的缓冲。

没有消费者订阅消息时,是否提供消息缓冲区?

提供。不提供的话,缓冲区会导致没有消费者时消息丢失或者阻塞生产者

缓冲区信息是否需要群发所有消费者,还是只需要新订阅的消费者取走?

群发。保证消息的公平性和一致性

缓冲区未发送数据保留还是丢弃?

不保留。消费者能取走的就取,取不了的就丢弃

接口设计

type MessageQueue struct {
	chanList          map[string][]chan Message   // 切片的数量就是消费者的数量
	buffers           map[string]ring_buffer.Ring // 使用ring Buffer存储缓存
	bufferSize        int                         // 缓冲容量
	mutex             sync.RWMutex
	ringBufferCreator ring_buffer.RingBufferCreator // 选择基于不同的覆盖策略的ringBuffer:丢弃、覆盖等
}

func (m *MessageQueue) Send(ctx context.Context, topic string, msg Message) error // 发送消息
func (m *MessageQueue) Subscribe(topic string, cap int) <-chan Message // 订阅主题
func (m *MessageQueue) distributeBufferMsg(topic string, ringBuffer ring_buffer.Ring) // 后台goroutine发送缓冲数据

设计模式

消息队列使用生产者-消费者模型、发布订阅模式

![image-20250530102024343](go 消息队列.assets/image-20250530102024343.png)

生产者-消费者模型和发布订阅模式关系和区别

相同点

都常用于异步通信

不同点
  • 生产者-消费者模型共享缓冲区,低耦合,消费者需要知道缓冲区的存在;常用于点对点的通信,适用场景:任务调度、I/O操作,主要解决生产者、消费者数据发送速度不匹配的问题
  • 发布订阅模式解耦消息发送者和接收者,可以进行主题订阅,使用中间件进行通信,消息发送者和接收者无需知道对方的存在(完全解耦);常用于多对多的通讯,比如广播、多播,适用场景:事件通知、微服务通讯

支持能力

  • 主题订阅
  • 无消费者订阅主题时,暂存发送消息
    • 暂存使用ring buffer,可自行配置ring buffer覆盖策略:丢弃、覆盖等
    • 有消费者订阅消息后,后台启动goroutine发送缓冲消息给所有订阅的消费者

源码

gitee:https://gitee.com/luyue_zhang/channel/tree/master/message_queue

测试

![image-20250530104941653](go 消息队列.assets/image-20250530104941653.png)

常用的消息队列方案

  • 使用了redis,可以使用redis streams 特点:轻便
  • 分布式、高可用:kafka
  • 复杂路由、功能丰富:RabbitMQ
  • 轻量、高性能:NATS

Ring buffer

ring buffer是什么

ring buffer是容量固定的线性数据结构,通过内部两个指针循环移动进行数据的写入和读取,也叫circle buffer。

核心特性

  • 容量固定:避免内存动态分配,资源利用率高
  • 高效读写:读写操作时间复杂度O(1),适合实时系统
  • 循环利用:通过指针移动避免数据迁移和内存分配,适合高吞吐量场景

写覆盖策略

写覆盖策略:丢弃、覆盖、阻塞写入

策略使用维度分析

业务需求
  • 对于完整性强的场景可以考虑阻塞写入,同时提供灵活性告诉写入者队列是否已满,让写入者调整写入速度。比如:任务调度、金融

  • 对于实时性强的场景可以考虑覆盖。比如:日志、传感器

系统负载

系统负载高时丢弃、覆盖,系统负载低时阻塞

数据价值

确认是新数据重要还是旧数据重要

源码

gitee: https://gitee.com/luyue_zhang/channel/tree/master/ring_buffer

涉及技术:组合

测试

![ring_buffer测试运行](go 消息队列.assets/ring_buffer测试运行.gif)

马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/luyue_zhang/channel.git
git@gitee.com:luyue_zhang/channel.git
luyue_zhang
channel
channel_and_queue
master

搜索帮助