代码拉取完成,页面将自动刷新
package mocks
import (
"errors"
"sync"
"github.com/Shopify/sarama"
)
// SyncProducer implements sarama's SyncProducer interface for testing purposes.
// Before you can use it, you have to set expectations on the mock SyncProducer
// to tell it how to handle calls to SendMessage, so you can easily test success
// and failure scenarios.
type SyncProducer struct {
l sync.Mutex
t ErrorReporter
expectations []*producerExpectation
lastOffset int64
*TopicConfig
newPartitioner sarama.PartitionerConstructor
partitioners map[string]sarama.Partitioner
isTransactional bool
txnLock sync.Mutex
txnStatus sarama.ProducerTxnStatusFlag
}
// NewSyncProducer instantiates a new SyncProducer mock. The t argument should
// be the *testing.T instance of your test method. An error will be written to it if
// an expectation is violated. The config argument is validated and used to handle
// partitioning.
func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
if config == nil {
config = sarama.NewConfig()
}
if err := config.Validate(); err != nil {
t.Errorf("Invalid mock configuration provided: %s", err.Error())
}
return &SyncProducer{
t: t,
expectations: make([]*producerExpectation, 0),
TopicConfig: NewTopicConfig(),
newPartitioner: config.Producer.Partitioner,
partitioners: make(map[string]sarama.Partitioner, 1),
isTransactional: config.Producer.Transaction.ID != "",
txnStatus: sarama.ProducerTxnFlagReady,
}
}
////////////////////////////////////////////////
// Implement SyncProducer interface
////////////////////////////////////////////////
// SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
// You have to set expectations on the mock producer before calling SendMessage, so it knows
// how to handle them. You can set a function in each expectation so that the message value
// checked by this function and an error is returned if the match fails.
// If there is no more remaining expectation when SendMessage is called,
// the mock producer will write an error to the test state object.
func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
sp.l.Lock()
defer sp.l.Unlock()
if sp.IsTransactional() && sp.txnStatus&sarama.ProducerTxnFlagInTransaction == 0 {
sp.t.Errorf("attempt to send message when transaction is not started or is in ending state.")
return -1, -1, errors.New("attempt to send message when transaction is not started or is in ending state")
}
if len(sp.expectations) > 0 {
expectation := sp.expectations[0]
sp.expectations = sp.expectations[1:]
topic := msg.Topic
partition, err := sp.partitioner(topic).Partition(msg, sp.partitions(topic))
if err != nil {
sp.t.Errorf("Partitioner returned an error: %s", err.Error())
return -1, -1, err
}
msg.Partition = partition
if expectation.CheckFunction != nil {
errCheck := expectation.CheckFunction(msg)
if errCheck != nil {
sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
return -1, -1, errCheck
}
}
if errors.Is(expectation.Result, errProduceSuccess) {
sp.lastOffset++
msg.Offset = sp.lastOffset
return 0, msg.Offset, nil
}
return -1, -1, expectation.Result
}
sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
return -1, -1, errOutOfExpectations
}
// SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation.
// You have to set expectations on the mock producer before calling SendMessages, so it knows
// how to handle them. If there is no more remaining expectations when SendMessages is called,
// the mock producer will write an error to the test state object.
func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
sp.l.Lock()
defer sp.l.Unlock()
if len(sp.expectations) >= len(msgs) {
expectations := sp.expectations[0:len(msgs)]
sp.expectations = sp.expectations[len(msgs):]
for i, expectation := range expectations {
topic := msgs[i].Topic
partition, err := sp.partitioner(topic).Partition(msgs[i], sp.partitions(topic))
if err != nil {
sp.t.Errorf("Partitioner returned an error: %s", err.Error())
return err
}
msgs[i].Partition = partition
if expectation.CheckFunction != nil {
errCheck := expectation.CheckFunction(msgs[i])
if errCheck != nil {
sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
return errCheck
}
}
if !errors.Is(expectation.Result, errProduceSuccess) {
return expectation.Result
}
sp.lastOffset++
msgs[i].Offset = sp.lastOffset
}
return nil
}
sp.t.Errorf("Insufficient expectations set on this mock producer to handle the input messages.")
return errOutOfExpectations
}
func (sp *SyncProducer) partitioner(topic string) sarama.Partitioner {
partitioner := sp.partitioners[topic]
if partitioner == nil {
partitioner = sp.newPartitioner(topic)
sp.partitioners[topic] = partitioner
}
return partitioner
}
// Close corresponds with the Close method of sarama's SyncProducer implementation.
// By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow,
// so it will write an error to the test state if there's any remaining expectations.
func (sp *SyncProducer) Close() error {
sp.l.Lock()
defer sp.l.Unlock()
if len(sp.expectations) > 0 {
sp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(sp.expectations))
}
return nil
}
////////////////////////////////////////////////
// Setting expectations
////////////////////////////////////////////////
// ExpectSendMessageWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer
// that SendMessage will be called. The mock producer will first call the given function to check
// the message. It will cascade the error of the function, if any, or handle the message as if it
// produced successfully, i.e. by returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *SyncProducer {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
return sp
}
// ExpectSendMessageWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that
// SendMessage will be called. The mock producer will first call the given function to check the
// message. It will cascade the error of the function, if any, or handle the message as if it
// failed to produce successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *SyncProducer {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
return sp
}
// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
// will be called. The mock producer will first call the given function to check the message value.
// It will cascade the error of the function, if any, or handle the message as if it produced
// successfully, i.e. by returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) *SyncProducer {
sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf))
return sp
}
// ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will first call the given function to check the message value.
// It will cascade the error of the function, if any, or handle the message as if it failed
// to produce successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) *SyncProducer {
sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err)
return sp
}
// ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will handle the message as if it produced successfully, i.e. by
// returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageAndSucceed() *SyncProducer {
sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(nil)
return sp
}
// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will handle the message as if it failed to produce
// successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) *SyncProducer {
sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(nil, err)
return sp
}
func (sp *SyncProducer) IsTransactional() bool {
return sp.isTransactional
}
func (sp *SyncProducer) BeginTxn() error {
sp.txnLock.Lock()
defer sp.txnLock.Unlock()
sp.txnStatus = sarama.ProducerTxnFlagInTransaction
return nil
}
func (sp *SyncProducer) CommitTxn() error {
sp.txnLock.Lock()
defer sp.txnLock.Unlock()
sp.txnStatus = sarama.ProducerTxnFlagReady
return nil
}
func (sp *SyncProducer) AbortTxn() error {
sp.txnLock.Lock()
defer sp.txnLock.Unlock()
sp.txnStatus = sarama.ProducerTxnFlagReady
return nil
}
func (sp *SyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag {
return sp.txnStatus
}
func (sp *SyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error {
return nil
}
func (sp *SyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error {
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。