文章主要是针对消息队列的思考,消息队列核心实现使用了channel、ring buffer和map结构。
文章开篇介绍了channel的实现原理,然后是对消息队列的分析与实现,其中消息缓冲使用ring buffer存储,所以在文章第三部分介绍了ring buffer。
消息队列是一种应用间异步通讯方式,解耦了消息发送者和接收者,提供一定的缓冲。
我的设计选择:整体上没取走的就丢弃;但对于没有消费者时,提供一定的缓冲。
提供。不提供的话,缓冲区会导致没有消费者时消息丢失或者阻塞生产者
群发。保证消息的公平性和一致性
不保留。消费者能取走的就取,取不了的就丢弃
type MessageQueue struct {
chanList map[string][]chan Message // 切片的数量就是消费者的数量
buffers map[string]ring_buffer.Ring // 使用ring Buffer存储缓存
bufferSize int // 缓冲容量
mutex sync.RWMutex
ringBufferCreator ring_buffer.RingBufferCreator // 选择基于不同的覆盖策略的ringBuffer:丢弃、覆盖等
}
func (m *MessageQueue) Send(ctx context.Context, topic string, msg Message) error // 发送消息
func (m *MessageQueue) Subscribe(topic string, cap int) <-chan Message // 订阅主题
func (m *MessageQueue) distributeBufferMsg(topic string, ringBuffer ring_buffer.Ring) // 后台goroutine发送缓冲数据
消息队列使用生产者-消费者模型、发布订阅模式

都常用于异步通信
gitee:https://gitee.com/luyue_zhang/channel/tree/master/message_queue

ring buffer是容量固定的线性数据结构,通过内部两个指针循环移动进行数据的写入和读取,也叫circle buffer。
写覆盖策略:丢弃、覆盖、阻塞写入
对于完整性强的场景可以考虑阻塞写入,同时提供灵活性告诉写入者队列是否已满,让写入者调整写入速度。比如:任务调度、金融
对于实时性强的场景可以考虑覆盖。比如:日志、传感器
系统负载高时丢弃、覆盖,系统负载低时阻塞
确认是新数据重要还是旧数据重要
gitee: https://gitee.com/luyue_zhang/channel/tree/master/ring_buffer
涉及技术:组合

此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。