When using message dispatching systems, we always suffer from message loss, duplication, and disordering.

Since the application (using the `KafkaProducer`) might crash or restart, we might consider using certain mechanisms to achieve *At Most Once* / *At Least Once* delivery and *Ordering*, such as locally persisting the messages until successful delivery, embedding sequence numbers to de-duplicate, or acknowledging the delivery result back to the data source. These are common topics which are not specific to Kafka.

Here we focus on the `KafkaProducer`, together with the `idempotence` feature. Let's see in which cases problems might happen, how to avoid them, and what the best practices are, in order to achieve *No Message Lost*, *Exactly Once*, and *Ordering*.
## No Message Lost

A message could be lost if the partition leader failed to sync it to the other replicas. Here is how that happens:

1. First, the partition leader doesn't sync up the latest message to enough in-sync replicas before responding with the `ack`, because either
    - the partition leader just doesn't wait for the other replicas' responses (e.g. with `acks=1`), or
    - there is no available in-sync replica to wait for.
2. Then, the partition leader crashes, and one in-sync replica becomes the new partition leader. The new partition leader has no acknowledgement of the latest messages. Later, when new messages arrive, it would assign them record offsets that conflict with the records known only to the previous partition leader. Then, even if the previous partition leader comes up again, these records have no chance to be recovered (they are internally overwritten to be consistent with the other replicas).

### How to achieve *No Message Lost*
- Make sure the leader waits for responses from all in-sync replicas before sending back the `ack`:
    - `acks=all` is a MUST for the producer.
- Ensure enough in-sync partition replicas:
    - The configuration `min.insync.replicas >= 2` is a MUST for the brokers.

Take `min.insync.replicas = 2` for example. It means:

- At most `replication.factor - min.insync.replicas` replicas may be out-of-sync while the producer is still able to send messages; otherwise, the producer fails with a "not enough replicas" error and keeps retrying.
- Occasional in-sync-replica failures are tolerated only as long as fewer than `min.insync.replicas` replicas fail; otherwise, messages might be lost. In this case, if just one in-sync replica crashes after sending back the `ack` to the producer, the message is not lost; if two fail, it might be, since the new leader could be a replica which was not in-sync previously and thus has no acknowledgement of these latest messages.
Please refer to Kafka Broker Configuration for more details.
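As a minimal sketch on the producer side (assuming the `Properties`/`ProducerConfig` style used in the demos below, with `ProducerConfig::ACKS` as the key for the `acks` property; `min.insync.replicas` itself is a broker/topic-level setting and only appears here as a comment):

```cpp
// Producer-side settings for "no message lost" (sketch).
// Broker/topic side (configured on the cluster, not here): min.insync.replicas=2
KafkaSyncProducer producer(
    Properties({
        { ProducerConfig::BOOTSTRAP_SERVERS, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
        { ProducerConfig::ACKS,              "all" }    // wait for all in-sync replicas before the ack
    })
);
```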
Then, what would happen if replicas fail?

- The producer fails to send (with a "not enough in-sync replicas" error), while the number of in-sync replicas cannot meet `min.insync.replicas`.
- Messages are lost (after being sent), when no in-sync replica survives multiple failures.
- No message is lost, as long as all in-sync replicas have acknowledged the messages and at least one in-sync replica is still available.
## Exactly Once

A duplication could happen like this: the broker persists a message and sends back the `ack` to the producer, but for some abnormal reason (such as a network failure) the producer fails to receive the `ack`. The `librdkafka` internal queue then retries, and another (duplicated) copy of the message is persisted by the brokers.

### How to achieve *Exactly Once*

The `enable.idempotence` configuration resolves this problem, and it is RECOMMENDED for both `KafkaSyncProducer` and `KafkaAsyncProducer`, as long as it's possible.

## Ordering
To keep the ordering between messages, make sure these `ProducerRecord`s go to the same partition, either by

- explicitly assigning the same topic-partition to them, or
- using the same `key` for these records,

as sketched below.
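A minimal sketch of the two options (the `(topic, partition, key, value)` constructor overload is an assumption here; only the `(topic, key, value)` form appears in the demos below):

```cpp
// Option 1 (assumed overload): explicitly pin related records to the same partition.
auto recordWithPartition = ProducerRecord(topic, /*partition=*/0, msg.key, msg.value);

// Option 2: use the same key for related records; the partitioner hashes the key,
// so records sharing a key always land in the same partition.
auto recordWithKey = ProducerRecord(topic, msg.key, msg.value);
```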
How could disordering happen? `librdkafka` uses internal partition queues, and once a message fails to be sent successfully (e.g. the brokers are down), it is put back on the queue and retried again once `retry.backoff.ms` expires. However, before that retry, the brokers might recover, and the messages queued behind it (with `max.in.flight > 1`) might happen to be sent successfully. In this case (with `max.in.flight > 1` and `retries > 0`), disordering can occur, and the user would not even be aware of it.

Furthermore, if the last retry still fails, the delivery callback is eventually triggered, and the user has to decide what to do (perhaps re-send the message). But some later messages might already have been saved successfully by the server, so there is no way to revert the disordering.
### Idempotent Producer

Please refer to the document from librdkafka, Idempotent Producer, for more details.
> The `librdkafka` maintains the original produce() ordering per-partition for all messages produced, using an internal per-partition 64-bit counter called the `msgid` which starts at 1. This `msgid` allows messages to be re-inserted in the partition message queue in the original order in the case of retries.
>
> The Idempotent Producer functionality in the Kafka protocol also has a per-message `sequence number`, which is a signed 32-bit wrapping counter that is reset each time the Producer's ID (`PID`) or `Epoch` changes.
>
> The `msgid` is used (along with a base `msgid` value stored at the time the `PID`/`Epoch` was bumped) to calculate the Kafka protocol's message `sequence number`.
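As a rough sketch of the relation described above (illustration only, not librdkafka's internal implementation):

```cpp
#include <cstdint>

// Illustration only: the protocol's signed 32-bit wrapping sequence number is
// derived from the per-partition 64-bit msgid, relative to the base msgid
// stored when the PID/Epoch was bumped.
int32_t toSequenceNumber(uint64_t msgid, uint64_t baseMsgid)
{
    return static_cast<int32_t>((msgid - baseMsgid) & 0x7FFFFFFFull);
}
```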
When idempotence is enabled, the following configuration properties are adjusted automatically (if not modified by the user). Producer instantiation will fail if the user-supplied configuration is incompatible.

- `acks = all`
- `max.in.flight` (i.e. `max.in.flight.requests.per.connection`) `= 5`
- `retries = INT32_MAX`
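For example (a sketch; `ProducerConfig::ACKS` and the exact failure behavior of the constructor are assumptions, not stated in this document), explicitly setting an `acks` value that conflicts with idempotence should make producer instantiation fail rather than silently downgrade the guarantees:

```cpp
try {
    // acks=1 contradicts the acks=all requirement of idempotence,
    // so constructing the producer is expected to fail.
    KafkaSyncProducer badProducer(
        Properties({
            { ProducerConfig::BOOTSTRAP_SERVERS,  "192.168.0.1:9092" },
            { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },
            { ProducerConfig::ACKS,               "1" }
        })
    );
} catch (const KafkaException& e) {
    std::cerr << "Incompatible configuration: " << e.what() << std::endl;
}
```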
### Exception thrown during `send()`

The `send()` operation could throw a `KafkaException` if the input is invalid, e.g. the message is larger than the size limit (`message.max.bytes`).

### Permanent errors (responses from brokers)

Typical errors are:

- Invalid message: `RD_KAFKA_RESP_ERR_CORRUPT_MESSAGE`, `RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE`, `RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS`, `RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT`, `RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE`.
- Topic/partition does not exist: `ERR_UNKNOWN_TOPIC_OR_PART`, i.e. automatic topic creation is disabled on the broker, or the application is specifying a partition that does not exist.
- Authorization failure: `ERR_TOPIC_AUTHORIZATION_FAILED`, `ERR_CLUSTER_AUTHORIZATION_FAILED`.

Normally, a permanent error means a careless design or a wrong configuration, which should be avoided from the very beginning.

Unless `enable.gapless.guarantee` (EXPERIMENTAL) is configured, the producer keeps going with the following messages; otherwise, it purges all in-flight/in-queue messages (with `RD_KAFKA_RESP_ERR__PURGE_INFLIGHT`/`RD_KAFKA_RESP_ERR__PURGE_QUEUE`).
### Temporary errors

Apart from the permanent errors, most of the remaining errors are temporary errors, which will be retried (as long as the retry count permits); once `message.timeout.ms` expires, the message delivery callback is triggered with `RD_KAFKA_RESP_ERR__TIMED_OUT`.

### Be careful with the `RD_KAFKA_RESP_ERR__TIMED_OUT` failure
There are some corner cases, such as a message that has been persisted by the brokers while the `KafkaProducer` failed to get the response. If `message.timeout.ms` has not expired yet, the producer could retry and eventually get the response. Otherwise (i.e. `message.timeout.ms` expired before the producer received the successful `ack`), the delivery is considered a failure by the producer (while the brokers don't think so), and the user might re-transmit the message, thus causing duplications.

To avoid this tricky situation, a longer `message.timeout.ms` is RECOMMENDED, to make sure there's enough time for transmission retries and in-flight responses.
### Be aware of the `max.in.flight = 5` limitation

Currently, `max.in.flight` means `max.in.flight.requests.per.connection`; that is, at most 5 message batches (each up to ~1MB) can be in flight (without having received the `ack` response yet) towards each broker. Within a low-latency network this is not a problem; in other cases, it might be! The good news is that there might be a plan (in `librdkafka`) to relax the limit from per-connection to per-partition, which would boost the performance a lot.

### Recommendations for the `KafkaProducer`

- Enable the `enable.idempotence` configuration.
- Use a long `message.timeout.ms`, which lets `librdkafka` keep retrying before triggering the delivery failure callback.
### `KafkaSyncProducer` demo

```cpp
int ret = 0;
std::atomic<bool> running = true;

KafkaSyncProducer producer(
    Properties({
        { ProducerConfig::BOOTSTRAP_SERVERS,  "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
        { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },
        { ProducerConfig::MESSAGE_TIMEOUT_MS, "60000" }
    })
);

while (running) {
    // Peek the next message from the upstream; it stays there until the send succeeds
    const auto& msg = topMsgOfUpstream();
    try {
        auto record = ProducerRecord(topic, msg.key, msg.value);
        producer.send(record);
        // Only remove the message from the upstream after a successful, synchronous send
        popMsgFromUpstream();
    } catch (const KafkaException& e) {
        std::cerr << "Failed to send message! Reason: " << e.what() << std::endl;
        ret = e.error().value();    // assuming `error()` returns a `std::error_code`
        break;
    }
}

producer.close();
return ret;
```
It's easy to use a `KafkaSyncProducer`, since it sends messages one by one. However, the throughput would not be that good, since there is only one message (embedded in one message batch) in flight.
### `KafkaAsyncProducer` demo

```cpp
std::atomic<bool> running = true;

KafkaAsyncProducer producer(
    Properties({
        { ProducerConfig::BOOTSTRAP_SERVERS,  "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
        { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },
        { ProducerConfig::MESSAGE_TIMEOUT_MS, "86400000" }  // as long as 1 day
    })
);

while (running) {
    auto msg = fetchMsgFromUpstream();
    auto record = ProducerRecord(topic, msg.key, msg.value, msg.id);
    producer.send(record,
                  // Ack callback (runs asynchronously; capture nothing, since `msg`
                  // goes out of scope before the callback fires)
                  [](const Producer::RecordMetadata& metadata, std::error_code ec) {
                      // The message could be identified by `metadata.recordId()`
                      auto recordId = metadata.recordId();
                      if (ec) {
                          std::cerr << "Cannot send out message with recordId: " << recordId << ", error: " << ec.message() << std::endl;
                      } else {
                          // Only commit the upstream offset once the delivery is confirmed
                          commitMsgToUpstream(recordId);
                      }
                  });
}

producer.close();
```
With a long `message.timeout.ms`, we are unlikely to catch an error with the delivery callback, since temporary errors are retried anyway. But be aware of permanent errors: they might be caused by a careless design.