代码拉取完成,页面将自动刷新
package kafka
import (
"container/list"
"fmt"
"gitee.com/titan-kit/titan/log"
)
// Option 是ESB选项.
type Option func(*options)
type options struct {
systemId string
logger log.Logger
backends []string
}
func WithLogger(logger log.Logger) Option {
return func(o *options) {
o.logger = logger
}
}
func WithSystemId(systemId string) Option {
return func(o *options) {
o.systemId = systemId
}
}
func WithBackends(backends []string) Option {
return func(o *options) {
o.backends = backends
}
}
func New(opts ...Option) kafka {
options := options{systemId: "0000", logger: log.DefaultLogger, backends: []string{"localhost:9092"}}
for _, o := range opts {
o(&options)
}
logger := log.NewSlf4g("backends/KafkaProvider", options.logger)
return kafka{options, logger, list.New(), make(map[string]consumer, 0)}
}
type kafka struct {
opt options
log *log.Slf4g
producerList *list.List
consumerList map[string]consumer
}
// Sender 初始化一个新的生产者对象。
func (k kafka) Sender() Sender {
producer := newProducer(k.log, k.opt.backends)
k.producerList.PushBack(producer)
return producer
}
// StartDefaultReader 使用默认的消费者配置信息来初始化,具体可参看`StartReader`的说明。
//
// @param topic
// @param groupId
// @param threads
// @param callback
func (k kafka) StartDefaultReader(topic, groupId string, callback Reader) *ReaderRef {
return k.StartReader(topic, groupId, callback, func(err error) {
panic(fmt.Sprintf("Kafka消费者线程(StartDefaultReader)发生错误:%+v\n", err))
})
}
// StartSystemIdReader 使用默认的消费者配置和根据当前系统标示(4位数字)作为消费组标示来初始化,具体可参看`StartReader`的说明。
//
// @param topic 消息队列名称
// @param callback 消息的回调处理器
func (k kafka) StartSystemIdReader(topic string, callback Reader) *ReaderRef {
return k.StartReader(topic, "sys-"+k.opt.systemId, callback, func(err error) {
panic(fmt.Sprintf("Kafka消费者线程(StartSystemIdReader)发生错误:%+v\n", err))
})
}
// StartReader 使用给定的配置信息来启动一个kafka消费者组,该方法会快速无阻塞的启动指定数量的消费者并归为一个消费者组中,当读取到消息后会调用指定的回调处理器进行处理,
// 由于回调处理器是单实例,因此如果要充分发挥多线程消费的优势,需要利用一些技术手段提高回调处理器的处理效率(比如异步处理)。
//
// 对于消费者线程数的设置,一般来说位于同一个消费组中的所有消费者数量不要超过对于消息队列的分区数,对于kafka而言,每个分区只能对应相同消费组中的一个消费者,而一个消费者则可以对应多个分区。
//
// @param topic 消息队列名称
// @param groupId 消费者归属的消费组标示
// @param callback 消息的回调处理器
func (k kafka) StartReader(topic, groupId string, callback Reader, errors func(error)) *ReaderRef {
ref := newReaderRef(topic + groupId)
consumer := newConsumer(k.log, k.opt.backends)
consumer.start(topic, groupId, errors, callback)
k.consumerList[ref.key] = consumer
return ref
}
// Close 关闭所有资源
func (k kafka) Close() {
for e := k.producerList.Front(); e != nil; e = e.Next() {
e.Value.(producer).Close()
}
for _, v := range k.consumerList {
v.close()
}
}
// CloseByReaderRef 关闭指定引用对应的消费者。
//
// @param ref 定引用对应的消费者
func (k kafka) CloseByReaderRef(ref *ReaderRef) {
if len(ref.key) == 0 {
return
}
consumer := k.consumerList[ref.key]
if consumer.started {
consumer.close()
}
}
// ReaderRef 启动消费者获得的消费者引用,可以通过`close`来关闭指定的消费者。
type ReaderRef struct {
key string
}
func newReaderRef(key string) *ReaderRef {
return &ReaderRef{key: key}
}
// Sender kafka的数据写入接口,所有数据都采用string形式写入。
type Sender interface {
// Send 将给定的数据发送到kafka特定的topic上
Send(topic, content string)
// SendPartition 将给定的数据content发送到kafka特定的topic上,如果指定了key则根据key的hash结果来选择topic的分区,如果没有指定key则随机选择一个分区。
SendPartition(topic, key, content string)
// Close 关闭处理,程序终止时的资源释放。
Close()
}
// Reader kafka的数据读取接口定义,该接口的实现总会被认为是从最后记录的偏移位置开始读取数据(如果偏移值有保存的话),如果想要从指定偏移位置开始读取数据请实现方法`GetOffset`.
type Reader interface {
// OnRead 读取到数据后的回调处理方法,由业务系统实现处理。
//
// topic 数据所属的队列
// partition 数据所在的分区编号,从0开始
// offset 数据所在的offset
// key 数据的key,发送时指定的,可能为Null
// value 数据的内容
OnRead(topic string, partition int32, offset int64, key, value []byte) error
// GetOffset 获取从指定主题(topic)中指定分区(partition)读取数据的起始偏移位置,如果不指定(返回Null)
// 则表明依据系统记录的最后offset来获取数据,如果返回小于0的值(如-1)则表明从初始位置开始读取。
// 特别注意,如果返回的值超过了该分区的最大偏移值则会被认为是从初始位置开始读取数据。
GetOffset(topic string) []PartitionOffset
}
// PartitionOffset 分区的起始偏移位置
type PartitionOffset struct {
Partition int32
Offset int64
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。