1 Star 0 Fork 0

浅言腻耳 / kafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
client.go 1.81 KB
一键复制 编辑 原始数据 按行查看 历史
huangxiao 提交于 2021-06-03 17:50 . update
package kafka
import (
"github.com/Shopify/sarama"
)
// Client kafka客户端接口
type Client interface {
SyncProducer() (SyncProducer, error)
Consumer() (Consumer, error)
ConsumerGroup(groupID string) (ConsumerGroup, error)
Close() error
Closed() bool
}
type client struct {
client sarama.Client
}
func (d client) Close() error {
return d.client.Close()
}
func (d client) Closed() bool {
return d.client.Closed()
}
func (d client) ConsumerGroup(groupID string) (ConsumerGroup, error) {
sGroup, err := sarama.NewConsumerGroupFromClient(groupID, d.client)
if err != nil {
return nil, err
}
return &consumerGroup{
consumerGroup: sGroup,
}, err
}
func (d client) Consumer() (Consumer, error) {
sConsumer, err := sarama.NewConsumerFromClient(d.client)
if err != nil {
return nil, err
}
return &consumer{
consumer: sConsumer,
}, nil
}
// syncProducer 生成同步生产者对象
func (d client) SyncProducer() (SyncProducer, error) {
sProducer, err := sarama.NewSyncProducerFromClient(d.client)
if err != nil {
return nil, err
}
return &syncProducer{
producer: sProducer,
}, nil
}
// NewClient 初始化kafka客户端
/*func NewClient(addrs []string, config *Config) (Client, error) {
sConfig := saramaConfig(config)
sClient, err := sarama.NewClient(addrs, sConfig)
if err != nil {
return nil, err
}
return client{
client: sClient,
}, nil
}*/
// NewClient 初始化kafka客户端 带密码
func NewClientWithPass(addrs []string, config *Config, SaslConfig *KaConfig) (Client, error) {
sConfig := saramaConfig(config, SaslConfig)
sConfig.Net.SASL.Enable = SaslConfig.SaslEnable
sConfig.Net.SASL.User = SaslConfig.User
sConfig.Net.SASL.Password = SaslConfig.Password
sClient, err := sarama.NewClient(addrs, sConfig)
if err != nil {
return nil, err
}
return client{
client: sClient,
}, nil
}
1
https://gitee.com/hnorm/kafka.git
git@gitee.com:hnorm/kafka.git
hnorm
kafka
kafka
master

搜索帮助