390 Star 2.6K Fork 641

GVPJohn/gf

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
consumer.go 23.83 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948
package cluster
import (
"sort"
"sync"
"sync/atomic"
"time"
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
)
// Consumer is a cluster group consumer
type Consumer struct {
	client    *Client // broker client; released (and closed if owned) on Close
	ownClient bool    // true when the client was created by NewConsumer and is closed with the consumer

	consumer sarama.Consumer // underlying sarama consumer used to create partition consumers
	subs     *partitionMap   // currently owned topic/partition subscriptions

	consumerID string
	groupID    string // consumer group this consumer joins

	memberID     string       // member ID assigned by the group coordinator
	generationID int32        // group generation, updated on every (re)join
	membershipMu sync.RWMutex // guards memberID and generationID

	coreTopics  []string // topics explicitly requested at construction; kept sorted for binary search
	extraTopics []string // topics matched via whitelist/blacklist config; kept sorted for binary search

	dying, dead chan none // shutdown request signal / shutdown-complete signal
	closeOnce   sync.Once

	consuming     int32 // 1 while a consume cycle is active (accessed atomically)
	messages      chan *sarama.ConsumerMessage
	errors        chan error
	partitions    chan PartitionConsumer
	notifications chan *Notification

	// customOffsets stashes marks for topic/partitions without a current
	// subscription; subscribe() consults it when (re)creating consumers.
	// NOTE(review): written by markCustomOffset and read by subscribe without
	// an explicit lock — confirm callers serialize these paths.
	customOffsets map[string]map[int32]offsetInfo

	commitMu sync.Mutex // serializes CommitOffsets calls
}
// NewConsumer initializes a new consumer with its own dedicated client.
//
// The returned consumer owns the client and closes it on Close().
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	consumer, err := NewConsumerFromClient(client, groupID, topics)
	if err != nil {
		// Close the client we just created; without this its broker
		// connections would leak on a failed consumer initialization.
		_ = client.Close()
		return nil, err
	}
	consumer.ownClient = true
	return consumer, nil
}
// NewConsumerFromClient initializes a new consumer from an existing client.
//
// Please note that clients cannot be shared between consumers (due to Kafka internals),
// they can only be re-used which requires the user to call Close() on the first consumer
// before using this method again to initialize another one. Attempts to use a client with
// more than one consumer at a time will return errors.
func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
	// Claim exclusive use of the client; only one consumer may hold it.
	if !client.claim() {
		return nil, errClientInUse
	}

	consumer, err := sarama.NewConsumerFromClient(client.Client)
	if err != nil {
		client.release() // give the client back on failure
		return nil, err
	}

	// coreTopics is kept sorted so isKnownCoreTopic can binary-search it.
	sort.Strings(topics)
	c := &Consumer{
		client:     client,
		consumer:   consumer,
		subs:       newPartitionMap(),
		groupID:    groupID,
		coreTopics: topics,

		dying: make(chan none),
		dead:  make(chan none),

		messages:      make(chan *sarama.ConsumerMessage),
		errors:        make(chan error, client.config.ChannelBufferSize),
		partitions:    make(chan PartitionConsumer, 1),
		notifications: make(chan *Notification),
		customOffsets: make(map[string]map[int32]offsetInfo),
	}
	// Resolve the group coordinator up front so the first consume cycle
	// can start immediately.
	if err := c.client.RefreshCoordinator(groupID); err != nil {
		client.release()
		return nil, err
	}

	// The main loop runs until Close() is called.
	go c.mainLoop()
	return c, nil
}
// Messages returns the read channel for the messages that are returned by
// the broker.
//
// This channel will only return if Config.Group.Mode option is set to
// ConsumerModeMultiplex (default).
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage {
	return c.messages
}
// Partitions returns the read channels for individual partitions of this broker.
//
// This channel will only return if Config.Group.Mode option is set to
// ConsumerModePartitions.
//
// The Partitions() channel must be listened to for the life of this consumer;
// when a rebalance happens old partitions will be closed (naturally come to
// completion) and new ones will be emitted. The returned channel will only close
// when the consumer is completely shut down.
func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
// Errors returns a read channel of errors that occur during offset management, if
// enabled. By default, errors are logged and not returned over this channel. If
// you want to implement any custom error handling, set your config's
// Consumer.Return.Errors setting to true, and read from this channel.
func (c *Consumer) Errors() <-chan error {
	return c.errors
}
// Notifications returns a channel of Notifications that occur during consumer
// rebalancing. Notifications will only be emitted over this channel if your
// config's Group.Return.Notifications setting is set to true.
func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
// HighWaterMarks returns the current high water marks for each topic and partition
// Consistency between partitions is not guaranteed since high water marks are updated separately.
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 {
	return c.consumer.HighWaterMarks()
}
// MarkOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	sub := c.subs.Fetch(msg.Topic, msg.Partition)
	if sub == nil {
		// Partition not currently owned: stash the mark for later.
		c.markCustomOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
		return
	}
	sub.MarkOffset(msg.Offset, metadata)
}
// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	sub := c.subs.Fetch(topic, partition)
	if sub == nil {
		// Partition not currently owned: stash the mark for later.
		c.markCustomOffset(topic, partition, offset, metadata)
		return
	}
	sub.MarkOffset(offset, metadata)
}
// markCustomOffset stashes an offset mark for a topic/partition this consumer
// does not currently own a subscription for; subscribe() consults the stash
// when (re)creating partition consumers on the next rebalance.
//
// NOTE(review): c.customOffsets is accessed here and in subscribe() without
// synchronization — confirm callers serialize these paths.
func (c *Consumer) markCustomOffset(topic string, partition int32, offset int64, metadata string) {
	// Single lookup instead of a check-then-index pair; also fixes the
	// non-gofmt composite-literal spacing of the original.
	partitions, ok := c.customOffsets[topic]
	if !ok {
		partitions = make(map[int32]offsetInfo)
		c.customOffsets[topic] = partitions
	}
	partitions[partition] = offsetInfo{
		Offset:   offset,
		Metadata: metadata,
	}
}
// MarkOffsets marks stashed offsets as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkOffsets(s *OffsetStash) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for tp, info := range s.offsets {
		if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
			sub.MarkOffset(info.Offset, info.Metadata)
		} else {
			// Partition not currently owned: stash the mark until it is
			// (re)subscribed.
			c.markCustomOffset(tp.Topic, tp.Partition, info.Offset, info.Metadata)
		}
		// Deleting while ranging is safe in Go; this drains the stash.
		delete(s.offsets, tp)
	}
}
// ResetOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
	if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
		sub.ResetOffset(msg.Offset, metadata)
	} else {
		// Partition not currently owned: stash the offset for later.
		c.markCustomOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
	}
}
// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	if sub := c.subs.Fetch(topic, partition); sub != nil {
		sub.ResetOffset(offset, metadata)
	} else {
		// Partition not currently owned: stash the offset for later.
		c.markCustomOffset(topic, partition, offset, metadata)
	}
}
// ResetOffsets marks stashed offsets as processed.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetOffsets(s *OffsetStash) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for tp, info := range s.offsets {
		if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
			sub.ResetOffset(info.Offset, info.Metadata)
		} else {
			// Consistent with MarkOffsets, ResetOffset and
			// ResetPartitionOffset: stash offsets for partitions we do not
			// currently own instead of silently dropping them.
			c.markCustomOffset(tp.Topic, tp.Partition, info.Offset, info.Metadata)
		}
		delete(s.offsets, tp)
	}
}
// Subscriptions returns the consumed topics and partitions
func (c *Consumer) Subscriptions() map[string][]int32 { return c.subs.Info() }
// CommitOffsets allows to manually commit previously marked offsets. By default there is no
// need to call this function as the consumer will commit offsets automatically
// using the Config.Consumer.Offsets.CommitInterval setting.
//
// Please be aware that calling this function during an internal rebalance cycle may return
// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
func (c *Consumer) CommitOffsets() error {
	// Serialize concurrent commit attempts.
	c.commitMu.Lock()
	defer c.commitMu.Unlock()

	memberID, generationID := c.membership()
	req := &sarama.OffsetCommitRequest{
		Version:                 2,
		ConsumerGroup:           c.groupID,
		ConsumerGroupGeneration: generationID,
		ConsumerID:              memberID,
		RetentionTime:           -1, // -1: use the broker's default retention
	}
	if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
		req.RetentionTime = int64(ns / time.Millisecond)
	}

	// Collect only subscriptions with uncommitted ("dirty") marks.
	snap := c.subs.Snapshot()
	dirty := false
	for tp, state := range snap {
		if state.Dirty {
			dirty = true
			req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
		}
	}
	// Nothing to commit: avoid a round-trip to the coordinator.
	if !dirty {
		return nil
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	// Mark successfully committed partitions clean; remember the last
	// per-partition broker error (if any) and return it to the caller.
	for topic, errs := range resp.Errors {
		for partition, kerr := range errs {
			if kerr != sarama.ErrNoError {
				err = kerr
			} else if state, ok := snap[topicPartition{topic, partition}]; ok {
				if sub := c.subs.Fetch(topic, partition); sub != nil {
					sub.markCommitted(state.Info.Offset)
				}
			}
		}
	}
	return err
}
// Close safely closes the consumer and releases all resources
func (c *Consumer) Close() (err error) {
	c.closeOnce.Do(func() {
		// Signal shutdown, then wait for the main loop to finish.
		close(c.dying)
		<-c.dead

		// Stop partition consumers and commit outstanding offsets.
		// Teardown continues on errors; the last error seen is returned.
		if e := c.release(); e != nil {
			err = e
		}
		if e := c.consumer.Close(); e != nil {
			err = e
		}
		close(c.messages)
		close(c.errors)

		if e := c.leaveGroup(); e != nil {
			err = e
		}
		close(c.partitions)
		close(c.notifications)

		// drain
		// All channels are closed above, so these loops terminate once any
		// buffered values have been consumed.
		for range c.messages {
		}
		for range c.errors {
		}
		for p := range c.partitions {
			_ = p.Close()
		}
		for range c.notifications {
		}

		// Release the claim taken in NewConsumerFromClient; only close the
		// client if this consumer created it (NewConsumer).
		c.client.release()
		if c.ownClient {
			if e := c.client.Close(); e != nil {
				err = e
			}
		}
	})
	return
}
// mainLoop drives consume cycles until the consumer is closed.
func (c *Consumer) mainLoop() {
	defer close(c.dead)
	defer atomic.StoreInt32(&c.consuming, 0)

	for {
		atomic.StoreInt32(&c.consuming, 0)

		// Check if close was requested
		select {
		case <-c.dying:
			return
		default:
			// Start next consume cycle
			c.nextTick()
		}
	}
}
// nextTick runs one full consume cycle: refresh the coordinator, release the
// previous subscriptions, rebalance, subscribe, then consume until one of the
// helper loops dies or the consumer is closed.
func (c *Consumer) nextTick() {
	// Remember previous subscriptions
	var notification *Notification
	if c.client.config.Group.Return.Notifications {
		notification = newNotification(c.subs.Info())
	}

	// Refresh coordinator
	if err := c.refreshCoordinator(); err != nil {
		c.rebalanceError(err, nil)
		return
	}

	// Release subscriptions
	if err := c.release(); err != nil {
		c.rebalanceError(err, nil)
		return
	}

	// Issue rebalance start notification
	if c.client.config.Group.Return.Notifications {
		c.handleNotification(notification)
	}

	// Rebalance, fetch new subscriptions
	subs, err := c.rebalance()
	if err != nil {
		c.rebalanceError(err, notification)
		return
	}

	// Coordinate loops, make sure everything is
	// stopped on exit
	tomb := newLoopTomb()
	defer tomb.Close()

	// Start the heartbeat
	tomb.Go(c.hbLoop)

	// Subscribe to topic/partitions
	if err := c.subscribe(tomb, subs); err != nil {
		c.rebalanceError(err, notification)
		return
	}

	// Update/issue notification with new claims
	if c.client.config.Group.Return.Notifications {
		notification = notification.success(subs)
		c.handleNotification(notification)
	}

	// Start topic watcher loop
	tomb.Go(c.twLoop)

	// Start consuming and committing offsets
	tomb.Go(c.cmLoop)
	atomic.StoreInt32(&c.consuming, 1)

	// Wait for signals: either a helper loop terminated (triggers a new
	// cycle/rebalance) or the consumer is shutting down.
	select {
	case <-tomb.Dying():
	case <-c.dying:
	}
}
// heartbeat loop, triggered by the mainLoop
func (c *Consumer) hbLoop(stopped <-chan none) {
	ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			err := c.heartbeat()
			if err == nil || err == sarama.ErrNoError {
				continue
			}
			// Coordinator moved or a rebalance started: exit quietly so the
			// main loop can rebalance. Anything else is reported first.
			if err != sarama.ErrNotCoordinatorForConsumer && err != sarama.ErrRebalanceInProgress {
				c.handleError(&Error{Ctx: "heartbeat", error: err})
			}
			return
		case <-stopped:
			return
		case <-c.dying:
			return
		}
	}
}
// topic watcher loop, triggered by the mainLoop
func (c *Consumer) twLoop(stopped <-chan none) {
	// Poll for new topics at half the metadata refresh frequency.
	ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			topics, err := c.client.Topics()
			if err != nil {
				c.handleError(&Error{Ctx: "topics", error: err})
				return
			}

			for _, topic := range topics {
				// A new whitelist-matching topic appeared. Returning here ends
				// this loop, which makes the loop tomb die and causes the main
				// loop to start a new cycle (and thus pick up the topic).
				if !c.isKnownCoreTopic(topic) &&
					!c.isKnownExtraTopic(topic) &&
					c.isPotentialExtraTopic(topic) {
					return
				}
			}
		case <-stopped:
			return
		case <-c.dying:
			return
		}
	}
}
// commit loop, triggered by the mainLoop
func (c *Consumer) cmLoop(stopped <-chan none) {
	ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
	defer ticker.Stop()

	for {
		select {
		case <-stopped:
			return
		case <-c.dying:
			return
		case <-ticker.C:
			// Commit periodically; give up (ending the cycle) once the retry
			// budget is exhausted.
			err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max)
			if err != nil {
				c.handleError(&Error{Ctx: "commit", error: err})
				return
			}
		}
	}
}
// rebalanceError reports a failed rebalance and backs off before the next
// consume cycle.
func (c *Consumer) rebalanceError(err error, n *Notification) {
	if n != nil {
		n.Type = RebalanceError
		c.handleNotification(n)
	}

	// A rebalance already in progress is expected churn; don't surface it.
	if err != sarama.ErrRebalanceInProgress {
		c.handleError(&Error{Ctx: "rebalance", error: err})
	}

	// Back off, unless we are shutting down.
	select {
	case <-c.dying:
	case <-time.After(c.client.config.Metadata.Retry.Backoff):
	}
}
// handleNotification delivers n on the notifications channel when enabled,
// giving up if the consumer starts shutting down.
func (c *Consumer) handleNotification(n *Notification) {
	if !c.client.config.Group.Return.Notifications {
		return
	}
	select {
	case c.notifications <- n:
	case <-c.dying:
	}
}
// handleError delivers e on the errors channel when enabled, otherwise logs
// it; delivery is abandoned if the consumer starts shutting down.
func (c *Consumer) handleError(e *Error) {
	if !c.client.config.Consumer.Return.Errors {
		sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
		return
	}
	select {
	case c.errors <- e:
	case <-c.dying:
	}
}
// Releases the consumer and commits offsets, called from rebalance() and Close()
func (c *Consumer) release() (err error) {
	// Stop all consumers
	c.subs.Stop()

	// Clear subscriptions on exit
	defer c.subs.Clear()

	// Wait for messages to be processed: dwell for a grace period so
	// in-flight messages can still be marked before the final commit.
	timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
	defer timeout.Stop()

	select {
	case <-c.dying:
	case <-timeout.C:
	}

	// Commit offsets, continue on errors
	if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
		err = e
	}
	return
}
// --------------------------------------------------------------------
// Performs a heartbeat, part of the mainLoop()
func (c *Consumer) heartbeat() error {
	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	memberID, generationID := c.membership()
	req := &sarama.HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}
	resp, err := broker.Heartbeat(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}
	// The broker may report a group-level error in the response body.
	return resp.Err
}
// Performs a rebalance, part of the mainLoop()
func (c *Consumer) rebalance() (map[string][]int32, error) {
	memberID, _ := c.membership()
	sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)

	// Recompute whitelist-matched extra topics; kept sorted so
	// isKnownExtraTopic can binary-search them.
	allTopics, err := c.client.Topics()
	if err != nil {
		return nil, err
	}
	c.extraTopics = c.selectExtraTopics(allTopics)
	sort.Strings(c.extraTopics)

	// Re-join consumer group
	strategy, err := c.joinGroup()
	switch {
	case err == sarama.ErrUnknownMemberId:
		// Our member ID is stale; clear it so the next join starts fresh.
		c.membershipMu.Lock()
		c.memberID = ""
		c.membershipMu.Unlock()
		return nil, err
	case err != nil:
		return nil, err
	}

	// Sync consumer group state, fetch subscriptions
	subs, err := c.syncGroup(strategy)
	switch {
	case err == sarama.ErrRebalanceInProgress:
		return nil, err
	case err != nil:
		// Unrecoverable sync failure: leave the group so it can rebalance
		// without us; best-effort, the sync error is what we report.
		_ = c.leaveGroup()
		return nil, err
	}
	return subs, nil
}
// Performs the subscription, part of the mainLoop()
func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
	// fetch offsets
	offsets, err := c.fetchOffsets(subs)
	if err != nil {
		_ = c.leaveGroup()
		return err
	}

	// create consumers in parallel
	var mu sync.Mutex
	var wg sync.WaitGroup
	for topic, partitions := range subs {
		for _, partition := range partitions {
			wg.Add(1)

			// Prefer an offset stashed via markCustomOffset over the one
			// fetched from the broker.
			// NOTE(review): customOffsets is read here without a lock while
			// MarkOffset may write it from user goroutines — confirm these
			// paths are serialized.
			info := offsets[topic][partition]
			if item, ok := c.customOffsets[topic]; ok {
				if i, ok := item[partition]; ok {
					info = i
				}
			}
			// info is declared afresh each iteration, so capturing it in the
			// closure is safe; topic/partition are passed explicitly.
			go func(topic string, partition int32) {
				if e := c.createConsumer(tomb, topic, partition, info); e != nil {
					mu.Lock()
					err = e
					mu.Unlock()
				}
				wg.Done()
			}(topic, partition)
		}
	}
	wg.Wait()

	// Any creation failure aborts the whole subscription: release what was
	// started and leave the group (best-effort).
	if err != nil {
		_ = c.release()
		_ = c.leaveGroup()
	}
	return err
}
// --------------------------------------------------------------------
// Send a request to the broker to join group on rebalance()
func (c *Consumer) joinGroup() (*balancer, error) {
	memberID, _ := c.membership()
	req := &sarama.JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       memberID,
		SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}

	// Advertise both assignment strategies with identical metadata; the
	// coordinator picks one supported by all members.
	meta := &sarama.ConsumerGroupMemberMetadata{
		Version:  1,
		Topics:   append(c.coreTopics, c.extraTopics...),
		UserData: c.client.config.Group.Member.UserData,
	}
	err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
	if err != nil {
		return nil, err
	}
	err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
	if err != nil {
		return nil, err
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.JoinGroup(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	} else if resp.Err != sarama.ErrNoError {
		c.closeCoordinator(broker, resp.Err)
		return nil, resp.Err
	}

	// Only the elected leader computes the partition assignment; followers
	// return a nil strategy and receive their assignment during syncGroup.
	var strategy *balancer
	if resp.LeaderId == resp.MemberId {
		members, err := resp.GetMembers()
		if err != nil {
			return nil, err
		}

		strategy, err = newBalancerFromMeta(c.client, members)
		if err != nil {
			return nil, err
		}
	}

	// Store the new membership under lock; readers use membership().
	c.membershipMu.Lock()
	c.memberID = resp.MemberId
	c.generationID = resp.GenerationId
	c.membershipMu.Unlock()

	return strategy, nil
}
// Send a request to the broker to sync the group on rebalance().
// Returns a list of topics and partitions to consume.
func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
	memberID, generationID := c.membership()
	req := &sarama.SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	// Only the group leader has a non-nil strategy; it attaches the computed
	// assignment for every member to the request.
	if strategy != nil {
		for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
			if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
				Topics: topics,
			}); err != nil {
				return nil, err
			}
		}
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.SyncGroup(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	} else if resp.Err != sarama.ErrNoError {
		c.closeCoordinator(broker, resp.Err)
		return nil, resp.Err
	}

	// Return if there is nothing to subscribe to
	if len(resp.MemberAssignment) == 0 {
		return nil, nil
	}

	// Get assigned subscriptions
	members, err := resp.GetMemberAssignment()
	if err != nil {
		return nil, err
	}

	// Sort partitions, for each topic
	for topic := range members.Topics {
		sort.Sort(int32Slice(members.Topics[topic]))
	}
	return members.Topics, nil
}
// Fetches latest committed offsets for all subscriptions
func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
	offsets := make(map[string]map[int32]offsetInfo, len(subs))
	req := &sarama.OffsetFetchRequest{
		Version:       1,
		ConsumerGroup: c.groupID,
	}

	// Default every partition to offset -1 (no committed offset yet) and add
	// it to the fetch request.
	for topic, partitions := range subs {
		offsets[topic] = make(map[int32]offsetInfo, len(partitions))
		for _, partition := range partitions {
			offsets[topic][partition] = offsetInfo{Offset: -1}
			req.AddPartition(topic, partition)
		}
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.FetchOffset(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	for topic, partitions := range subs {
		for _, partition := range partitions {
			block := resp.GetBlock(topic, partition)
			if block == nil {
				// The broker answered but omitted a requested partition.
				return nil, sarama.ErrIncompleteResponse
			}

			if block.Err == sarama.ErrNoError {
				offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
			} else {
				return nil, block.Err
			}
		}
	}
	return offsets, nil
}
// Send a request to the broker to leave the group on failed rebalance() and on Close()
func (c *Consumer) leaveGroup() error {
	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	memberID, _ := c.membership()
	if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: memberID,
	}); err != nil {
		// Still return err below; closing the coordinator forces a refresh.
		c.closeCoordinator(broker, err)
	}
	return err
}
// --------------------------------------------------------------------
// createConsumer starts consuming a single topic/partition from the given
// offset and registers the new partition consumer with the subscriptions.
func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
	memberID, _ := c.membership()
	sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))

	// Create partitionConsumer
	pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
	if err != nil {
		return err
	}

	// Store in subscriptions
	c.subs.Store(topic, partition, pc)

	// Start partition consumer goroutine
	tomb.Go(func(stopper <-chan none) {
		if c.client.config.Group.Mode == ConsumerModePartitions {
			// Partition mode: the user reads from pc directly.
			pc.waitFor(stopper)
		} else {
			// Multiplex mode: funnel into the shared channels.
			pc.multiplex(stopper, c.messages, c.errors)
		}
	})

	if c.client.config.Group.Mode == ConsumerModePartitions {
		// This send blocks until the user reads from Partitions(); the docs
		// require that channel to be consumed for the consumer's lifetime.
		c.partitions <- pc
	}
	return nil
}
// commitOffsetsWithRetry commits marked offsets, retrying failures up to
// `retries` additional times; the last error (if any) is returned.
func (c *Consumer) commitOffsetsWithRetry(retries int) error {
	err := c.CommitOffsets()
	for err != nil && retries > 0 {
		retries--
		err = c.CommitOffsets()
	}
	return err
}
// closeCoordinator closes the broker connection and, for coordinator-related
// errors, forces a coordinator refresh so the next request re-resolves it.
func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
	if broker != nil {
		_ = broker.Close()
	}

	if err == sarama.ErrConsumerCoordinatorNotAvailable || err == sarama.ErrNotCoordinatorForConsumer {
		_ = c.client.RefreshCoordinator(c.groupID)
	}
}
// selectExtraTopics returns the subset of allTopics that are not core topics
// but match the whitelist/blacklist configuration.
//
// Note: the result is filtered in place and aliases allTopics' backing array,
// so the caller must not reuse allTopics afterwards.
func (c *Consumer) selectExtraTopics(allTopics []string) []string {
	extra := allTopics[:0]
	for _, topic := range allTopics {
		if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
			extra = append(extra, topic)
		}
	}
	return extra
}
// isKnownCoreTopic reports whether topic is one of the explicitly requested
// topics. coreTopics is kept sorted, so a binary search suffices.
func (c *Consumer) isKnownCoreTopic(topic string) bool {
	i := sort.SearchStrings(c.coreTopics, topic)
	return i < len(c.coreTopics) && c.coreTopics[i] == topic
}
// isKnownExtraTopic reports whether topic is already in the whitelist-matched
// set. extraTopics is kept sorted, so a binary search suffices.
func (c *Consumer) isKnownExtraTopic(topic string) bool {
	i := sort.SearchStrings(c.extraTopics, topic)
	return i < len(c.extraTopics) && c.extraTopics[i] == topic
}
// isPotentialExtraTopic reports whether topic passes the configured topic
// filters: the blacklist always wins, then the whitelist must match.
func (c *Consumer) isPotentialExtraTopic(topic string) bool {
	rx := c.client.config.Group.Topics
	switch {
	case rx.Blacklist != nil && rx.Blacklist.MatchString(topic):
		return false
	case rx.Whitelist != nil && rx.Whitelist.MatchString(topic):
		return true
	}
	return false
}
// refreshCoordinator refreshes cluster metadata first, then re-resolves the
// group coordinator.
func (c *Consumer) refreshCoordinator() error {
	err := c.refreshMetadata()
	if err != nil {
		return err
	}
	return c.client.RefreshCoordinator(c.groupID)
}
// refreshMetadata refreshes topic metadata, either for the full cluster or
// only for the currently known topics, per the Metadata.Full config option.
func (c *Consumer) refreshMetadata() error {
	var err error
	if c.client.config.Metadata.Full {
		err = c.client.RefreshMetadata()
	} else {
		var topics []string
		topics, err = c.client.Topics()
		if err == nil && len(topics) != 0 {
			err = c.client.RefreshMetadata(topics...)
		}
	}

	// maybe we didn't have authorization to describe all topics
	if err == sarama.ErrTopicAuthorizationFailed {
		err = c.client.RefreshMetadata(c.coreTopics...)
	}
	return err
}
// membership returns a consistent snapshot of the current member ID and
// generation under the read lock.
func (c *Consumer) membership() (string, int32) {
	c.membershipMu.RLock()
	defer c.membershipMu.RUnlock()
	return c.memberID, c.generationID
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/johng/gf.git
git@gitee.com:johng/gf.git
johng
gf
gf
v1.0.898

搜索帮助

0d507c66 1850385 C8b1a773 1850385