Generally speaking, the modern C++ Kafka API is quite similar to the Kafka Java client's API. We'd recommend users to cross-reference them, especially the examples.
Unlike Java's KafkaProducer, here we introduce two derived classes, KafkaSyncProducer and KafkaAsyncProducer, depending on different `send` behaviors (synchronous/asynchronous).
## KafkaSyncProducer

* The `send` is a blocking operation, and the `RecordMetadata` is available as soon as the function returns. If anything goes wrong, an exception is thrown.

```cpp
// Create configuration object
kafka::Properties props({
    {"bootstrap.servers",  brokers},
    {"enable.idempotence", "true"},
});

// Create a producer instance.
kafka::KafkaSyncProducer producer(props);

// Read messages from stdin and produce to the broker.
std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

for (std::string line; std::getline(std::cin, line);) {
    // The ProducerRecord doesn't own `line`, it is just a thin wrapper
    auto record = kafka::ProducerRecord(topic,
                                        kafka::NullKey,
                                        kafka::Value(line.c_str(), line.size()));

    // Send the message.
    try {
        kafka::Producer::RecordMetadata metadata = producer.send(record);
        std::cout << "% Message delivered: " << metadata.toString() << std::endl;
    } catch (const kafka::KafkaException& e) {
        std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;
    }

    if (line.empty()) break;
}

// producer.close(); // No explicit close is needed, RAII will take care of it
```
* `ProducerConfig::BOOTSTRAP_SERVERS` is mandatory for the producer's configuration.

* A `ProducerRecord` doesn't take ownership of its `key` or `value`. Thus, the user must guarantee that the memory blocks (pointed to by `key` and `value`) remain valid until `send` returns.
* Since `send` is a blocking operation, the throughput is heavily impacted; in return, message delivery is easier to confirm and the logic is simpler.
* At the end, the user could call `close` manually, or just leave it to the destructor (`close` would be called anyway), as sketched below.
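For instance, a minimal sketch of both styles, assuming a `props` and a `record` like those in the example above:

```cpp
{
    kafka::KafkaSyncProducer producer(props);
    producer.send(record);
    producer.close();       // explicit close: flush and shut down right here
}

{
    kafka::KafkaSyncProducer producer(props);
    producer.send(record);
}   // no explicit close: the destructor calls `close` anyway
```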
## KafkaAsyncProducer

* `Async` (in the name) means `send` is a non-blocking operation, and the result (including errors) can only be obtained from the delivery callback.

```cpp
// Create configuration object
kafka::Properties props({
    {"bootstrap.servers",  brokers},
    {"enable.idempotence", "true"},
});

// Create a producer instance.
kafka::KafkaAsyncProducer producer(props);

// Read messages from stdin and produce to the broker.
std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

for (std::string line; std::getline(std::cin, line);) {
    // The ProducerRecord doesn't own `line`, it is just a thin wrapper
    auto record = kafka::ProducerRecord(topic,
                                        kafka::NullKey,
                                        kafka::Value(line.c_str(), line.size()));

    // Send the message.
    producer.send(record,
                  // The delivery report handler
                  [](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                      if (!ec) {
                          std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                      } else {
                          std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                      }
                  },
                  // The memory block given by record.value() would be copied
                  kafka::KafkaProducer::SendOption::ToCopyRecordValue);

    if (line.empty()) break;
}
```
* As with a `KafkaSyncProducer`, the user must guarantee that the memory block for the `ProducerRecord`'s `key` remains valid until `send` returns.

* By default, the memory block for the `ProducerRecord`'s `value` must remain valid until the delivery callback is called; otherwise, `send` should be called with the option `KafkaProducer::SendOption::ToCopyRecordValue`.

* It is guaranteed that the delivery callback is triggered eventually after `send`; a producer would even wait for it before `close`. So, the `Producer::Callback` function is a good place to release these memory resources, as the sketch below shows.
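For instance, a hedged sketch of that pattern: the value buffer is kept alive by a `std::shared_ptr` (a choice made for this example, not something the library requires, and it needs `<memory>`) and is released once the delivery callback has run:

```cpp
// Keep the payload alive until the delivery callback fires
auto payload = std::make_shared<std::string>("some value");

auto record = kafka::ProducerRecord(topic,
                                    kafka::NullKey,
                                    kafka::Value(payload->c_str(), payload->size()));

producer.send(record,
              // Capturing the shared_ptr by value keeps the memory block valid
              [payload](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                  if (ec) std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                  // `payload` is released when the callback object is destroyed
              });
```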
## KafkaAsyncProducer with `KafkaClient::EventsPollingOption::Manual`

When we construct a `KafkaAsyncProducer` with the option `KafkaClient::EventsPollingOption::Auto` (the default), an internal thread is created to handle the `MessageDelivery` callbacks.

This might not be what you want, since you would then have to use two different threads: one to send the messages and one to handle the `MessageDelivery` responses.

Here we have another choice: with `KafkaClient::EventsPollingOption::Manual`, the `MessageDelivery` callbacks are called within the member function `pollEvents()`.
* For a `KafkaAsyncProducer` constructed with `EventsPollingOption::Manual`, the `send()` is a non-blocking operation. That is, once the message buffering queue becomes full, the `send()` operation throws an exception (or returns an error code via the output reference parameter) instead of blocking there. This makes sense, since you might want to call `pollEvents()` later, so that the delivery callbacks could be called for some messages (which could then be removed from the message buffering queue).

```cpp
kafka::KafkaAsyncProducer producer(props, KafkaClient::EventsPollingOption::Manual);
// Prepare "msgsToBeSent"
std::map<int, std::pair<Key, Value>> msgsToBeSent = ...;

for (const auto& msg : msgsToBeSent) {
    auto record = kafka::ProducerRecord(topic, partition, msg.second.first, msg.second.second, msg.first);
    std::error_code ec;
    producer.send(ec,
                  record,
                  // Ack callback
                  [&msgsToBeSent](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                      // The message could be identified by `metadata.recordId()`
                      if (ec) {
                          LOG_ERROR("Cannot send out message with recordId={0}", metadata.recordId());
                      } else {
                          msgsToBeSent.erase(metadata.recordId()); // Quite safe here
                      }
                  });
    if (ec) break;
}

// Here we call the `MessageDelivery` callbacks
// Note, we can only do this while the producer was constructed with `EventsPollingOption::Manual`.
producer.pollEvents();
```
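If messages might still be in flight after the loop, one option (a sketch, relying on the callbacks above erasing delivered messages from `msgsToBeSent`) is to keep polling until the pending set drains:

```cpp
// Keep handling delivery callbacks until every message has been acked
while (!msgsToBeSent.empty()) {
    producer.pollEvents();
}
```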
## Headers in `ProducerRecord`

* A `ProducerRecord` could carry extra information with its `headers`.

* Each `header` within `headers` contains a pointer to the memory block for its `value`. The memory block MUST remain valid until the `ProducerRecord` is read by `producer.send()`.

```cpp
kafka::KafkaAsyncProducer producer(props);
auto record = kafka::ProducerRecord(topic, partition, Key(), Value());

for (const auto& msg : msgsToBeSent) {
    // Prepare record headers
    std::string   session = msg.session;
    std::uint32_t seqno   = msg.seqno;
    record.headers() = {
        { "session", { session.c_str(), session.size() } },
        { "seqno",   { &seqno, sizeof(seqno) } }
    };

    record.setKey(msg.key);
    record.setValue(msg.value);

    producer.send(record,
                  // Ack callback
                  [](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                      if (ec) {
                          LOG_ERROR("Cannot send out message: {0}, err: {1}", metadata.toString(), ec.message());
                      }
                  });
}
```
## Error handling

Once an error occurs during `send()`, `KafkaSyncProducer` and `KafkaAsyncProducer` behave differently:

* A `KafkaSyncProducer` reports the `std::error_code` by throwing an exception (use its `error()` member function).

* A `KafkaAsyncProducer` passes the `std::error_code` to the delivery callback (as a parameter of the callback function). See the sketch below.
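For instance, a minimal sketch of both styles, assuming `producer` is a `KafkaSyncProducer` and `asyncProducer` a `KafkaAsyncProducer`; the comparison against librdkafka's `RD_KAFKA_RESP_ERR__MSG_TIMED_OUT` constant (from `librdkafka/rdkafka.h`) assumes the `std::error_code` value carries librdkafka's error enum, as the error list below suggests:

```cpp
// KafkaSyncProducer: the error comes as an exception
try {
    producer.send(record);
} catch (const kafka::KafkaException& e) {
    std::error_code ec = e.error();
    if (ec.value() == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT) {
        std::cerr << "% No ack within the time limit: " << ec.message() << std::endl;
    } else {
        std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
    }
}

// KafkaAsyncProducer: the error comes via the delivery callback
asyncProducer.send(record,
                   [](const kafka::Producer::RecordMetadata& /*metadata*/, std::error_code ec) {
                       if (ec) std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                   });
```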
There are two kinds of possible errors:

* Local errors
    - `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC`: the topic doesn't exist.
    - `RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION`: the partition doesn't exist.
    - `RD_KAFKA_RESP_ERR__INVALID_ARG`: invalid topic (the topic is null, or its length is too long, i.e. > 512).
    - `RD_KAFKA_RESP_ERR__MSG_TIMED_OUT`: no ack received within the time limit.
* Broker errors
## Frequently asked questions

### About topic creation

If the cluster's configuration is with `auto.create.topics.enable=true`, the producer/consumer could trigger the brokers to create a new topic (with `send`, `subscribe`, etc.).

Note, the automatically created topic may not be what you want (e.g., it is created with the `default.replication.factor=1` configuration as default, etc.), thus causing other unexpected problems.
### About the ack timeout

If an ack fails to arrive within `MESSAGE_TIMEOUT_MS`, an exception would be thrown by a `KafkaSyncProducer`'s `send`, or an error code would be passed to the delivery callback of a `KafkaAsyncProducer`.
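As a hedged sketch, the timeout could be tuned via the underlying `message.timeout.ms` property (the librdkafka name behind `MESSAGE_TIMEOUT_MS`); the value here is illustrative only:

```cpp
kafka::Properties props({
    {"bootstrap.servers",  brokers},
    // Fail the delivery (exception / error code) if no ack arrives within 5 seconds
    {"message.timeout.ms", "5000"},
});
```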
### How to enhance the throughput

* Enlarging the default `BATCH_NUM_MESSAGES` and `LINGER_MS` might improve message batching, thus enhancing the throughput. On the other hand, `LINGER_MS` highly impacts the latency.

* The `QUEUE_BUFFERING_MAX_MESSAGES` and `QUEUE_BUFFERING_MAX_KBYTES` determine the maximum number of in-flight requests (as some Kafka materials call it). If the buffering queue is full, the `send` operation would be blocked. Larger `QUEUE_BUFFERING_MAX_MESSAGES` / `QUEUE_BUFFERING_MAX_KBYTES` might help to improve the throughput as well, while it also means more messages are buffered locally.
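For example, a hedged tuning sketch using the underlying librdkafka property names these constants map to; the values are illustrative, not recommendations:

```cpp
kafka::Properties props({
    {"bootstrap.servers",            brokers},
    // Batch up to 10000 messages, waiting at most 5 ms for a batch to fill
    {"batch.num.messages",           "10000"},
    {"linger.ms",                    "5"},        // note: also increases latency
    // Allow more messages/bytes to be buffered locally before `send` blocks
    {"queue.buffering.max.messages", "200000"},
    {"queue.buffering.max.kbytes",   "1048576"},
});
```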
### How to achieve reliable delivery

Quick answer:

* The Kafka cluster should be configured with `min.insync.replicas=2` at least.

* Use a `KafkaSyncProducer` (with the configuration `{ProducerConfig::ACKS, "all"}`); or use a `KafkaAsyncProducer` (with the configuration `{ProducerConfig::ENABLE_IDEMPOTENCE, "true"}`), together with proper error handling within the delivery callbacks (see the sketch below).
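A hedged configuration sketch for both options; note that `min.insync.replicas` must still be set on the cluster itself:

```cpp
// Option 1: a synchronous producer waiting for all in-sync replicas
kafka::Properties syncProps({
    {"bootstrap.servers", brokers},
    {"acks",              "all"},
});
kafka::KafkaSyncProducer syncProducer(syncProps);

// Option 2: an asynchronous producer with idempotence enabled,
// plus proper error handling in the delivery callbacks
kafka::Properties asyncProps({
    {"bootstrap.servers",  brokers},
    {"enable.idempotence", "true"},
});
kafka::KafkaAsyncProducer asyncProducer(asyncProps);
```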
Complete answer:
### How many threads does a producer create?

Excluding the user's main thread, a `KafkaSyncProducer` starts another (N + 2) threads in the background, while a `KafkaAsyncProducer` starts (N + 3) background threads (where N is the number of servers in `BOOTSTRAP_SERVERS`).

Most of these background threads are started internally by librdkafka. Here is a brief introduction to what they're used for:

* Each broker (in the list of `BOOTSTRAP_SERVERS`) takes a separate thread to transmit messages towards the Kafka cluster.

* Another two background threads handle internal operations, all kinds of timers, etc.

* A `KafkaAsyncProducer` has one more background thread that keeps polling for delivery callback events.

E.g., if a `KafkaSyncProducer` is created with the property `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 6 threads in total (including the main thread).
### Which thread handles the `Producer::Callback`?

The `Producer::Callback` is only available for a `KafkaAsyncProducer`. It is handled by a background thread, not by the user's thread.

Note, be careful if both `KafkaAsyncProducer::send()` and the `Producer::Callback` might access the same container at the same time; see the sketch below.
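For instance, a sketch guarding a shared container with a mutex (requires `<mutex>` and `<map>`; the `inFlight` map, `recordId`, and `payload` are illustrative assumptions, not part of the library, and both must outlive the callback):

```cpp
std::mutex                           containerMutex;
std::map<std::uint64_t, std::string> inFlight;       // shared between both threads

// User's thread: register the message, then send it
{
    std::lock_guard<std::mutex> lock(containerMutex);
    inFlight.emplace(recordId, payload);
}
producer.send(record,
              [&](const kafka::Producer::RecordMetadata& metadata, std::error_code /*ec*/) {
                  // Background thread: take the same lock before touching the container
                  std::lock_guard<std::mutex> lock(containerMutex);
                  inFlight.erase(metadata.recordId());
              });
```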