1 Star 0 Fork 0

Kenvins/librdkafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
rdkafka_conf.h 23.09 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2014-2022, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _RDKAFKA_CONF_H_
#define _RDKAFKA_CONF_H_
#include "rdlist.h"
#include "rdkafka_cert.h"
#if WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000 && \
!defined(OPENSSL_IS_BORINGSSL)
#define WITH_SSL_ENGINE 1
/* Deprecated in OpenSSL 3 */
#include <openssl/engine.h>
#endif /* WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000 */
/**
* Forward declarations
*/
struct rd_kafka_transport_s;
/**
* MessageSet compression codecs
*/
typedef enum {
RD_KAFKA_COMPRESSION_NONE,
RD_KAFKA_COMPRESSION_GZIP = RD_KAFKA_MSG_ATTR_GZIP,
RD_KAFKA_COMPRESSION_SNAPPY = RD_KAFKA_MSG_ATTR_SNAPPY,
RD_KAFKA_COMPRESSION_LZ4 = RD_KAFKA_MSG_ATTR_LZ4,
RD_KAFKA_COMPRESSION_ZSTD = RD_KAFKA_MSG_ATTR_ZSTD,
RD_KAFKA_COMPRESSION_INHERIT, /* Inherit setting from global conf */
RD_KAFKA_COMPRESSION_NUM
} rd_kafka_compression_t;
static RD_INLINE RD_UNUSED const char *
rd_kafka_compression2str(rd_kafka_compression_t compr) {
static const char *names[RD_KAFKA_COMPRESSION_NUM] = {
[RD_KAFKA_COMPRESSION_NONE] = "none",
[RD_KAFKA_COMPRESSION_GZIP] = "gzip",
[RD_KAFKA_COMPRESSION_SNAPPY] = "snappy",
[RD_KAFKA_COMPRESSION_LZ4] = "lz4",
[RD_KAFKA_COMPRESSION_ZSTD] = "zstd",
[RD_KAFKA_COMPRESSION_INHERIT] = "inherit"};
static RD_TLS char ret[32];
if ((int)compr < 0 || compr >= RD_KAFKA_COMPRESSION_NUM) {
rd_snprintf(ret, sizeof(ret), "codec0x%x?", (int)compr);
return ret;
}
return names[compr];
}
/**
* MessageSet compression levels
*/
typedef enum {
RD_KAFKA_COMPLEVEL_DEFAULT = -1,
RD_KAFKA_COMPLEVEL_MIN = -1,
RD_KAFKA_COMPLEVEL_GZIP_MAX = 9,
RD_KAFKA_COMPLEVEL_LZ4_MAX = 12,
RD_KAFKA_COMPLEVEL_SNAPPY_MAX = 0,
RD_KAFKA_COMPLEVEL_ZSTD_MAX = 22,
RD_KAFKA_COMPLEVEL_MAX = 12
} rd_kafka_complevel_t;
typedef enum {
RD_KAFKA_PROTO_PLAINTEXT,
RD_KAFKA_PROTO_SSL,
RD_KAFKA_PROTO_SASL_PLAINTEXT,
RD_KAFKA_PROTO_SASL_SSL,
RD_KAFKA_PROTO_NUM,
} rd_kafka_secproto_t;
typedef enum {
RD_KAFKA_CONFIGURED,
RD_KAFKA_LEARNED,
RD_KAFKA_INTERNAL,
RD_KAFKA_LOGICAL
} rd_kafka_confsource_t;
static RD_INLINE RD_UNUSED const char *
rd_kafka_confsource2str(rd_kafka_confsource_t source) {
static const char *names[] = {"configured", "learned", "internal",
"logical"};
return names[source];
}
typedef enum {
_RK_GLOBAL = 0x1,
_RK_PRODUCER = 0x2,
_RK_CONSUMER = 0x4,
_RK_TOPIC = 0x8,
_RK_CGRP = 0x10,
_RK_DEPRECATED = 0x20,
_RK_HIDDEN = 0x40,
_RK_HIGH = 0x80, /* High Importance */
_RK_MED = 0x100, /* Medium Importance */
_RK_EXPERIMENTAL = 0x200, /* Experimental (unsupported) property */
_RK_SENSITIVE = 0x400 /* The configuration property's value
* might contain sensitive information. */
} rd_kafka_conf_scope_t;
/**< While the client groups is a generic concept, it is currently
* only implemented for consumers in librdkafka. */
#define _RK_CGRP _RK_CONSUMER
typedef enum {
_RK_CONF_PROP_SET_REPLACE, /* Replace current value (default) */
_RK_CONF_PROP_SET_ADD, /* Add value (S2F) */
_RK_CONF_PROP_SET_DEL /* Remove value (S2F) */
} rd_kafka_conf_set_mode_t;
typedef enum {
RD_KAFKA_OFFSET_METHOD_NONE,
RD_KAFKA_OFFSET_METHOD_FILE,
RD_KAFKA_OFFSET_METHOD_BROKER
} rd_kafka_offset_method_t;
typedef enum {
RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT,
RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC
} rd_kafka_oauthbearer_method_t;
typedef enum {
RD_KAFKA_SSL_ENDPOINT_ID_NONE,
RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */
} rd_kafka_ssl_endpoint_id_t;
typedef enum {
RD_KAFKA_USE_ALL_DNS_IPS,
RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
} rd_kafka_client_dns_lookup_t;
typedef enum {
RD_KAFKA_GROUP_PROTOCOL_GENERIC,
RD_KAFKA_GROUP_PROTOCOL_CONSUMER,
} rd_kafka_group_protocol_t;
/* Increase in steps of 64 as needed.
* This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33)
/**
* @struct rd_kafka_anyconf_t
* @brief The anyconf header must be the first field in the
* rd_kafka_conf_t and rd_kafka_topic_conf_t structs.
* It provides a way to track which property has been modified.
*/
struct rd_kafka_anyconf_hdr {
uint64_t modified[RD_KAFKA_CONF_PROPS_IDX_MAX / 64];
};
/**
* Optional configuration struct passed to rd_kafka_new*().
*
* The struct is populated ted through string properties
* by calling rd_kafka_conf_set().
*
*/
struct rd_kafka_conf_s {
struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */
/*
* Generic configuration
*/
int enabled_events;
int max_msg_size;
int msg_copy_max_size;
int recv_max_msg_size;
int max_inflight;
int metadata_request_timeout_ms;
int metadata_refresh_interval_ms;
int metadata_refresh_fast_cnt;
int metadata_refresh_fast_interval_ms;
int metadata_refresh_sparse;
int metadata_max_age_ms;
int metadata_propagation_max_ms;
int debug;
int broker_addr_ttl;
int broker_addr_family;
int socket_timeout_ms;
int socket_blocking_max_ms;
int socket_sndbuf_size;
int socket_rcvbuf_size;
int socket_keepalive;
int socket_nagle_disable;
int socket_max_fails;
char *client_id_str;
char *brokerlist;
int stats_interval_ms;
int term_sig;
int reconnect_backoff_ms;
int reconnect_backoff_max_ms;
int reconnect_jitter_ms;
int socket_connection_setup_timeout_ms;
int connections_max_idle_ms;
int sparse_connections;
int sparse_connect_intvl;
int api_version_request;
int api_version_request_timeout_ms;
int api_version_fallback_ms;
char *broker_version_fallback;
rd_kafka_secproto_t security_protocol;
rd_kafka_client_dns_lookup_t client_dns_lookup;
struct {
#if WITH_SSL
SSL_CTX *ctx;
#endif
char *cipher_suites;
char *curves_list;
char *sigalgs_list;
char *key_location;
char *key_pem;
rd_kafka_cert_t *key;
char *key_password;
char *cert_location;
char *cert_pem;
rd_kafka_cert_t *cert;
char *ca_location;
char *ca_pem;
rd_kafka_cert_t *ca;
/** CSV list of Windows certificate stores */
char *ca_cert_stores;
char *crl_location;
#if WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000
ENGINE *engine;
#endif
char *engine_location;
char *engine_id;
void *engine_callback_data;
char *providers;
rd_list_t loaded_providers; /**< (SSL_PROVIDER*) */
char *keystore_location;
char *keystore_password;
int endpoint_identification;
int enable_verify;
int (*cert_verify_cb)(rd_kafka_t *rk,
const char *broker_name,
int32_t broker_id,
int *x509_error,
int depth,
const char *buf,
size_t size,
char *errstr,
size_t errstr_size,
void *opaque);
} ssl;
struct {
const struct rd_kafka_sasl_provider *provider;
char *principal;
char *mechanisms;
char *service_name;
char *kinit_cmd;
char *keytab;
int relogin_min_time;
/** Protects .username and .password access after client
* instance has been created (see sasl_set_credentials()). */
mtx_t lock;
char *username;
char *password;
#if WITH_SASL_SCRAM
/* SCRAM EVP-wrapped hash function
* (return value from EVP_shaX()) */
const void /*EVP_MD*/ *scram_evp;
/* SCRAM direct hash function (e.g., SHA256()) */
unsigned char *(*scram_H)(const unsigned char *d,
size_t n,
unsigned char *md);
/* Hash size */
size_t scram_H_size;
#endif
char *oauthbearer_config;
int enable_oauthbearer_unsecure_jwt;
int enable_callback_queue;
struct {
rd_kafka_oauthbearer_method_t method;
char *token_endpoint_url;
char *client_id;
char *client_secret;
char *scope;
char *extensions_str;
/* SASL/OAUTHBEARER token refresh event callback */
void (*token_refresh_cb)(rd_kafka_t *rk,
const char *oauthbearer_config,
void *opaque);
} oauthbearer;
} sasl;
char *plugin_paths;
#if WITH_PLUGINS
rd_list_t plugins;
#endif
/* Interceptors */
struct {
/* rd_kafka_interceptor_method_t lists */
rd_list_t on_conf_set; /* on_conf_set interceptors
* (not copied on conf_dup()) */
rd_list_t on_conf_dup; /* .. (not copied) */
rd_list_t on_conf_destroy; /* .. (not copied) */
rd_list_t on_new; /* .. (copied) */
rd_list_t on_destroy; /* .. (copied) */
rd_list_t on_send; /* .. (copied) */
rd_list_t on_acknowledgement; /* .. (copied) */
rd_list_t on_consume; /* .. (copied) */
rd_list_t on_commit; /* .. (copied) */
rd_list_t on_request_sent; /* .. (copied) */
rd_list_t on_response_received; /* .. (copied) */
rd_list_t on_thread_start; /* .. (copied) */
rd_list_t on_thread_exit; /* .. (copied) */
rd_list_t on_broker_state_change; /* .. (copied) */
/* rd_strtup_t list */
rd_list_t config; /* Configuration name=val's
* handled by interceptors. */
} interceptors;
/* Client group configuration */
int coord_query_intvl_ms;
int max_poll_interval_ms;
int builtin_features;
/*
* Consumer configuration
*/
int check_crcs;
int queued_min_msgs;
int queued_max_msg_kbytes;
int64_t queued_max_msg_bytes;
int fetch_wait_max_ms;
int fetch_msg_max_bytes;
int fetch_max_bytes;
int fetch_min_bytes;
int fetch_queue_backoff_ms;
int fetch_error_backoff_ms;
rd_kafka_group_protocol_t group_protocol;
char *group_id_str;
char *group_instance_id;
char *group_remote_assignor;
int allow_auto_create_topics;
rd_kafka_pattern_list_t *topic_blacklist;
struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
* for automatically
* subscribed topics. */
int enable_auto_commit;
int enable_auto_offset_store;
int auto_commit_interval_ms;
int group_session_timeout_ms;
int group_heartbeat_intvl_ms;
rd_kafkap_str_t *group_protocol_type;
char *partition_assignment_strategy;
rd_list_t partition_assignors;
int enabled_assignor_cnt;
void (*rebalance_cb)(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque);
void (*offset_commit_cb)(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque);
rd_kafka_offset_method_t offset_store_method;
rd_kafka_isolation_level_t isolation_level;
int enable_partition_eof;
rd_kafkap_str_t *client_rack;
/*
* Producer configuration
*/
struct {
/*
* Idempotence
*/
int idempotence; /**< Enable Idempotent Producer */
rd_bool_t gapless; /**< Raise fatal error if
* gapless guarantee can't be
* satisfied. */
/*
* Transactions
*/
char *transactional_id; /**< Transactional Id */
int transaction_timeout_ms; /**< Transaction timeout */
} eos;
int queue_buffering_max_msgs;
int queue_buffering_max_kbytes;
double buffering_max_ms_dbl; /**< This is the configured value */
rd_ts_t buffering_max_us; /**< This is the value used in the code */
int queue_backpressure_thres;
int max_retries;
int retry_backoff_ms;
int retry_backoff_max_ms;
int batch_num_messages;
int batch_size;
rd_kafka_compression_t compression_codec;
int dr_err_only;
int sticky_partition_linger_ms;
/* Message delivery report callback.
* Called once for each produced message, either on
* successful and acknowledged delivery to the broker in which
* case 'err' is 0, or if the message could not be delivered
* in which case 'err' is non-zero (use rd_kafka_err2str()
* to obtain a human-readable error reason).
*
* If the message was produced with neither RD_KAFKA_MSG_F_FREE
* or RD_KAFKA_MSG_F_COPY set then 'payload' is the original
* pointer provided to rd_kafka_produce().
* rdkafka will not perform any further actions on 'payload'
* at this point and the application may rd_free the payload data
* at this point.
*
* 'opaque' is 'conf.opaque', while 'msg_opaque' is
* the opaque pointer provided in the rd_kafka_produce() call.
*/
void (*dr_cb)(rd_kafka_t *rk,
void *payload,
size_t len,
rd_kafka_resp_err_t err,
void *opaque,
void *msg_opaque);
void (*dr_msg_cb)(rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage,
void *opaque);
/* Consume callback */
void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque);
/* Log callback */
void (*log_cb)(const rd_kafka_t *rk,
int level,
const char *fac,
const char *buf);
int log_level;
int log_queue;
int log_thread_name;
int log_connection_close;
/* PRNG seeding */
int enable_random_seed;
/* Error callback */
void (*error_cb)(rd_kafka_t *rk,
int err,
const char *reason,
void *opaque);
/* Throttle callback */
void (*throttle_cb)(rd_kafka_t *rk,
const char *broker_name,
int32_t broker_id,
int throttle_time_ms,
void *opaque);
/* Stats callback */
int (*stats_cb)(rd_kafka_t *rk,
char *json,
size_t json_len,
void *opaque);
/* Socket creation callback */
int (*socket_cb)(int domain, int type, int protocol, void *opaque);
/* Connect callback */
int (*connect_cb)(int sockfd,
const struct sockaddr *addr,
int addrlen,
const char *id,
void *opaque);
/* Close socket callback */
int (*closesocket_cb)(int sockfd, void *opaque);
/* File open callback */
int (*open_cb)(const char *pathname,
int flags,
mode_t mode,
void *opaque);
/* Address resolution callback */
int (*resolve_cb)(const char *node,
const char *service,
const struct addrinfo *hints,
struct addrinfo **res,
void *opaque);
/* Background queue event callback */
void (*background_event_cb)(rd_kafka_t *rk,
rd_kafka_event_t *rkev,
void *opaque);
/* Opaque passed to callbacks. */
void *opaque;
/* For use with value-less properties. */
int dummy;
/* Admin client defaults */
struct {
int request_timeout_ms; /* AdminOptions.request_timeout */
} admin;
/*
* Test mocks
*/
struct {
int broker_cnt; /**< Number of mock brokers */
int broker_rtt; /**< Broker RTT */
} mock;
/*
* Unit test pluggable interfaces
*/
struct {
/**< Inject errors in ProduceResponse handler */
rd_kafka_resp_err_t (*handle_ProduceResponse)(
rd_kafka_t *rk,
int32_t brokerid,
uint64_t msgid,
rd_kafka_resp_err_t err);
} ut;
char *sw_name; /**< Software/client name */
char *sw_version; /**< Software/client version */
struct {
/** Properties on (implicit pass-thru) default_topic_conf were
* overwritten by passing an explicit default_topic_conf. */
rd_bool_t default_topic_conf_overwritten;
} warn;
};
int rd_kafka_socket_cb_linux(int domain, int type, int protocol, void *opaque);
int rd_kafka_socket_cb_generic(int domain,
int type,
int protocol,
void *opaque);
#ifndef _WIN32
int rd_kafka_open_cb_linux(const char *pathname,
int flags,
mode_t mode,
void *opaque);
#endif
int rd_kafka_open_cb_generic(const char *pathname,
int flags,
mode_t mode,
void *opaque);
struct rd_kafka_topic_conf_s {
struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */
int required_acks;
int32_t request_timeout_ms;
int message_timeout_ms;
int32_t (*partitioner)(const rd_kafka_topic_t *rkt,
const void *keydata,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque);
char *partitioner_str;
rd_bool_t random_partitioner; /**< rd_true - random
* rd_false - sticky */
int queuing_strategy; /* RD_KAFKA_QUEUE_FIFO|LIFO */
int (*msg_order_cmp)(const void *a, const void *b);
rd_kafka_compression_t compression_codec;
rd_kafka_complevel_t compression_level;
int produce_offset_report;
int consume_callback_max_msgs;
int auto_commit;
int auto_commit_interval_ms;
int auto_offset_reset;
char *offset_store_path;
int offset_store_sync_interval_ms;
rd_kafka_offset_method_t offset_store_method;
/* Application provided opaque pointer (this is rkt_opaque) */
void *opaque;
};
char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp);
void rd_kafka_anyconf_destroy(int scope, void *conf);
rd_bool_t rd_kafka_conf_is_modified(const rd_kafka_conf_t *conf,
const char *name);
void rd_kafka_desensitize_str(char *str);
void rd_kafka_conf_desensitize(rd_kafka_conf_t *conf);
void rd_kafka_topic_conf_desensitize(rd_kafka_topic_conf_t *tconf);
const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
rd_kafka_conf_t *conf);
const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
const rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf);
int rd_kafka_conf_warn(rd_kafka_t *rk);
void rd_kafka_anyconf_dump_dbg(rd_kafka_t *rk,
int scope,
const void *conf,
const char *description);
#include "rdkafka_confval.h"
int unittest_conf(void);
#endif /* _RDKAFKA_CONF_H_ */
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/kenvins/librdkafka.git
git@gitee.com:kenvins/librdkafka.git
kenvins
librdkafka
librdkafka
master

搜索帮助