代码拉取完成,页面将自动刷新
package sarama
import (
"errors"
"fmt"
"time"
)
const recordBatchOverhead = 49
type recordsArray []*Record
func (e recordsArray) encode(pe packetEncoder) error {
for _, r := range e {
if err := r.encode(pe); err != nil {
return err
}
}
return nil
}
func (e recordsArray) decode(pd packetDecoder) error {
for i := range e {
rec := &Record{}
if err := rec.decode(pd); err != nil {
return err
}
e[i] = rec
}
return nil
}
type RecordBatch struct {
FirstOffset int64
PartitionLeaderEpoch int32
Version int8
Codec CompressionCodec
CompressionLevel int
Control bool
LogAppendTime bool
LastOffsetDelta int32
FirstTimestamp time.Time
MaxTimestamp time.Time
ProducerID int64
ProducerEpoch int16
FirstSequence int32
Records []*Record
PartialTrailingRecord bool
IsTransactional bool
compressedRecords []byte
recordsLen int // uncompressed records size
}
func (b *RecordBatch) LastOffset() int64 {
return b.FirstOffset + int64(b.LastOffsetDelta)
}
func (b *RecordBatch) encode(pe packetEncoder) error {
if b.Version != 2 {
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
}
pe.putInt64(b.FirstOffset)
pe.push(&lengthField{})
pe.putInt32(b.PartitionLeaderEpoch)
pe.putInt8(b.Version)
pe.push(newCRC32Field(crcCastagnoli))
pe.putInt16(b.computeAttributes())
pe.putInt32(b.LastOffsetDelta)
if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
return err
}
if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
return err
}
pe.putInt64(b.ProducerID)
pe.putInt16(b.ProducerEpoch)
pe.putInt32(b.FirstSequence)
if err := pe.putArrayLength(len(b.Records)); err != nil {
return err
}
if b.compressedRecords == nil {
if err := b.encodeRecords(pe); err != nil {
return err
}
}
if err := pe.putRawBytes(b.compressedRecords); err != nil {
return err
}
if err := pe.pop(); err != nil {
return err
}
return pe.pop()
}
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
if b.FirstOffset, err = pd.getInt64(); err != nil {
return err
}
batchLen, err := pd.getInt32()
if err != nil {
return err
}
if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
if b.Version, err = pd.getInt8(); err != nil {
return err
}
crc32Decoder := acquireCrc32Field(crcCastagnoli)
defer releaseCrc32Field(crc32Decoder)
if err = pd.push(crc32Decoder); err != nil {
return err
}
attributes, err := pd.getInt16()
if err != nil {
return err
}
b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
b.Control = attributes&controlMask == controlMask
b.LogAppendTime = attributes×tampTypeMask == timestampTypeMask
b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
return err
}
if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
return err
}
if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
return err
}
if b.ProducerID, err = pd.getInt64(); err != nil {
return err
}
if b.ProducerEpoch, err = pd.getInt16(); err != nil {
return err
}
if b.FirstSequence, err = pd.getInt32(); err != nil {
return err
}
numRecs, err := pd.getArrayLength()
if err != nil {
return err
}
if numRecs >= 0 {
b.Records = make([]*Record, numRecs)
}
bufSize := int(batchLen) - recordBatchOverhead
recBuffer, err := pd.getRawBytes(bufSize)
if err != nil {
if errors.Is(err, ErrInsufficientData) {
b.PartialTrailingRecord = true
b.Records = nil
return nil
}
return err
}
if err = pd.pop(); err != nil {
return err
}
recBuffer, err = decompress(b.Codec, recBuffer)
if err != nil {
return err
}
b.recordsLen = len(recBuffer)
err = decode(recBuffer, recordsArray(b.Records), nil)
if errors.Is(err, ErrInsufficientData) {
b.PartialTrailingRecord = true
b.Records = nil
return nil
}
return err
}
func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
var raw []byte
var err error
if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
return err
}
b.recordsLen = len(raw)
b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
return err
}
func (b *RecordBatch) computeAttributes() int16 {
attr := int16(b.Codec) & int16(compressionCodecMask)
if b.Control {
attr |= controlMask
}
if b.LogAppendTime {
attr |= timestampTypeMask
}
if b.IsTransactional {
attr |= isTransactionalMask
}
return attr
}
func (b *RecordBatch) addRecord(r *Record) {
b.Records = append(b.Records, r)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。