1 Star 0 Fork 0

Kenvins/librdkafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
rdkafka_cgrp.h 17.49 KB
一键复制 编辑 原始数据 按行查看 历史
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2022, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _RDKAFKA_CGRP_H_
#define _RDKAFKA_CGRP_H_
#include "rdinterval.h"
#include "rdkafka_assignor.h"
/**
* Client groups implementation
*
* Client groups handling for a single cgrp is assigned to a single
* rd_kafka_broker_t object at any given time.
* The main thread will call cgrp_serve() to serve its cgrps.
*
* This means that the cgrp itself does not need to be locked since it
* is only ever used from the main thread.
*
*/
/** Human-readable join state names; the array itself is defined outside
* this header.
* NOTE(review): presumably indexed by the RD_KAFKA_CGRP_JOIN_STATE_* values
* of rd_kafka_cgrp_t.rkcg_join_state -- confirm against the definition. */
extern const char *rd_kafka_cgrp_join_state_names[];
/**
* Client group
*
* Holds everything tracked for one client group handle: the group/member
* identifiers, the two state enums (rkcg_state and rkcg_join_state),
* the queues, flag bits, intervals and timers, coordinator handles, and the
* subscription / assignment bookkeeping.
*/
typedef struct rd_kafka_cgrp_s {
const rd_kafkap_str_t *rkcg_group_id;
rd_kafkap_str_t *rkcg_member_id; /* Last assigned MemberId */
/* A non-NULL string here is what RD_KAFKA_CGRP_IS_STATIC_MEMBER() tests
* for, i.e., static group membership. */
rd_kafkap_str_t *rkcg_group_instance_id;
const rd_kafkap_str_t *rkcg_client_id;
/* Group state: coordinator lookup and hand-over to the managing broker.
* NOTE(review): presumably the index into rd_kafka_cgrp_state_names[]
* -- confirm. */
enum {
/* Init state */
RD_KAFKA_CGRP_STATE_INIT,
/* Cgrp has been stopped. This is a final state */
RD_KAFKA_CGRP_STATE_TERM,
/* Query for group coordinator */
RD_KAFKA_CGRP_STATE_QUERY_COORD,
/* Outstanding query, awaiting response */
RD_KAFKA_CGRP_STATE_WAIT_COORD,
/* Wait ack from assigned cgrp manager broker thread */
RD_KAFKA_CGRP_STATE_WAIT_BROKER,
/* Wait for manager broker thread to connect to broker */
RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT,
/* Coordinator is up and manager is assigned. */
RD_KAFKA_CGRP_STATE_UP,
} rkcg_state;
rd_ts_t rkcg_ts_statechange; /* Timestamp of last
* state change. */
/* Join state: progress of the (re)join / sync / assign cycle.
* NOTE(review): presumably the index into
* rd_kafka_cgrp_join_state_names[] -- confirm. */
enum {
/* all: join or rejoin, possibly with an existing assignment. */
RD_KAFKA_CGRP_JOIN_STATE_INIT,
/* all: JoinGroupRequest sent, awaiting response. */
RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN,
/* all: MetadataRequest sent, awaiting response.
* While metadata requests may be issued at any time,
* this state is only set upon a proper (re)join. */
RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA,
/* Follower: SyncGroupRequest sent, awaiting response. */
RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC,
/* all: waiting for application to call *_assign() */
RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL,
/* all: waiting for application to call *_unassign() */
RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL,
/* all: waiting for full assignment to decommission */
RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE,
/* all: waiting for partial assignment to decommission */
RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE,
/* all: synchronized and assigned
* may be an empty assignment. */
RD_KAFKA_CGRP_JOIN_STATE_STEADY,
} rkcg_join_state;
/* State when group leader */
struct {
rd_kafka_group_member_t *members;
int member_cnt;
} rkcg_group_leader;
rd_kafka_q_t *rkcg_q; /* Application poll queue */
rd_kafka_q_t *rkcg_ops; /* Manager ops queue */
rd_kafka_q_t *rkcg_wait_coord_q; /* Ops awaiting coord */
/* Flag bits. The RD_KAFKA_CGRP_F_* values are defined directly below
* (presumably OR:ed into this field -- confirm in the implementation).
* Note that 0x2 and 0x4 have no flag defined in this header. */
int rkcg_flags;
#define RD_KAFKA_CGRP_F_TERMINATE 0x1 /* Terminate cgrp (async) */
#define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE \
0x8 /* Send LeaveGroup when \
* unassign is done */
#define RD_KAFKA_CGRP_F_SUBSCRIPTION \
0x10 /* If set: \
* subscription \
* else: \
* static assignment */
#define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT \
0x20 /* A Heartbeat request \
* is in transit, dont \
* send a new one. */
#define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION \
0x40 /* Subscription contains \
* wildcards. */
#define RD_KAFKA_CGRP_F_WAIT_LEAVE \
0x80 /* Wait for LeaveGroup \
* to be sent. \
* This is used to stall \
* termination until \
* the LeaveGroupRequest \
* is responded to, \
* otherwise it risks \
* being dropped in the \
* output queue when \
* the broker is destroyed. \
*/
#define RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED \
0x100 /**< max.poll.interval.ms \
* was exceeded and we \
* left the group. \
* Do not rejoin until \
* the application has \
* polled again. */
rd_interval_t rkcg_coord_query_intvl; /* Coordinator query intvl*/
rd_interval_t rkcg_heartbeat_intvl; /* Heartbeat intvl */
rd_interval_t rkcg_join_intvl; /* JoinGroup interval */
rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scanner */
rd_ts_t rkcg_ts_session_timeout; /**< Absolute session
* timeout enforced by
* the consumer, this
* value is updated on
* Heartbeat success,
* etc. */
rd_kafka_resp_err_t rkcg_last_heartbeat_err; /**< Last Heartbeat error,
* used for logging. */
TAILQ_HEAD(, rd_kafka_topic_s) rkcg_topics; /* Topics subscribed to */
rd_list_t rkcg_toppars; /* Toppars subscribed to*/
int32_t rkcg_generation_id; /* Current generation id */
rd_kafka_assignor_t *rkcg_assignor; /**< The current partition
* assignor. used by both
* leader and members. */
void *rkcg_assignor_state; /**< current partition
* assignor state */
int32_t rkcg_coord_id; /**< Current coordinator id,
* or -1 if not known. */
rd_kafka_group_protocol_t
rkcg_group_protocol; /**< Group protocol to use */
rd_kafkap_str_t *rkcg_group_remote_assignor; /**< Group remote
* assignor to use */
rd_kafka_broker_t *rkcg_curr_coord; /**< Current coordinator
* broker handle, or NULL.
* rkcg_coord's nodename is
* updated to this broker's
* nodename when there is a
* coordinator change. */
rd_kafka_broker_t *rkcg_coord; /**< The dedicated coordinator
* broker handle.
* Will be updated when the
* coordinator changes. */
int16_t rkcg_wait_resp; /**< Awaiting response for this
* ApiKey.
* Makes sure only one
* JoinGroup or SyncGroup
* request is outstanding.
* Unset value is -1. */
/** Current subscription */
rd_kafka_topic_partition_list_t *rkcg_subscription;
/** The actual topics subscribed (after metadata+wildcard matching).
* Sorted. */
rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */
/** Subscribed topics that are errored/not available. */
rd_kafka_topic_partition_list_t *rkcg_errored_topics;
/** If a SUBSCRIBE op is received during a COOPERATIVE rebalance,
* actioning this will be postponed until after the rebalance
* completes. The waiting subscription is stored here.
* Mutually exclusive with rkcg_next_unsubscribe. */
rd_kafka_topic_partition_list_t *rkcg_next_subscription;
/** If a (un)SUBSCRIBE op is received during a COOPERATIVE rebalance,
* actioning this will be postponed until after the rebalance
* completes. This flag is used to signal a waiting unsubscribe
* operation. Mutually exclusive with rkcg_next_subscription. */
rd_bool_t rkcg_next_unsubscribe;
/** Assignment considered lost */
rd_atomic32_t rkcg_assignment_lost;
/** Current assignment of partitions from last SyncGroup response.
* NULL means no assignment, else empty or non-empty assignment.
*
* This group assignment is the actual set of partitions that were
* assigned to our consumer by the consumer group leader and should
* not be confused with the rk_consumer.assignment which is the
* partitions assigned by the application using assign(), et al.
*
* The group assignment and the consumer assignment are typically
* identical, but not necessarily since an application is free to
* assign() any partition, not just the partitions it is handed
* through the rebalance callback.
*
* Yes, this nomenclature is ambiguous but has historical reasons,
* so for now just try to remember that:
* - group assignment == consumer group assignment.
* - assignment == actual used assignment, i.e., fetched partitions.
*
* @remark This list is always sorted.
*/
rd_kafka_topic_partition_list_t *rkcg_group_assignment;
/** The partitions to incrementally assign following a
* currently in-progress incremental unassign. */
rd_kafka_topic_partition_list_t *rkcg_rebalance_incr_assignment;
/** Rejoin the group following a currently in-progress
* incremental unassign. */
rd_bool_t rkcg_rebalance_rejoin;
rd_kafka_resp_err_t rkcg_last_err; /* Last error propagated to
* application.
* This is for silencing
* same errors. */
rd_kafka_timer_t rkcg_offset_commit_tmr; /* Offset commit timer */
rd_kafka_timer_t rkcg_max_poll_interval_tmr; /**< Enforce the max
* poll interval. */
rd_kafka_t *rkcg_rk;
rd_kafka_op_t *rkcg_reply_rko; /* Send reply for op
* (OP_TERMINATE)
* to this rko's queue. */
rd_ts_t rkcg_ts_terminate; /* Timestamp of when
* cgrp termination was
* initiated. */
rd_atomic32_t rkcg_terminated; /**< Consumer has been closed */
/* Protected by rd_kafka_*lock() */
struct {
rd_ts_t ts_rebalance; /* Timestamp of
* last rebalance */
int rebalance_cnt; /* Number of
rebalances */
char rebalance_reason[256]; /**< Last rebalance
* reason */
int assignment_size; /* Partition count
* of last rebalance
* assignment */
} rkcg_c;
} rd_kafka_cgrp_t;
/**
* Check if broker is the coordinator.
*
* Evaluates to non-zero when the cgrp's coordinator id is known (not -1)
* and equals the broker's node id (rkb_nodeid).
*
* @remark `rkcg` is expanded twice, so the argument must not have side
*         effects. `rkb` is only dereferenced when the coordinator id is
*         known, because of the short-circuiting `&&`.
*/
#define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb) \
((rkcg)->rkcg_coord_id != -1 && \
(rkcg)->rkcg_coord_id == (rkb)->rkb_nodeid)
/**
* @returns true if cgrp is using static group membership, i.e., its
*          rkcg_group_instance_id is not a NULL string according to
*          RD_KAFKAP_STR_IS_NULL().
*
* @remark The whole replacement list is parenthesized so that the macro
*         always expands to a single self-contained expression.
*/
#define RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) \
        (!RD_KAFKAP_STR_IS_NULL((rkcg)->rkcg_group_instance_id))
/** Human-readable state name tables; the arrays are defined outside this
* header.
* NOTE(review): presumably indexed by rkcg_state and rkcg_join_state
* respectively -- confirm against the definitions.
* NOTE(review): rd_kafka_cgrp_join_state_names[] is also declared earlier
* in this header, before the rd_kafka_cgrp_t definition; the repeated
* declaration is harmless but redundant. */
extern const char *rd_kafka_cgrp_state_names[];
extern const char *rd_kafka_cgrp_join_state_names[];
/*
* Client group API. Only the prototypes are visible in this header; the
* notes below describe what the signatures show and mark everything else
* as something to confirm in the implementation.
*/

/** Final destruction of \p rkcg.
* NOTE(review): exact teardown semantics are not visible here. */
void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg);

/** Create a new cgrp for client instance \p rk with the given group
* protocol, group id and client id.
* NOTE(review): ownership of \p group_id / \p client_id is not visible
* here -- confirm whether they are copied or borrowed. */
rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
rd_kafka_group_protocol_t group_protocol,
const rd_kafkap_str_t *group_id,
const rd_kafkap_str_t *client_id);

/** Serve \p rkcg. Per the overview comment at the top of this header,
* cgrps are served from the main thread. */
void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg);

/** Issue an op of \p type (with \p err) concerning \p rktp towards
* \p rkcg, with replies directed to \p replyq.
* NOTE(review): presumably enqueued on rkcg_ops -- confirm. */
void rd_kafka_cgrp_op(rd_kafka_cgrp_t *rkcg,
rd_kafka_toppar_t *rktp,
rd_kafka_replyq_t replyq,
rd_kafka_op_type_t type,
rd_kafka_resp_err_t err);

/** Terminate \p rkcg; the 0-variant takes the triggering op, the other
* variant takes the queue to reply to.
* NOTE(review): relationship between the two (e.g. which thread runs
* which) is not visible here -- confirm. */
void rd_kafka_cgrp_terminate0(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko);
void rd_kafka_cgrp_terminate(rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq);

/** Remove / add a topic \p pattern for \p rkcg, returning an error code.
* NOTE(review): pattern syntax and error codes are not visible here. */
rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del(rd_kafka_cgrp_t *rkcg,
const char *pattern);
rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add(rd_kafka_cgrp_t *rkcg,
const char *pattern);

/** Check \p topic against \p rkcg.
* NOTE(review): meaning of the int return value is not visible here. */
int rd_kafka_cgrp_topic_check(rd_kafka_cgrp_t *rkcg, const char *topic);

/** Set the member id of \p rkcg (see rkcg_member_id). */
void rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t *rkcg, const char *member_id);

/** Set the join state of \p rkcg.
* \p join_state is a plain int since the rkcg_join_state enum is anonymous;
* presumably one of the RD_KAFKA_CGRP_JOIN_STATE_* values -- confirm. */
void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state);

/** Get the coordinator broker of \p rkcg.
* NOTE(review): whether a reference is acquired for the caller, and
* whether NULL may be returned, is not visible here -- confirm. */
rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg);

/** Query for the group coordinator; \p reason is a free-text description
* (presumably for logging -- confirm). */
void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason);

/** Report the coordinator as dead due to \p err, with a free-text
* \p reason. */
void rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t *rkcg,
rd_kafka_resp_err_t err,
const char *reason);

/** Check for changes following a metadata update.
* NOTE(review): presumably \p do_join controls whether a (re)join is
* triggered on change -- confirm. */
void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
rd_bool_t do_join);
/** Accessor for the cgrp of client instance \p rk: expands to its rk_cgrp
* member. */
#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp)
/** Commit offsets for the assigned partitions of \p rkcg.
* \p reason is a free-text description.
* NOTE(review): the semantics of a NULL \p offsets and of \p set_offsets
* are not visible in this header -- confirm in the implementation. */
void rd_kafka_cgrp_assigned_offsets_commit(
rd_kafka_cgrp_t *rkcg,
const rd_kafka_topic_partition_list_t *offsets,
rd_bool_t set_offsets,
const char *reason);

/** Signal that an assignment operation for \p rkcg is done.
* NOTE(review): the caller and the resulting state transition are not
* visible here. */
void rd_kafka_cgrp_assignment_done(rd_kafka_cgrp_t *rkcg);

/** @returns whether the assignment of \p rkcg is considered lost
* (see rkcg_assignment_lost). */
rd_bool_t rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t *rkcg);
/**
* Consumer group metadata: group id, generation id, member id and an
* optional group instance id, all held as plain C strings / integers.
*
* NOTE(review): the rd_kafka_consumer_group_metadata_t typedef used by the
* functions operating on this struct is not declared in this header;
* string ownership is likewise not visible here.
*/
struct rd_kafka_consumer_group_metadata_s {
char *group_id;
int32_t generation_id;
char *member_id;
char *group_instance_id; /**< Optional (NULL) */
};
/** @returns a duplicate of \p cgmetadata.
* NOTE(review): presumably a deep copy owned by the caller -- confirm how
* it must be freed. */
rd_kafka_consumer_group_metadata_t *rd_kafka_consumer_group_metadata_dup(
const rd_kafka_consumer_group_metadata_t *cgmetadata);
/**
 * @brief Translate a rebalance protocol value to its display name.
 *
 * @param protocol Rebalance protocol to name.
 *
 * @returns "EAGER" or "COOPERATIVE" for the corresponding
 *          RD_KAFKA_REBALANCE_PROTOCOL_* constant, and "NONE" for any
 *          other value. The result is a string literal.
 */
static RD_UNUSED const char *
rd_kafka_rebalance_protocol2str(rd_kafka_rebalance_protocol_t protocol) {
        if (protocol == RD_KAFKA_REBALANCE_PROTOCOL_EAGER)
                return "EAGER";

        if (protocol == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE)
                return "COOPERATIVE";

        /* Anything else, including an explicit "none" protocol. */
        return "NONE";
}
#endif /* _RDKAFKA_CGRP_H_ */
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/kenvins/librdkafka.git
git@gitee.com:kenvins/librdkafka.git
kenvins
librdkafka
librdkafka
master

搜索帮助