1 Star 0 Fork 0

tym_hmm/go-kafa-Shopify-sarama

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
message_set.go 2.36 KB
一键复制 编辑 原始数据 按行查看 历史
天蝎儿 提交于 2023-01-06 17:03 +08:00 . build on 1.137.2
package sarama
import "errors"
// MessageBlock pairs a Message with the offset that is encoded alongside it.
type MessageBlock struct {
// Offset is the first field written by encode and the first field read by decode.
Offset int64
// Msg is the wrapped message; decode replaces it with a freshly allocated Message.
Msg *Message
}
// Messages is a convenience helper that returns the blocks represented by
// this block: the blocks of the nested set when the wrapped message carries
// one, otherwise a single-element slice containing msb itself.
func (msb *MessageBlock) Messages() []*MessageBlock {
	nested := msb.Msg.Set
	if nested == nil {
		return []*MessageBlock{msb}
	}
	return nested.Messages
}
// encode writes the block's offset, then the wrapped message preceded by a
// length field that is pushed before and popped after the message is encoded.
// If encoding the message fails, that error is returned and the length field
// is left un-popped.
func (msb *MessageBlock) encode(pe packetEncoder) error {
	pe.putInt64(msb.Offset)

	pe.push(&lengthField{})
	if err := msb.Msg.encode(pe); err != nil {
		return err
	}
	return pe.pop()
}
// decode reads the block's offset followed by a length-prefixed message.
// msb.Offset is assigned before anything else is read, and msb.Msg is set to
// a new Message before that message is decoded, so both may hold values even
// when an error is returned.
func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
	msb.Offset, err = pd.getInt64()
	if err != nil {
		return err
	}

	// The length field is acquired for the duration of this call and released
	// again on every return path.
	lf := acquireLengthField()
	defer releaseLengthField(lf)

	if err = pd.push(lf); err != nil {
		return err
	}

	msb.Msg = new(Message)
	if err = msb.Msg.decode(pd); err != nil {
		return err
	}

	return pd.pop()
}
// MessageSet is a list of MessageBlocks together with flags that record how
// decoding of the set ended.
type MessageSet struct {
PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
OverflowMessage bool // whether the set on the wire contained an overflow message
// Messages holds the blocks in the order they were decoded or added.
Messages []*MessageBlock
}
// encode writes every block of the set in order, stopping at and returning
// the first error encountered.
func (ms *MessageSet) encode(pe packetEncoder) error {
	for _, block := range ms.Messages {
		if err := block.encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode reads MessageBlocks from pd until no bytes remain, discarding any
// blocks previously held in ms.Messages.
//
// Running out of data is not reported as an error: it sets
// PartialTrailingMessage (or OverflowMessage when the truncated block's
// offset is -1) and decoding stops with a nil error. Decoding also stops with
// a nil error as soon as a magic value greater than 1 is seen. Any other
// error is returned unchanged.
func (ms *MessageSet) decode(pd packetDecoder) (err error) {
	ms.Messages = nil

	for pd.remaining() > 0 {
		magic, magicErr := magicValue(pd)
		if magicErr != nil {
			if !errors.Is(magicErr, ErrInsufficientData) {
				return magicErr
			}
			ms.PartialTrailingMessage = true
			return nil
		}

		// Magic values above 1 are not decoded here; stop without an error.
		if magic > 1 {
			return nil
		}

		block := new(MessageBlock)
		decodeErr := block.decode(pd)
		switch {
		case decodeErr == nil:
			ms.Messages = append(ms.Messages, block)
		case errors.Is(decodeErr, ErrInsufficientData):
			// As an optimization the server is allowed to return a partial
			// message at the end of the message set. Clients should handle
			// this case, so the truncated block is dropped rather than
			// reported.
			if block.Offset == -1 {
				// This is an overflow message caused by chunked down conversion.
				ms.OverflowMessage = true
			} else {
				ms.PartialTrailingMessage = true
			}
			return nil
		default:
			return decodeErr
		}
	}

	return nil
}
// addMessage wraps msg in a new MessageBlock (with a zero Offset) and appends
// it to the set.
func (ms *MessageSet) addMessage(msg *Message) {
	ms.Messages = append(ms.Messages, &MessageBlock{Msg: msg})
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/tym_hmm/go-kafa-shopify-sarama.git
git@gitee.com:tym_hmm/go-kafa-shopify-sarama.git
tym_hmm
go-kafa-shopify-sarama
go-kafa-Shopify-sarama
v1.37.2

搜索帮助