package sarama
import (
"errors"
"fmt"
"strings"
"sync"
"time"
)
// ProducerTxnStatusFlag marks the current transaction status.
type ProducerTxnStatusFlag int16
const (
// ProducerTxnFlagUninitialized when txnmgr is created
ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota
// ProducerTxnFlagInitializing when txnmgr is initializing
ProducerTxnFlagInitializing
// ProducerTxnFlagReady when txnmgr is ready to receive a transaction
ProducerTxnFlagReady
// ProducerTxnFlagInTransaction when a transaction has been started
ProducerTxnFlagInTransaction
// ProducerTxnFlagEndTransaction when the transaction will be committed
ProducerTxnFlagEndTransaction
// ProducerTxnFlagInError when having an abortable or fatal error
ProducerTxnFlagInError
// ProducerTxnFlagCommittingTransaction when committing the txn
ProducerTxnFlagCommittingTransaction
// ProducerTxnFlagAbortingTransaction when aborting the txn
ProducerTxnFlagAbortingTransaction
// ProducerTxnFlagAbortableError when the producer encounters an abortable error.
// AbortTxn must be called in this case.
ProducerTxnFlagAbortableError
// ProducerTxnFlagFatalError when the producer encounters a fatal error.
// The producer must be closed and recreated.
ProducerTxnFlagFatalError
)
func (s ProducerTxnStatusFlag) String() string {
status := make([]string, 0)
if s&ProducerTxnFlagUninitialized != 0 {
status = append(status, "ProducerTxnStateUninitialized")
}
if s&ProducerTxnFlagInitializing != 0 {
status = append(status, "ProducerTxnStateInitializing")
}
if s&ProducerTxnFlagReady != 0 {
status = append(status, "ProducerTxnStateReady")
}
if s&ProducerTxnFlagInTransaction != 0 {
status = append(status, "ProducerTxnStateInTransaction")
}
if s&ProducerTxnFlagEndTransaction != 0 {
status = append(status, "ProducerTxnStateEndTransaction")
}
if s&ProducerTxnFlagInError != 0 {
status = append(status, "ProducerTxnStateInError")
}
if s&ProducerTxnFlagCommittingTransaction != 0 {
status = append(status, "ProducerTxnStateCommittingTransaction")
}
if s&ProducerTxnFlagAbortingTransaction != 0 {
status = append(status, "ProducerTxnStateAbortingTransaction")
}
if s&ProducerTxnFlagAbortableError != 0 {
status = append(status, "ProducerTxnStateAbortableError")
}
if s&ProducerTxnFlagFatalError != 0 {
status = append(status, "ProducerTxnStateFatalError")
}
return strings.Join(status, "|")
}
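// Example: statuses are bit flags, so one value can carry several at once and
// String renders every set bit. A minimal sketch using the constants above:
//
//	s := ProducerTxnFlagInError | ProducerTxnFlagAbortableError
//	fmt.Println(s.String()) // ProducerTxnStateInError|ProducerTxnStateAbortableError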
// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
producerID int64
producerEpoch int16
sequenceNumbers map[string]int32
mutex sync.Mutex
transactionalID string
transactionTimeout time.Duration
client Client
// True when the Kafka cluster is at least 2.5.0;
// used to recover when the producer has failed.
coordinatorSupportsBumpingEpoch bool
// True when the producer needs to bump its epoch.
epochBumpRequired bool
// Record last seen error.
lastError error
// Ensure that status is never accessed with a race-condition.
statusLock sync.RWMutex
status ProducerTxnStatusFlag
// Ensure that only one goroutine will update partitions in current transaction.
partitionInTxnLock sync.Mutex
pendingPartitionsInCurrentTxn topicPartitionSet
partitionsInCurrentTxn topicPartitionSet
// Offsets to add to transaction.
offsetsInCurrentTxn map[string]topicPartitionOffsets
}
const (
noProducerID = -1
noProducerEpoch = -1
// see publishTxnPartitions comment.
addPartitionsRetryBackoff = 20 * time.Millisecond
)
// txnmgr allowed transitions.
var producerTxnTransitions = map[ProducerTxnStatusFlag][]ProducerTxnStatusFlag{
ProducerTxnFlagUninitialized: {
ProducerTxnFlagReady,
ProducerTxnFlagInError,
},
// When we are initializing
ProducerTxnFlagInitializing: {
ProducerTxnFlagInitializing,
ProducerTxnFlagReady,
ProducerTxnFlagInError,
},
// When we have initialized the transactional producer
ProducerTxnFlagReady: {
ProducerTxnFlagInTransaction,
},
// When beginTxn has been called
ProducerTxnFlagInTransaction: {
// When calling commit or abort
ProducerTxnFlagEndTransaction,
// When got an error
ProducerTxnFlagInError,
},
ProducerTxnFlagEndTransaction: {
// When epoch bump
ProducerTxnFlagInitializing,
// When commit is good
ProducerTxnFlagReady,
// When got an error
ProducerTxnFlagInError,
},
// Need to abort transaction
ProducerTxnFlagAbortableError: {
// Call AbortTxn
ProducerTxnFlagAbortingTransaction,
// When got an error
ProducerTxnFlagInError,
},
// Need to close producer
ProducerTxnFlagFatalError: {
ProducerTxnFlagFatalError,
},
}
type topicPartition struct {
topic string
partition int32
}
// to ensure that we don't do a full scan every time a partition or an offset is added.
type topicPartitionSet map[topicPartition]struct{}
type topicPartitionOffsets map[topicPartition]*PartitionOffsetMetadata
func (s topicPartitionSet) mapToRequest() map[string][]int32 {
result := make(map[string][]int32, len(s))
for tp := range s {
result[tp.topic] = append(result[tp.topic], tp.partition)
}
return result
}
func (s topicPartitionOffsets) mapToRequest() map[string][]*PartitionOffsetMetadata {
result := make(map[string][]*PartitionOffsetMetadata, len(s))
for tp, offset := range s {
result[tp.topic] = append(result[tp.topic], offset)
}
return result
}
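// Example: mapToRequest groups a set keyed by (topic, partition) into the
// per-topic shape the protocol requests expect. A sketch (map iteration
// order is random, so the partition slices come back in no fixed order):
//
//	s := topicPartitionSet{
//		{topic: "events", partition: 0}: {},
//		{topic: "events", partition: 1}: {},
//		{topic: "audit", partition: 0}:  {},
//	}
//	s.mapToRequest() // map[string][]int32{"events": {0, 1}, "audit": {0}}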
// Return true if the transition to the target status is allowed.
func (t *transactionManager) isTransitionValid(target ProducerTxnStatusFlag) bool {
for status, allowedTransitions := range producerTxnTransitions {
if status&t.status != 0 {
for _, allowedTransition := range allowedTransitions {
if allowedTransition&target != 0 {
return true
}
}
}
}
return false
}
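// Example: validity is checked bit-wise against the table above. A sketch
// assuming the manager currently holds ProducerTxnFlagReady:
//
//	t.isTransitionValid(ProducerTxnFlagInTransaction)         // true: Ready -> InTransaction is listed
//	t.isTransitionValid(ProducerTxnFlagCommittingTransaction) // false: not reachable from Ready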
// Get current transaction status.
func (t *transactionManager) currentTxnStatus() ProducerTxnStatusFlag {
t.statusLock.RLock()
defer t.statusLock.RUnlock()
return t.status
}
// Try to transition to the target status; return an error if the transition is not allowed.
func (t *transactionManager) transitionTo(target ProducerTxnStatusFlag, err error) error {
t.statusLock.Lock()
defer t.statusLock.Unlock()
if !t.isTransitionValid(target) {
return ErrTransitionNotAllowed
}
if target&ProducerTxnFlagInError != 0 {
if err == nil {
return ErrCannotTransitionNilError
}
t.lastError = err
} else {
t.lastError = nil
}
DebugLogger.Printf("txnmgr/transition [%s] transition from %s to %s\n", t.transactionalID, t.status, target)
t.status = target
return err
}
func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence, t.producerEpoch
}
func (t *transactionManager) bumpEpoch() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.producerEpoch++
for k := range t.sequenceNumbers {
t.sequenceNumbers[k] = 0
}
}
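// Example: sequence numbers are tracked per "topic-partition" key and all
// reset to 0 when the epoch is bumped. A sketch:
//
//	seq, epoch := t.getAndIncrementSequenceNumber("events", 0) // seq == 0 on the first call
//	seq, epoch = t.getAndIncrementSequenceNumber("events", 0)  // seq == 1, same epoch
//	t.bumpEpoch()                                              // epoch+1, all sequences back to 0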
func (t *transactionManager) getProducerID() (int64, int16) {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.producerID, t.producerEpoch
}
// Compute the retry backoff, taking the number of attempts already made into account.
func (t *transactionManager) computeBackoff(attemptsRemaining int) time.Duration {
if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
maxRetries := t.client.Config().Producer.Transaction.Retry.Max
retries := maxRetries - attemptsRemaining
return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
}
return t.client.Config().Producer.Transaction.Retry.Backoff
}
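// Example: when BackoffFunc is set it wins over the fixed Backoff. A sketch
// of an exponential policy wired into the config fields used above:
//
//	conf.Producer.Transaction.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		return time.Duration(100<<retries) * time.Millisecond // 100ms, 200ms, 400ms, ...
//	}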
// Return true if the txnmgr is transactional.
func (t *transactionManager) isTransactional() bool {
return t.transactionalID != ""
}
// Add the specified offsets to the current transaction.
func (t *transactionManager) addOffsetsToTxn(offsetsToAdd map[string][]*PartitionOffsetMetadata, groupId string) error {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
return ErrTransactionNotReady
}
if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
return t.lastError
}
if _, ok := t.offsetsInCurrentTxn[groupId]; !ok {
t.offsetsInCurrentTxn[groupId] = topicPartitionOffsets{}
}
for topic, offsets := range offsetsToAdd {
for _, offset := range offsets {
tp := topicPartition{topic: topic, partition: offset.Partition}
t.offsetsInCurrentTxn[groupId][tp] = offset
}
}
return nil
}
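// Example: building the offsetsToAdd argument by hand. A hypothetical
// sketch; Partition is the only field this file dereferences, while Offset
// and Metadata are assumed from the TxnOffsetCommit wire format:
//
//	meta := "processed-by-worker-1"
//	offsets := map[string][]*PartitionOffsetMetadata{
//		"events": {{Partition: 0, Offset: 42, Metadata: &meta}},
//	}
//	err := t.addOffsetsToTxn(offsets, "my-consumer-group")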
// Send the txnmgr's saved offsets to the transaction coordinator.
func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupId string) (topicPartitionOffsets, error) {
// First AddOffsetsToTxn
attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
exec := func(run func() (bool, error), err error) error {
for attemptsRemaining >= 0 {
var retry bool
retry, err = run()
if !retry {
return err
}
backoff := t.computeBackoff(attemptsRemaining)
Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
time.Sleep(backoff)
attemptsRemaining--
}
return err
}
lastError := exec(func() (bool, error) {
coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
if err != nil {
return true, err
}
response, err := coordinator.AddOffsetsToTxn(&AddOffsetsToTxnRequest{
TransactionalID: t.transactionalID,
ProducerEpoch: t.producerEpoch,
ProducerID: t.producerID,
GroupID: groupId,
})
if err != nil {
// If an error occurred try to refresh current transaction coordinator.
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
return true, err
}
if response == nil {
// If no response is returned just retry.
return true, ErrTxnUnableToParseResponse
}
if response.Err == ErrNoError {
DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n",
t.transactionalID, groupId, response)
// If no error, just exit.
return false, nil
}
switch response.Err {
case ErrConsumerCoordinatorNotAvailable:
fallthrough
case ErrNotCoordinatorForConsumer:
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
fallthrough
case ErrOffsetsLoadInProgress:
fallthrough
case ErrConcurrentTransactions:
// Retry
case ErrUnknownProducerID:
fallthrough
case ErrInvalidProducerIDMapping:
return false, t.abortableErrorIfPossible(response.Err)
case ErrGroupAuthorizationFailed:
return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
default:
// Others are fatal
return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
}
return true, response.Err
}, nil)
if lastError != nil {
return offsets, lastError
}
resultOffsets := offsets
// Then TxnOffsetCommit
// note the result is not completed until the TxnOffsetCommit returns
attemptsRemaining = t.client.Config().Producer.Transaction.Retry.Max
execTxnOffsetCommit := func(run func() (topicPartitionOffsets, bool, error), err error) (topicPartitionOffsets, error) {
var r topicPartitionOffsets
for attemptsRemaining >= 0 {
var retry bool
r, retry, err = run()
if !retry {
return r, err
}
backoff := t.computeBackoff(attemptsRemaining)
Logger.Printf("txnmgr/txn-offset-commit [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
time.Sleep(backoff)
attemptsRemaining--
}
return r, err
}
return execTxnOffsetCommit(func() (topicPartitionOffsets, bool, error) {
consumerGroupCoordinator, err := t.client.Coordinator(groupId)
if err != nil {
return resultOffsets, true, err
}
responses, err := consumerGroupCoordinator.TxnOffsetCommit(&TxnOffsetCommitRequest{
TransactionalID: t.transactionalID,
ProducerEpoch: t.producerEpoch,
ProducerID: t.producerID,
GroupID: groupId,
Topics: offsets.mapToRequest(),
})
if err != nil {
_ = consumerGroupCoordinator.Close()
_ = t.client.RefreshCoordinator(groupId)
return resultOffsets, true, err
}
if responses == nil {
return resultOffsets, true, ErrTxnUnableToParseResponse
}
var responseErrors []error
failedTxn := topicPartitionOffsets{}
for topic, partitionErrors := range responses.Topics {
for _, partitionError := range partitionErrors {
switch partitionError.Err {
case ErrNoError:
continue
// If the topic is unknown or the coordinator is loading, retry with the current coordinator
case ErrRequestTimedOut:
fallthrough
case ErrConsumerCoordinatorNotAvailable:
fallthrough
case ErrNotCoordinatorForConsumer:
_ = consumerGroupCoordinator.Close()
_ = t.client.RefreshCoordinator(groupId)
fallthrough
case ErrUnknownTopicOrPartition:
fallthrough
case ErrOffsetsLoadInProgress:
// Do nothing just retry
case ErrIllegalGeneration:
fallthrough
case ErrUnknownMemberId:
fallthrough
case ErrFencedInstancedId:
fallthrough
case ErrGroupAuthorizationFailed:
return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, partitionError.Err)
default:
// Others are fatal
return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, partitionError.Err)
}
tp := topicPartition{topic: topic, partition: partitionError.Partition}
failedTxn[tp] = offsets[tp]
responseErrors = append(responseErrors, partitionError.Err)
}
}
resultOffsets = failedTxn
if len(resultOffsets) == 0 {
DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s %+v\n",
t.transactionalID, groupId)
return resultOffsets, false, nil
}
return resultOffsets, true, Wrap(ErrTxnOffsetCommit, responseErrors...)
}, nil)
}
func (t *transactionManager) initProducerId() (int64, int16, error) {
isEpochBump := false
req := &InitProducerIDRequest{}
if t.isTransactional() {
req.TransactionalID = &t.transactionalID
req.TransactionTimeout = t.transactionTimeout
}
if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
req.Version = 3
isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
t.coordinatorSupportsBumpingEpoch = true
req.ProducerID = t.producerID
req.ProducerEpoch = t.producerEpoch
} else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
req.Version = 2
}
if isEpochBump {
err := t.transitionTo(ProducerTxnFlagInitializing, nil)
if err != nil {
return -1, -1, err
}
DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n",
t.transactionalID, t.producerID, t.producerEpoch)
} else {
DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n",
t.transactionalID)
}
attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
exec := func(run func() (int64, int16, bool, error), err error) (int64, int16, error) {
pid := int64(-1)
pepoch := int16(-1)
for attemptsRemaining >= 0 {
var retry bool
pid, pepoch, retry, err = run()
if !retry {
return pid, pepoch, err
}
backoff := t.computeBackoff(attemptsRemaining)
Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
time.Sleep(backoff)
attemptsRemaining--
}
return -1, -1, err
}
return exec(func() (int64, int16, bool, error) {
var err error
var coordinator *Broker
if t.isTransactional() {
coordinator, err = t.client.TransactionCoordinator(t.transactionalID)
} else {
coordinator = t.client.LeastLoadedBroker()
}
if err != nil {
return -1, -1, true, err
}
response, err := coordinator.InitProducerID(req)
if err != nil {
if t.isTransactional() {
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
}
return -1, -1, true, err
}
if response == nil {
return -1, -1, true, ErrTxnUnableToParseResponse
}
if response.Err == ErrNoError {
if isEpochBump {
t.sequenceNumbers = make(map[string]int32)
}
err := t.transitionTo(ProducerTxnFlagReady, nil)
if err != nil {
return -1, -1, true, err
}
DebugLogger.Printf("txnmgr/init-producer-id [%s] successful init producer id %+v\n",
t.transactionalID, response)
return response.ProducerID, response.ProducerEpoch, false, nil
}
switch response.Err {
case ErrConsumerCoordinatorNotAvailable:
fallthrough
case ErrNotCoordinatorForConsumer:
if t.isTransactional() {
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
}
// Fatal errors
default:
return -1, -1, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
}
return -1, -1, true, response.Err
}, nil)
}
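// Example: the request version above is driven by conf.Version. Only
// clusters at or above 2.5.0 let the manager resend its current ID and
// epoch to bump the epoch (KIP-360). A configuration sketch, assuming
// sarama's usual NewConfig constructor:
//
//	conf := NewConfig()
//	conf.Version = V2_5_0_0 // enables req.Version = 3 and epoch bumping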
// If the Kafka cluster is at least 2.5.0, mark the txnmgr as needing an epoch bump; otherwise mark the error as fatal.
func (t *transactionManager) abortableErrorIfPossible(err error) error {
if t.coordinatorSupportsBumpingEpoch {
t.epochBumpRequired = true
return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
}
return t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
}
// End current transaction.
func (t *transactionManager) completeTransaction() error {
if t.epochBumpRequired {
err := t.transitionTo(ProducerTxnFlagInitializing, nil)
if err != nil {
return err
}
} else {
err := t.transitionTo(ProducerTxnFlagReady, nil)
if err != nil {
return err
}
}
t.lastError = nil
t.epochBumpRequired = false
t.partitionsInCurrentTxn = topicPartitionSet{}
t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
t.offsetsInCurrentTxn = map[string]topicPartitionOffsets{}
return nil
}
// Send an EndTxn request with the commit flag (true when committing, false when aborting).
func (t *transactionManager) endTxn(commit bool) error {
attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
exec := func(run func() (bool, error), err error) error {
for attemptsRemaining >= 0 {
var retry bool
retry, err = run()
if !retry {
return err
}
backoff := t.computeBackoff(attemptsRemaining)
Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
time.Sleep(backoff)
attemptsRemaining--
}
return err
}
return exec(func() (bool, error) {
coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
if err != nil {
return true, err
}
response, err := coordinator.EndTxn(&EndTxnRequest{
TransactionalID: t.transactionalID,
ProducerEpoch: t.producerEpoch,
ProducerID: t.producerID,
TransactionResult: commit,
})
if err != nil {
// Always retry on network error
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
return true, err
}
if response == nil {
return true, ErrTxnUnableToParseResponse
}
if response.Err == ErrNoError {
DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n",
t.transactionalID, response)
return false, t.completeTransaction()
}
switch response.Err {
// Need to refresh coordinator
case ErrConsumerCoordinatorNotAvailable:
fallthrough
case ErrNotCoordinatorForConsumer:
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
fallthrough
case ErrOffsetsLoadInProgress:
fallthrough
case ErrConcurrentTransactions:
// Just retry
case ErrUnknownProducerID:
fallthrough
case ErrInvalidProducerIDMapping:
return false, t.abortableErrorIfPossible(response.Err)
// Fatal errors
default:
return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
}
return true, response.Err
}, nil)
}
// We will try to publish the associated offsets for each group,
// then send an EndTxn request to mark the transaction as finished.
func (t *transactionManager) finishTransaction(commit bool) error {
t.mutex.Lock()
defer t.mutex.Unlock()
// Ensure there is no error when committing or aborting
if commit && t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
return t.lastError
} else if !commit && t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
return t.lastError
}
// If no records have been sent, don't do anything.
if len(t.partitionsInCurrentTxn) == 0 {
return t.completeTransaction()
}
epochBump := t.epochBumpRequired
// If we're aborting the transaction, there is no need to add offsets.
if commit && len(t.offsetsInCurrentTxn) > 0 {
for group, offsets := range t.offsetsInCurrentTxn {
newOffsets, err := t.publishOffsetsToTxn(offsets, group)
if err != nil {
t.offsetsInCurrentTxn[group] = newOffsets
return err
}
delete(t.offsetsInCurrentTxn, group)
}
}
if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
return t.lastError
}
if !errors.Is(t.lastError, ErrInvalidProducerIDMapping) {
err := t.endTxn(commit)
if err != nil {
return err
}
if !epochBump {
return nil
}
}
// reset pid and epoch if needed.
return t.initializeTransactions()
}
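// Example: finishTransaction backs both commit and abort in the public
// producer API. A hedged end-to-end sketch against sarama's transactional
// SyncProducer surface (BeginTxn/CommitTxn/AbortTxn):
//
//	if err := producer.BeginTxn(); err != nil {
//		return err
//	}
//	_, _, _ = producer.SendMessage(&ProducerMessage{Topic: "events", Value: StringEncoder("v")})
//	if err := producer.CommitTxn(); err != nil {
//		_ = producer.AbortTxn() // abortable errors require aborting before retrying
//	}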
// Called before sending any transactional record.
// Won't do anything if the current topic-partition has already been added to the transaction.
func (t *transactionManager) maybeAddPartitionToCurrentTxn(topic string, partition int32) {
if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
return
}
tp := topicPartition{topic: topic, partition: partition}
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
if _, ok := t.partitionsInCurrentTxn[tp]; ok {
// partition is already added
return
}
t.pendingPartitionsInCurrentTxn[tp] = struct{}{}
}
// Makes a request to Kafka to add a list of partitions to the current transaction.
func (t *transactionManager) publishTxnPartitions() error {
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
return t.lastError
}
if len(t.pendingPartitionsInCurrentTxn) == 0 {
return nil
}
// Remove the partitions from the pending set regardless of the result. We use the presence
// of partitions in the pending set to know when it is not safe to send batches. However, if
// the partitions failed to be added and we enter an error state, we expect the batches to be
// aborted anyway. In this case, we must be able to continue sending the batches which are in
// retry for partitions that were successfully added.
removeAllPartitionsOnFatalOrAbortedError := func() {
t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
}
// We only want to reduce the backoff when retrying the first AddPartition which errored out due to a
// CONCURRENT_TRANSACTIONS error since this means that the previous transaction is still completing and
// we don't want to wait too long before trying to start the new one.
//
// This is only a temporary fix, the long term solution is being tracked in
// https://issues.apache.org/jira/browse/KAFKA-5482
retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff
computeBackoff := func(attemptsRemaining int) time.Duration {
if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
maxRetries := t.client.Config().Producer.Transaction.Retry.Max
retries := maxRetries - attemptsRemaining
return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
}
return retryBackoff
}
attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
exec := func(run func() (bool, error), err error) error {
for attemptsRemaining >= 0 {
var retry bool
retry, err = run()
if !retry {
return err
}
backoff := computeBackoff(attemptsRemaining)
Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err)
time.Sleep(backoff)
attemptsRemaining--
}
return err
}
return exec(func() (bool, error) {
coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
if err != nil {
return true, err
}
addPartResponse, err := coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{
TransactionalID: t.transactionalID,
ProducerID: t.producerID,
ProducerEpoch: t.producerEpoch,
TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(),
})
if err != nil {
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
return true, err
}
if addPartResponse == nil {
return true, ErrTxnUnableToParseResponse
}
// remove from the list partitions that have been successfully updated
var responseErrors []error
for topic, results := range addPartResponse.Errors {
for _, response := range results {
tp := topicPartition{topic: topic, partition: response.Partition}
switch response.Err {
case ErrNoError:
// Mark partition as added to transaction
t.partitionsInCurrentTxn[tp] = struct{}{}
delete(t.pendingPartitionsInCurrentTxn, tp)
continue
case ErrConsumerCoordinatorNotAvailable:
fallthrough
case ErrNotCoordinatorForConsumer:
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
fallthrough
case ErrUnknownTopicOrPartition:
fallthrough
case ErrOffsetsLoadInProgress:
// Retry topicPartition
case ErrConcurrentTransactions:
if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff {
retryBackoff = addPartitionsRetryBackoff
}
case ErrOperationNotAttempted:
fallthrough
case ErrTopicAuthorizationFailed:
removeAllPartitionsOnFatalOrAbortedError()
return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
case ErrUnknownProducerID:
fallthrough
case ErrInvalidProducerIDMapping:
removeAllPartitionsOnFatalOrAbortedError()
return false, t.abortableErrorIfPossible(response.Err)
// Fatal errors
default:
removeAllPartitionsOnFatalOrAbortedError()
return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
}
responseErrors = append(responseErrors, response.Err)
}
}
// handle end
if len(t.pendingPartitionsInCurrentTxn) == 0 {
DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n",
t.transactionalID, addPartResponse)
return false, nil
}
return true, Wrap(ErrAddPartitionsToTxn, responseErrors...)
}, nil)
}
// Build a new transaction manager sharing the producer's client.
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
txnmgr := &transactionManager{
producerID: noProducerID,
producerEpoch: noProducerEpoch,
client: client,
pendingPartitionsInCurrentTxn: topicPartitionSet{},
partitionsInCurrentTxn: topicPartitionSet{},
offsetsInCurrentTxn: make(map[string]topicPartitionOffsets),
status: ProducerTxnFlagUninitialized,
}
if conf.Producer.Idempotent {
txnmgr.transactionalID = conf.Producer.Transaction.ID
txnmgr.transactionTimeout = conf.Producer.Transaction.Timeout
txnmgr.sequenceNumbers = make(map[string]int32)
txnmgr.mutex = sync.Mutex{}
var err error
txnmgr.producerID, txnmgr.producerEpoch, err = txnmgr.initProducerId()
if err != nil {
return nil, err
}
Logger.Printf("txnmgr/init-producer-id [%s] obtained a ProducerId: %d and ProducerEpoch: %d\n",
txnmgr.transactionalID, txnmgr.producerID, txnmgr.producerEpoch)
}
return txnmgr, nil
}
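// Example: the Idempotent branch above only runs with a suitable config. A
// sketch of the settings a transactional producer needs (RequiredAcks and
// MaxOpenRequests are sarama's usual idempotence requirements, assumed here):
//
//	conf := NewConfig()
//	conf.Producer.Idempotent = true
//	conf.Producer.Transaction.ID = "my-txn-id"
//	conf.Producer.RequiredAcks = WaitForAll
//	conf.Net.MaxOpenRequests = 1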
// re-init producer-id and producer-epoch if needed.
func (t *transactionManager) initializeTransactions() (err error) {
t.producerID, t.producerEpoch, err = t.initProducerId()
return
}