代码拉取完成,页面将自动刷新
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2019-2022, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
#include "rdkafka.h"
#include "../src/rdkafka_proto.h"
#include "../src/rdstring.h"
#include "../src/rdunittest.h"
#include <stdarg.h>
/**
* @name Producer transaction tests using the mock cluster
*
*/
/**
 * @brief Error code that error_is_fatal_cb() treats as expected (non-fatal).
 *
 * Sub-tests assign it before provoking the error and reset it to
 * RD_KAFKA_RESP_ERR_NO_ERROR when they are done.
 */
static int allowed_error;
/**
 * @brief is_fatal_cb hook: classify an error reported through error_cb as
 *        either tolerated or fatal to the running test.
 *
 * An error is tolerated when it equals \c allowed_error, or when
 * transport errors are allowed and the error is ALL_BROKERS_DOWN
 * (which is likely to accompany transport failures).
 *
 * @returns 0 if the error is to be ignored, else 1.
 */
static int
error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
        const int exact_match = (err == allowed_error);
        const int implied_by_transport =
            (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
             err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN);

        if (!exact_match && !implied_by_transport)
                return 1;

        TEST_SAY("Ignoring allowed error: %s: %s\n", rd_kafka_err2name(err),
                 reason);
        return 0;
}
/**
 * @brief Per-sub-test response hook.
 *
 * When non-NULL at producer creation time, on_new_producer() registers
 * on_response_received_trampoline(), which forwards each call to this
 * function pointer with the same arguments.
 */
static rd_kafka_resp_err_t (*on_response_received_cb)(rd_kafka_t *rk,
int sockfd,
const char *brokername,
int32_t brokerid,
int16_t ApiKey,
int16_t ApiVersion,
int32_t CorrId,
size_t size,
int64_t rtt,
rd_kafka_resp_err_t err,
void *ic_opaque);
/**
 * @brief on_response_received interceptor that forwards every call,
 *        unmodified, to the sub-test's on_response_received_cb.
 *
 * The callback must be set; the test fails otherwise.
 *
 * @returns whatever on_response_received_cb returns.
 */
static rd_kafka_resp_err_t
on_response_received_trampoline(rd_kafka_t *rk,
                                int sockfd,
                                const char *brokername,
                                int32_t brokerid,
                                int16_t ApiKey,
                                int16_t ApiVersion,
                                int32_t CorrId,
                                size_t size,
                                int64_t rtt,
                                rd_kafka_resp_err_t err,
                                void *ic_opaque) {
        rd_kafka_resp_err_t cb_result;

        TEST_ASSERT(on_response_received_cb != NULL, "");

        cb_result = on_response_received_cb(rk, sockfd, brokername, brokerid,
                                            ApiKey, ApiVersion, CorrId, size,
                                            rtt, err, ic_opaque);
        return cb_result;
}
/**
 * @brief on_new interceptor: installs the on_response_received trampoline
 *        on the new client instance, but only if a sub-test has assigned
 *        on_response_received_cb.
 *
 * @returns the result of adding the interceptor, or
 *          RD_KAFKA_RESP_ERR_NO_ERROR when there is nothing to add.
 */
static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk,
                                           const rd_kafka_conf_t *conf,
                                           void *ic_opaque,
                                           char *errstr,
                                           size_t errstr_size) {
        /* No sub-test callback registered: nothing to hook up. */
        if (!on_response_received_cb)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        return rd_kafka_interceptor_add_on_response_received(
            rk, "on_response_received", on_response_received_trampoline,
            ic_opaque);
}
/**
 * @brief Create a transactional producer backed by a mock cluster.
 *
 * @param mclusterp If non-NULL, receives the producer's mock cluster handle
 *                  and the input topics "srctopic4" (4 partitions) and
 *                  "srctopic64" (64 partitions) are created on it.
 * @param transactional_id Value for the \c transactional.id property.
 * @param broker_cnt Number of mock brokers (\c test.mock.num.brokers).
 * @param ... NULL-terminated list of
 *            (const char *key, const char *value) config properties.
 *
 * Special keys:
 *   "on_response_received", "" - enable the on_response_received_cb
 *                                interceptor, which must be assigned prior
 *                                to calling create_txn_producer().
 *
 * @returns the new producer handle.
 */
static RD_SENTINEL rd_kafka_t *
create_txn_producer(rd_kafka_mock_cluster_t **mclusterp,
                    const char *transactional_id,
                    int broker_cnt,
                    ...) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        char broker_cnt_str[8];
        va_list ap;
        rd_bool_t want_interceptor = rd_false;

        rd_snprintf(broker_cnt_str, sizeof(broker_cnt_str), "%d", broker_cnt);

        test_conf_init(&conf, NULL, 60);

        test_conf_set(conf, "transactional.id", transactional_id);

        /* A mock broker in the down state keeps its port bound without
         * listening on it, so connection attempts hang until
         * socket.connection.setup.timeout.ms expires.
         * Lower that timeout so down brokers are detected sooner. */
        test_conf_set(conf, "socket.connection.setup.timeout.ms", "5000");

        /* Reconnect faster */
        test_conf_set(conf, "reconnect.backoff.max.ms", "2000");

        test_conf_set(conf, "test.mock.num.brokers", broker_cnt_str);

        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        test_curr->ignore_dr_err = rd_false;

        /* Apply the caller's key/value pairs on top of the defaults above. */
        va_start(ap, broker_cnt);
        for (;;) {
                const char *name = va_arg(ap, const char *);
                const char *value;

                if (!name)
                        break;

                /* Every key is followed by exactly one value. */
                value = va_arg(ap, const char *);

                if (strcmp(name, "on_response_received") == 0)
                        want_interceptor = rd_true; /* value is unused */
                else
                        test_conf_set(conf, name, value);
        }
        va_end(ap);

        /* Register the on_new interceptor, if requested */
        if (want_interceptor)
                rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
                                                     on_new_producer, NULL);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        if (!mclusterp)
                return rk;

        *mclusterp = rd_kafka_handle_mock_cluster(rk);
        TEST_ASSERT(*mclusterp, "failed to create mock cluster");

        /* Pre-create the common consumer "input" topics that
         * send_offsets_to_transaction() must be able to commit to.
         * The numeric suffix is the topic's partition count. */
        TEST_CALL_ERR__(
            rd_kafka_mock_topic_create(*mclusterp, "srctopic4", 4, 1));
        TEST_CALL_ERR__(
            rd_kafka_mock_topic_create(*mclusterp, "srctopic64", 64, 1));

        return rk;
}
/**
* @brief Test recoverable errors using mock broker error injections
* and code coverage checks.
*/
static void do_test_txn_recoverable_errors(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
const char *groupid = "myGroupId";
const char *txnid = "myTxnId";
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
/* Make sure transaction and group coordinators are different.
* This verifies that AddOffsetsToTxnRequest isn't sent to the
* transaction coordinator but the group coordinator. */
rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);
/*
* Inject som InitProducerId errors that causes retries
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_InitProducerId, 3,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
(void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
(void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce a message without error first */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
rd_kafka_flush(rk, -1);
/*
* Produce a message, let it fail with a non-idempo/non-txn
* retryable error
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Make sure messages are produced */
rd_kafka_flush(rk, -1);
/*
* Send some arbitrary offsets, first with some failures, then
* succeed.
*/
offsets = rd_kafka_topic_partition_list_new(4);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 39)->offset =
999999111;
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
999;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 19)->offset =
123456789;
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/*
* Commit transaction, first with som failures, then succeed.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_EndTxn, 3,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief KIP-360: verify that a fatal idempotence error (here
 *        UNKNOWN_PRODUCER_ID on Produce) surfaces as an abortable
 *        transaction error, and that the producer is usable again
 *        after aborting.
 */
static void do_test_txn_fatal_idempo_errors(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        const char *txn_id = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txn_id, 3, "batch.num.messages",
                                 "1", NULL);

        /* The injected error will show up in delivery reports and in
         * error_cb: tolerate both. */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = error_is_fatal_cb;
        allowed_error            = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Begin the first transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* First message: no error injected */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Second message: fails with a fatal idempotence error */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* The commit must fail with an abortable, non-fatal error */
        error = rd_kafka_commit_transaction(rk, -1);

        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort, as required by the error */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* A second, error-free transaction proves that the producer
         * has recovered. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* Clean up */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
/**
 * @brief KIP-360: a fatal idempotence error triggers an abortable
 *        transaction error, but the subsequent producer id
 *        re-initialization on the broker is made slower than the timeout
 *        of the first abort_transaction() call, which is therefore
 *        expected to fail with a retriable error. A later abort must
 *        succeed.
 *
 * @param with_sleep After the first abort, sleep longer than it takes to
 *                   re-init the pid so that the internal state
 *                   automatically transitions before the abort is retried.
 */
static void do_test_txn_slow_reinit(rd_bool_t with_sleep) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t coordinator = 2;
        const char *txn_id  = "myTxnId";
        test_timing_t timing;

        SUB_TEST("%s sleep", with_sleep ? "with" : "without");

        rk = create_txn_producer(&mcluster, txn_id, 3, "batch.num.messages",
                                 "1", NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txn_id,
                                      coordinator);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Begin the first transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* First message: no error injected */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Delay the coordinator's next InitProducerId response well
         * beyond the abort_transaction() timeout used below, so that the
         * automatic pid re-init outlasts that abort call. */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coordinator, RD_KAFKAP_InitProducerId, 1,
            RD_KAFKA_RESP_ERR_NO_ERROR, 10000 /*10s*/);

        /* Second message: fails with a fatal idempotence error */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* The commit must fail with an abortable, non-fatal error */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* First abort attempt: short timeout, expected to fail with a
         * retriable (timeout) error */
        TIMING_START(&timing, "abort_transaction(100)");
        error = rd_kafka_abort_transaction(rk, 100);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("First abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected retriable error");
        rd_kafka_error_destroy(error);

        if (with_sleep) {
                rd_sleep(12);
        }

        /* Second abort attempt: no timeout, expected to complete */
        TEST_SAY("Retrying abort\n");
        TIMING_START(&timing, "abort_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
        TIMING_STOP(&timing);

        /* A second, error-free transaction proves that the producer
         * has recovered. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* Clean up */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
/**
 * @brief KIP-360: a fatal idempotence error triggers an abortable
 *        transaction error, but the subsequent producer id
 *        re-initialization fails with a fencing error.
 *        abort_transaction() is expected to fail with a fatal error and
 *        a fatal error must have been raised on the producer instance.
 *
 * @param error_code Which error code InitProducerIdRequest should fail
 *                   with. Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH
 *                   (older) or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
 */
static void do_test_txn_fenced_reinit(rd_kafka_resp_err_t error_code) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t coordinator = 2;
        const char *txn_id  = "myTxnId";
        char fatal_reason[512];
        rd_kafka_resp_err_t fatal_err;

        SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));

        rk = create_txn_producer(&mcluster, txn_id, 3, "batch.num.messages",
                                 "1", NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txn_id,
                                      coordinator);

        /* Tolerate the delivery failures and the fencing error reported
         * through error_cb. */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = error_is_fatal_cb;
        allowed_error            = RD_KAFKA_RESP_ERR__FENCED;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Begin the transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* First message: no error injected */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Make the coordinator's next InitProducerId response (the pid
         * re-init) fail with the requested error */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coordinator, RD_KAFKAP_InitProducerId, 1, error_code,
            0);

        /* Second message: fails with a fatal idempotence error */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* The abort is expected to fail fatally */
        error = rd_kafka_abort_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
        rd_kafka_error_destroy(error);

        /* The instance-level fatal error must be set as well */
        fatal_err =
            rd_kafka_fatal_error(rk, fatal_reason, sizeof(fatal_reason));
        TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
        TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err),
                 fatal_reason);

        /* Clean up */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
/**
 * @brief Test EndTxn errors.
 *
 * Runs a table of scenarios. For each scenario the listed errors are
 * injected for EndTxn requests and four variations are exercised:
 * commit, flush+commit, abort and flush+abort. The outcome of the
 * commit/abort call is checked against the scenario's expected error code
 * and error flags, after which the error is handled according to those
 * flags (destroy and re-create the producer on fatal errors, abort on
 * abortable errors, retry once on retriable errors).
 */
static void do_test_txn_endtxn_errors(void) {
rd_kafka_t *rk = NULL;
rd_kafka_mock_cluster_t *mcluster = NULL;
rd_kafka_resp_err_t err;
struct {
/* Number of entries of errors[] to inject.
 * A count of 0 terminates the scenario list. */
size_t error_cnt;
/* Errors injected, in order, for EndTxn requests. */
rd_kafka_resp_err_t errors[4];
/* Error code expected from commit/abort.
 * RD_KAFKA_RESP_ERR_NO_ERROR means the call is expected to succeed. */
rd_kafka_resp_err_t exp_err;
/* Expected error flags (only checked when a failure is expected).
 * Fields omitted from an initializer are zero, i.e., rd_false. */
rd_bool_t exp_retriable;
rd_bool_t exp_abortable;
rd_bool_t exp_fatal;
/* If true, abort (but not commit) is expected to succeed even though
 * exp_err is set. */
rd_bool_t exp_successful_abort;
} scenario[] = {
/* This list of errors is from the EndTxnResponse handler in
 * AK clients/.../TransactionManager.java */
{
/* #0 */
2,
{RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE},
/* Should auto-recover */
RD_KAFKA_RESP_ERR_NO_ERROR,
},
{
/* #1 */
2,
{RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR},
/* Should auto-recover */
RD_KAFKA_RESP_ERR_NO_ERROR,
},
{
/* #2 */
1,
{RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS},
/* Should auto-recover */
RD_KAFKA_RESP_ERR_NO_ERROR,
},
{
/* #3 */
3,
{RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS},
/* Should auto-recover */
RD_KAFKA_RESP_ERR_NO_ERROR,
},
{
/* #4: the abort is auto-recovering thru epoch bump */
1,
{RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID},
RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
rd_false /* !retriable */,
rd_true /* abortable */,
rd_false /* !fatal */,
rd_true /* successful abort */
},
{
/* #5: the abort is auto-recovering thru epoch bump */
1,
{RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING},
RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
rd_false /* !retriable */,
rd_true /* abortable */,
rd_false /* !fatal */,
rd_true /* successful abort */
},
{
/* #6 */
1,
{RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH},
/* This error is normalized */
RD_KAFKA_RESP_ERR__FENCED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
{
/* #7 */
1,
{RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
/* This error is normalized */
RD_KAFKA_RESP_ERR__FENCED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
{
/* #8 */
1,
{RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED},
RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
{
/* #9 */
1,
{RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED},
RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
rd_false /* !retriable */,
rd_true /* abortable */,
rd_false /* !fatal */
},
{
/* #10 */
/* Any other error is expected to surface as an abortable,
 * non-fatal error (see the flags below). */
1,
{RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE},
RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
rd_false /* !retriable */,
rd_true /* abortable */,
rd_false /* !fatal */,
},
{
/* #11: same injected error and expectations as #7 */
1,
{RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
/* This error is normalized */
RD_KAFKA_RESP_ERR__FENCED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
/* Terminator: error_cnt == 0 */
{0},
};
int i;
SUB_TEST_QUICK();
for (i = 0; scenario[i].error_cnt > 0; i++) {
int j;
/* For each scenario, test:
 * commit_transaction()
 * flush() + commit_transaction()
 * abort_transaction()
 * flush() + abort_transaction()
 */
for (j = 0; j < (2 + 2); j++) {
/* j = 0,1: commit; j = 2,3: abort. Odd j: flush first. */
rd_bool_t commit = j < 2;
rd_bool_t with_flush = j & 1;
rd_bool_t exp_successful_abort =
!commit && scenario[i].exp_successful_abort;
const char *commit_str =
commit ? (with_flush ? "commit&flush" : "commit")
: (with_flush ? "abort&flush" : "abort");
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
rd_kafka_error_t *error;
test_timing_t t_call;
TEST_SAY("Testing scenario #%d %s with %" PRIusz
" injected erorrs, expecting %s\n",
i, commit_str, scenario[i].error_cnt,
exp_successful_abort
? "successful abort"
: rd_kafka_err2name(scenario[i].exp_err));
/* (Re-)create the producer on the first iteration and after
 * a fatal error destroyed the previous instance. */
if (!rk) {
const char *txnid = "myTxnId";
rk = create_txn_producer(&mcluster, txnid, 3,
NULL);
TEST_CALL_ERROR__(
rd_kafka_init_transactions(rk, 5000));
}
/*
 * Start transaction
 */
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Transaction aborts will cause DR errors:
 * ignore them. */
test_curr->ignore_dr_err = !commit;
/*
 * Produce a message.
 */
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s",
rd_kafka_err2str(err));
if (with_flush)
test_flush(rk, -1);
/*
 * Send some arbitrary offsets.
 */
offsets = rd_kafka_topic_partition_list_new(4);
rd_kafka_topic_partition_list_add(offsets, "srctopic4",
3)
->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctopic64",
60)
->offset = 99999;
cgmetadata =
rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/*
 * Inject the scenario's EndTxn errors, then commit or abort
 * the transaction.
 */
rd_kafka_mock_push_request_errors_array(
mcluster, RD_KAFKAP_EndTxn, scenario[i].error_cnt,
scenario[i].errors);
TIMING_START(&t_call, "%s", commit_str);
if (commit)
error = rd_kafka_commit_transaction(
rk, tmout_multip(5000));
else
error = rd_kafka_abort_transaction(
rk, tmout_multip(5000));
TIMING_STOP(&t_call);
if (error)
TEST_SAY(
"Scenario #%d %s failed: %s: %s "
"(retriable=%s, req_abort=%s, "
"fatal=%s)\n",
i, commit_str, rd_kafka_error_name(error),
rd_kafka_error_string(error),
RD_STR_ToF(
rd_kafka_error_is_retriable(error)),
RD_STR_ToF(
rd_kafka_error_txn_requires_abort(
error)),
RD_STR_ToF(rd_kafka_error_is_fatal(error)));
else
TEST_SAY("Scenario #%d %s succeeded\n", i,
commit_str);
/* Success expected: nothing more to verify or clean up
 * for this variation. */
if (!scenario[i].exp_err || exp_successful_abort) {
TEST_ASSERT(!error,
"Expected #%d %s to succeed, "
"got %s",
i, commit_str,
rd_kafka_error_string(error));
continue;
}
/* Failure expected: verify error code and flags. */
TEST_ASSERT(error != NULL, "Expected #%d %s to fail", i,
commit_str);
TEST_ASSERT(scenario[i].exp_err ==
rd_kafka_error_code(error),
"Scenario #%d: expected %s, not %s", i,
rd_kafka_err2name(scenario[i].exp_err),
rd_kafka_error_name(error));
TEST_ASSERT(
scenario[i].exp_retriable ==
(rd_bool_t)rd_kafka_error_is_retriable(error),
"Scenario #%d: retriable mismatch", i);
TEST_ASSERT(
scenario[i].exp_abortable ==
(rd_bool_t)rd_kafka_error_txn_requires_abort(
error),
"Scenario #%d: abortable mismatch", i);
TEST_ASSERT(
scenario[i].exp_fatal ==
(rd_bool_t)rd_kafka_error_is_fatal(error),
"Scenario #%d: fatal mismatch", i);
/* Handle errors according to the error flags */
if (rd_kafka_error_is_fatal(error)) {
TEST_SAY("Fatal error, destroying producer\n");
rd_kafka_error_destroy(error);
rd_kafka_destroy(rk);
rk = NULL; /* Will be re-created on the next
* loop iteration. */
} else if (rd_kafka_error_txn_requires_abort(error)) {
rd_kafka_error_destroy(error);
TEST_SAY(
"Abortable error, "
"aborting transaction\n");
TEST_CALL_ERROR__(
rd_kafka_abort_transaction(rk, -1));
} else if (rd_kafka_error_is_retriable(error)) {
rd_kafka_error_destroy(error);
TEST_SAY("Retriable error, retrying %s once\n",
commit_str);
if (commit)
TEST_CALL_ERROR__(
rd_kafka_commit_transaction(rk,
5000));
else
TEST_CALL_ERROR__(
rd_kafka_abort_transaction(rk,
5000));
} else {
/* None of the flags is set: the test cannot tell how
 * to proceed. */
TEST_FAIL(
"Scenario #%d %s: "
"Permanent error without enough "
"hints to proceed: %s\n",
i, commit_str,
rd_kafka_error_string(error));
}
}
}
/* All done */
if (rk)
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test that the commit/abort works properly with infinite timeout.
*/
static void do_test_txn_endtxn_infinite(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster = NULL;
const char *txnid = "myTxnId";
int i;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, NULL);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
for (i = 0; i < 2; i++) {
rd_bool_t commit = i == 0;
const char *commit_str = commit ? "commit" : "abort";
rd_kafka_error_t *error;
test_timing_t t_call;
/* Messages will fail on as the transaction fails,
* ignore the DR error */
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
/*
* Commit/abort transaction, first with som retriable failures,
* then success.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_EndTxn, 10,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
rd_sleep(1);
TIMING_START(&t_call, "%s_transaction()", commit_str);
if (commit)
error = rd_kafka_commit_transaction(rk, -1);
else
error = rd_kafka_abort_transaction(rk, -1);
TIMING_STOP(&t_call);
TEST_SAY("%s returned %s\n", commit_str,
error ? rd_kafka_error_string(error) : "success");
TEST_ASSERT(!error, "Expected %s to succeed, got %s",
commit_str, rd_kafka_error_string(error));
}
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test that the commit/abort user timeout is honoured.
*/
static void do_test_txn_endtxn_timeout(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster = NULL;
const char *txnid = "myTxnId";
int i;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, NULL);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
for (i = 0; i < 2; i++) {
rd_bool_t commit = i == 0;
const char *commit_str = commit ? "commit" : "abort";
rd_kafka_error_t *error;
test_timing_t t_call;
/* Messages will fail as the transaction fails,
* ignore the DR error */
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
/*
* Commit/abort transaction, first with some retriable failures
* whos retries exceed the user timeout.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_EndTxn, 10,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
rd_sleep(1);
TIMING_START(&t_call, "%s_transaction()", commit_str);
if (commit)
error = rd_kafka_commit_transaction(rk, 100);
else
error = rd_kafka_abort_transaction(rk, 100);
TIMING_STOP(&t_call);
TEST_SAY_ERROR(error, "%s returned: ", commit_str);
TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);
TEST_ASSERT(
rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
"Expected %s to fail with timeout, not %s: %s", commit_str,
rd_kafka_error_name(error), rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_is_retriable(error),
"%s failure should raise a retriable error",
commit_str);
rd_kafka_error_destroy(error);
/* Now call it again with an infinite timeout, should work. */
TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
if (commit)
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
else
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
TIMING_STOP(&t_call);
}
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test commit/abort inflight timeout behaviour, which should result
* in a retriable error.
*/
static void do_test_txn_endtxn_timeout_inflight(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster = NULL;
const char *txnid = "myTxnId";
int32_t coord_id = 1;
int i;
SUB_TEST();
allowed_error = RD_KAFKA_RESP_ERR__TIMED_OUT;
test_curr->is_fatal_cb = error_is_fatal_cb;
rk = create_txn_producer(&mcluster, txnid, 1, "transaction.timeout.ms",
"5000", NULL);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
for (i = 0; i < 2; i++) {
rd_bool_t commit = i == 0;
const char *commit_str = commit ? "commit" : "abort";
rd_kafka_error_t *error;
test_timing_t t_call;
/* Messages will fail as the transaction fails,
* ignore the DR error */
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
/* Let EndTxn & EndTxn retry timeout */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_EndTxn, 2,
RD_KAFKA_RESP_ERR_NO_ERROR, 10000,
RD_KAFKA_RESP_ERR_NO_ERROR, 10000);
rd_sleep(1);
TIMING_START(&t_call, "%s_transaction()", commit_str);
if (commit)
error = rd_kafka_commit_transaction(rk, 4000);
else
error = rd_kafka_abort_transaction(rk, 4000);
TIMING_STOP(&t_call);
TEST_SAY_ERROR(error, "%s returned: ", commit_str);
TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);
TEST_ASSERT(
rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
"Expected %s to fail with timeout, not %s: %s", commit_str,
rd_kafka_error_name(error), rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_is_retriable(error),
"%s failure should raise a retriable error",
commit_str);
rd_kafka_error_destroy(error);
/* Now call it again with an infinite timeout, should work. */
TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
if (commit)
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
else
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
TIMING_STOP(&t_call);
}
/* All done */
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->is_fatal_cb = NULL;
SUB_TEST_PASS();
}
/**
 * @brief Verify that aborting a transaction succeeds after
 *        send_offsets_to_transaction() had its AddOffsetsToTxn and
 *        TxnOffsetCommit requests retried.
 *        This is a regression check for a txn_req_cnt bug where EndTxn
 *        was not properly sent in that situation.
 */
static void do_test_txn_req_cnt(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *txn_id = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txn_id, 3, NULL);

        /* abort() fails outstanding messages: ignore the DR errors */
        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Build an arbitrary offset list to commit.
         */
        offsets = rd_kafka_topic_partition_list_new(2);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 40)->offset =
            999999111;

        /* Let both offset-related requests fail twice before they
         * succeed. */
        rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_AddOffsetsToTxn,
                                          2,
                                          RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                                          RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* The abort must go through */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));

        /* Clean up */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
/**
* @brief Test abortable errors using mock broker error injections
* and code coverage checks.
*/
static void do_test_txn_requires_abort_errors(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
int r;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/*
* 1. Fail on produce
*/
TEST_SAY("1. Fail on produce\n");
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
/* Wait for messages to fail */
test_flush(rk, 5000);
/* Any other transactional API should now raise an error */
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
TEST_ASSERT(error, "expected error");
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"expected abortable error, not %s",
rd_kafka_error_string(error));
TEST_SAY("Error %s: %s\n", rd_kafka_error_name(error),
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/*
* 2. Restart transaction and fail on AddPartitionsToTxn
*/
TEST_SAY("2. Fail on AddPartitionsToTxn\n");
/* First refresh proper Metadata to clear the topic's auth error,
* otherwise the produce() below will fail immediately. */
r = test_get_partition_count(rk, "mytopic", 5000);
TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
error = rd_kafka_commit_transaction(rk, 5000);
TEST_ASSERT(error, "commit_transaction should have failed");
TEST_SAY("commit_transaction() error %s: %s\n",
rd_kafka_error_name(error), rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/*
* 3. Restart transaction and fail on AddOffsetsToTxn
*/
TEST_SAY("3. Fail on AddOffsetsToTxn\n");
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddOffsetsToTxn, 1,
RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
TEST_ASSERT(error, "Expected send_offsets..() to fail");
TEST_ASSERT(rd_kafka_error_code(error) ==
RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
"expected send_offsets_to_transaction() to fail with "
"group auth error: not %s",
rd_kafka_error_name(error));
rd_kafka_error_destroy(error);
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
error = rd_kafka_commit_transaction(rk, 5000);
TEST_ASSERT(error, "commit_transaction should have failed");
rd_kafka_error_destroy(error);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test error handling and recovery for when a broker goes down during
* an ongoing transaction.
*/
static void do_test_txn_broker_down_in_txn(rd_bool_t down_coord) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        /* The transaction coordinator and the partition leader are placed
         * on two different brokers so that either can be taken down
         * independently of the other. */
        const int32_t coord_id       = 1;
        const int32_t leader_id      = 2;
        const int32_t down_id        = down_coord ? coord_id : leader_id;
        const char *down_what        = down_coord ? "coordinator" : "leader";
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        rd_kafka_resp_err_t err;
        int msgcnt  = 1000;
        int remains = 0;

        SUB_TEST_QUICK("Test %s down", down_what);

        rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);

        /* Transport errors are expected while the broker is down and
         * must not fail the test. */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* First half of the messages, produced while all brokers are up. */
        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt / 2, NULL, 0, &remains);

        TEST_SAY("Bringing down %s %" PRId32 "\n", down_what, down_id);
        rd_kafka_mock_broker_set_down(mcluster, down_id);

        /* The flush result is deliberately not checked here: delivery is
         * verified through `remains` after the commit. */
        rd_kafka_flush(rk, 3000);

        /* Produce remaining messages */
        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                  msgcnt / 2, msgcnt / 2, NULL, 0, &remains);

        rd_sleep(2);

        TEST_SAY("Bringing up %s %" PRId32 "\n", down_what, down_id);
        rd_kafka_mock_broker_set_up(mcluster, down_id);

        /* With the broker back the transaction must commit and all
         * messages must have been delivered. */
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);

        rd_kafka_destroy(rk);

        /* Restore the file-global error filter so that the allowed
         * transport error does not carry over to other sub-tests. */
        allowed_error          = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}
/**
* @brief Advance the coord_id to the next broker.
*/
static void set_next_coord(rd_kafka_mock_cluster_t *mcluster,
                           const char *transactional_id,
                           int broker_cnt,
                           int32_t *coord_idp) {
        const int32_t prev_coord_id = *coord_idp;
        /* Step to the following broker id; an id equal to broker_cnt
         * wraps around to 1. */
        const int32_t next_coord_id = 1 + (prev_coord_id % broker_cnt);

        TEST_SAY("Changing transaction coordinator from %" PRId32 " to %" PRId32
                 "\n",
                 prev_coord_id, next_coord_id);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      next_coord_id);

        /* Report the new coordinator id back to the caller. */
        *coord_idp = next_coord_id;
}
/**
* @brief Switch coordinator during a transaction.
*
*/
static void do_test_txn_switch_coordinator(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
int32_t coord_id;
const char *topic = "test";
const char *transactional_id = "txnid";
const int broker_cnt = 5;
const int iterations = 20;
int i;
test_timeout_set(iterations * 10);
SUB_TEST("Test switching coordinators");
rk = create_txn_producer(&mcluster, transactional_id, broker_cnt, NULL);
coord_id = 1;
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
/* Start transactioning */
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
for (i = 0; i < iterations; i++) {
const int msgcnt = 100;
int remains = 0;
set_next_coord(mcluster, transactional_id, broker_cnt,
&coord_id);
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt / 2, NULL, 0);
if (!(i % 3))
set_next_coord(mcluster, transactional_id, broker_cnt,
&coord_id);
/* Produce remaining messages */
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
msgcnt / 2, msgcnt / 2, NULL, 0,
&remains);
if ((i & 1) || !(i % 8))
set_next_coord(mcluster, transactional_id, broker_cnt,
&coord_id);
if (!(i % 5)) {
test_curr->ignore_dr_err = rd_false;
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
} else {
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
}
}
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Switch coordinator during a transaction when AddOffsetsToTxn
* requests are sent (issue #3571).
*/
static void do_test_txn_switch_coordinator_refresh(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = "test";
const char *transactional_id = "txnid";
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
SUB_TEST("Test switching coordinators (refresh)");
rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1);
/* Start transactioning */
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Switch the coordinator so that AddOffsetsToTxnRequest
* will respond with NOT_COORDINATOR. */
TEST_SAY("Switching to coordinator 2\n");
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
2);
/*
* Send some arbitrary offsets.
*/
offsets = rd_kafka_topic_partition_list_new(4);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 29)->offset =
99999;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
rk, offsets, cgmetadata, 20 * 1000));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/* Produce some messages */
test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL, 0);
/* And commit the transaction */
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test fatal error handling when transactions are not supported
* by the broker.
*/
static void do_test_txns_not_supported(void) {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
SUB_TEST_QUICK();
test_conf_init(&conf, NULL, 10);
test_conf_set(conf, "transactional.id", "myxnid");
test_conf_set(conf, "bootstrap.servers", ",");
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
/* Create mock cluster */
mcluster = rd_kafka_mock_cluster_new(rk, 3);
/* Disable InitProducerId */
rd_kafka_mock_set_apiversion(mcluster, 22 /*InitProducerId*/, -1, -1);
rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));
error = rd_kafka_init_transactions(rk, 5 * 1000);
TEST_SAY("init_transactions() returned %s: %s\n",
error ? rd_kafka_error_name(error) : "success",
error ? rd_kafka_error_string(error) : "success");
TEST_ASSERT(error, "Expected init_transactions() to fail");
TEST_ASSERT(rd_kafka_error_code(error) ==
RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
"Expected init_transactions() to fail with %s, not %s: %s",
rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
rd_kafka_error_name(error), rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test"),
RD_KAFKA_V_KEY("test", 4), RD_KAFKA_V_END);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
"Expected producev() to fail with %s, not %s",
rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
rd_kafka_err2name(err));
rd_kafka_mock_cluster_destroy(mcluster);
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief CONCURRENT_TRANSACTION on AddOffsets.. should be retried.
*/
static void do_test_txns_send_offsets_concurrent_is_retried(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
/* Wait for messages to be delivered */
test_flush(rk, 5000);
/*
* Have AddOffsetsToTxn fail but eventually succeed due to
* infinite retries.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddOffsetsToTxn,
1 + 5, /* first request + some retries */
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Verify that send_offsets_to_transaction() with no eligible offsets
* is handled properly - the call should succeed immediately and be
* repeatable.
*/
static void do_test_txns_send_offsets_non_eligible(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *empty_offs;
        rd_kafka_consumer_group_metadata_t *group_md;
        int attempt;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Produce a single message and wait for its delivery. */
        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        test_flush(rk, 5000);

        /* A partition list without any entries. */
        empty_offs = rd_kafka_topic_partition_list_new(0);

        group_md = rd_kafka_consumer_group_metadata_new("mygroupid");

        /* Sending the empty list must succeed, and must still succeed
         * when the very same call is repeated. */
        for (attempt = 0; attempt < 2; attempt++) {
                TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                    rk, empty_offs, group_md, -1));
        }

        rd_kafka_consumer_group_metadata_destroy(group_md);
        rd_kafka_topic_partition_list_destroy(empty_offs);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* Tear down */
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
/**
* @brief Verify that request timeouts don't cause crash (#2913).
*/
static void do_test_txns_no_timeout_crash(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
SUB_TEST_QUICK();
rk =
create_txn_producer(&mcluster, "txnid", 3, "socket.timeout.ms",
"1000", "transaction.timeout.ms", "5000", NULL);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
test_flush(rk, -1);
/* Delay all broker connections */
if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
(err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
(err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
TEST_FAIL("Failed to set broker RTT: %s",
rd_kafka_err2str(err));
/* send_offsets..() should now time out */
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
TEST_ASSERT(error, "Expected send_offsets..() to fail");
TEST_SAY("send_offsets..() failed with %serror: %s\n",
rd_kafka_error_is_retriable(error) ? "retriable " : "",
rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
"expected send_offsets_to_transaction() to fail with "
"timeout, not %s",
rd_kafka_error_name(error));
TEST_ASSERT(rd_kafka_error_is_retriable(error),
"expected send_offsets_to_transaction() to fail with "
"a retriable error");
rd_kafka_error_destroy(error);
/* Reset delay and try again */
if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) ||
(err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) ||
(err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0)))
TEST_FAIL("Failed to reset broker RTT: %s",
rd_kafka_err2str(err));
TEST_SAY("Retrying send_offsets..()\n");
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s",
rd_kafka_error_string(error));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test auth failure handling.
*/
static void do_test_txn_auth_failure(int16_t ApiKey,
                                     rd_kafka_resp_err_t ErrorCode) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t actual;

        SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s", rd_kafka_ApiKey2str(ApiKey),
                       rd_kafka_err2name(ErrorCode));

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        /* Inject ErrorCode as the response to the next request of type
         * ApiKey. */
        rd_kafka_mock_push_request_errors(mcluster, ApiKey, 1, ErrorCode);

        error = rd_kafka_init_transactions(rk, 5000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");

        actual = rd_kafka_error_code(error);

        TEST_SAY("init_transactions() failed: %s: %s\n",
                 rd_kafka_err2name(actual), rd_kafka_error_string(error));

        /* The injected error must surface unchanged, and be flagged as
         * fatal and non-retriable. */
        TEST_ASSERT(actual == ErrorCode, "Expected error %s, not %s",
                    rd_kafka_err2name(ErrorCode), rd_kafka_err2name(actual));
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected error to be fatal");
        TEST_ASSERT(!rd_kafka_error_is_retriable(error),
                    "Expected error to not be retriable");

        rd_kafka_error_destroy(error);

        /* Tear down */
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
/**
* @brief Issue #3041: Commit fails due to message flush() taking too long,
* eventually resulting in an unabortable error and failure to
* re-init the transactional producer.
*/
static void do_test_txn_flush_timeout(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        rd_kafka_error_t *error;
        const char *txnid      = "myTxnId";
        const char *topic      = "myTopic";
        const int32_t coord_id = 2;
        int msgcounter         = 0;
        /* rd_false during the first (failing) attempt, rd_true during the
         * second (succeeding) one. */
        rd_bool_t is_retry = rd_false;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, "message.timeout.ms",
                                 "10000", "transaction.timeout.ms", "10000",
                                 /* Speed up coordinator reconnect */
                                 "reconnect.backoff.max.ms", "1000", NULL);

        /* Broker down is not a test-failing error.
         * The allowed error is set before the callback is installed so
         * that the callback never observes a stale value. */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        /* Two partitions; fail the test if the topic can't be created. */
        TEST_CALL_ERR__(rd_kafka_mock_topic_create(mcluster, topic, 2, 3));

        /* Set coordinator so we can disconnect it later */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);

        /*
         * Init transactions
         */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

retry:
        if (!is_retry) {
                /* First attempt should fail. */

                test_curr->ignore_dr_err = rd_true;
                test_curr->exp_dr_err    = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

                /* Assign invalid partition leaders so that messages
                 * will not be delivered. */
                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);

        } else {
                /* The retry should succeed */
                test_curr->ignore_dr_err = rd_false;
                test_curr->exp_dr_err    = RD_KAFKA_RESP_ERR_NO_ERROR;

                /* Restore a valid leader (broker 1) for both partitions. */
                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
        }

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Produce some messages to specific partitions and random.
         */
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA, 0, 0, 100,
                                  NULL, 10, &msgcounter);

        /*
         * Send some arbitrary offsets.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 49)->offset =
            999999111;
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
            999;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 34)->offset =
            123456789;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        rd_sleep(2);

        if (!is_retry) {
                /* Now disconnect the coordinator. */
                TEST_SAY("Disconnecting transaction coordinator %" PRId32 "\n",
                         coord_id);
                rd_kafka_mock_broker_set_down(mcluster, coord_id);
        }

        /*
         * Start committing.
         */
        error = rd_kafka_commit_transaction(rk, -1);

        if (!is_retry) {
                TEST_ASSERT(error != NULL, "Expected commit to fail");
                TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                         rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);

                /*
                 * Bring the coordinator back up.
                 */
                rd_kafka_mock_broker_set_up(mcluster, coord_id);
                rd_sleep(2);

                /*
                 * Abort, and try again, this time without error.
                 */
                TEST_SAY("Aborting and retrying\n");
                is_retry = rd_true;

                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
                goto retry;
        }

        /* Second attempt: the commit must have succeeded. */
        TEST_ASSERT(!error, "Expected commit to succeed, not: %s",
                    rd_kafka_error_string(error));

        /* All done */

        rd_kafka_destroy(rk);

        /* Don't let the allowed transport error carry over to other
         * sub-tests through the file-global variable. */
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
/**
* @brief ESC-4424: rko is reused in response handler after destroy in coord_req
* sender due to bad state.
*
* This is somewhat of a race condition so we need to perform a couple of
* iterations before it hits, usually 2 or 3, so we try at least 15 times.
*/
static void do_test_txn_coord_req_destroy(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
int i;
int errcnt = 0;
SUB_TEST();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
for (i = 0; i < 15; i++) {
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
test_timeout_set(10);
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/*
* Inject errors to trigger retries
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddPartitionsToTxn,
2, /* first request + number of internal retries */
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddOffsetsToTxn,
1, /* first request + number of internal retries */
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 4,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
/* FIXME: When KIP-360 is supported, add this error:
* RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
/*
* Send offsets to transaction
*/
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)
->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
error = rd_kafka_send_offsets_to_transaction(rk, offsets,
cgmetadata, -1);
TEST_SAY("send_offsets_to_transaction() #%d: %s\n", i,
rd_kafka_error_string(error));
/* As we can't control the exact timing and sequence
* of requests this sometimes fails and sometimes succeeds,
* but we run the test enough times to trigger at least
* one failure. */
if (error) {
TEST_SAY(
"send_offsets_to_transaction() #%d "
"failed (expectedly): %s\n",
i, rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"Expected abortable error for #%d", i);
rd_kafka_error_destroy(error);
errcnt++;
}
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/* Allow time for internal retries */
rd_sleep(2);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
}
TEST_ASSERT(errcnt > 0,
"Expected at least one send_offets_to_transaction() "
"failure");
/* All done */
rd_kafka_destroy(rk);
}
/* Progress counter for multi_find_on_response_received_cb(): any value
 * above 10000 means that the callback has finished its work. */
static rd_atomic32_t multi_find_req_cnt;

/**
 * @brief on_response_received interceptor used by
 *        do_test_txn_coord_req_multi_find().
 *
 * Only AddOffsetsToTxn responses are acted on, and only until the counter
 * has been pushed past 10000. When incrementing the counter yields 1
 * (presumably the first matching response) broker 2 is taken down and up
 * again; on a later matching response broker 3 is bounced instead, its
 * rtt is cleared and the counter is bumped by 10000 to disable the
 * callback.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR, always.
 */
static rd_kafka_resp_err_t
multi_find_on_response_received_cb(rd_kafka_t *rk,
                                   int sockfd,
                                   const char *brokername,
                                   int32_t brokerid,
                                   int16_t ApiKey,
                                   int16_t ApiVersion,
                                   int32_t CorrId,
                                   size_t size,
                                   int64_t rtt,
                                   rd_kafka_resp_err_t err,
                                   void *ic_opaque) {
        rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
        rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;

        /* Nothing to do once finished, or for any other request type. */
        if (done || ApiKey != RD_KAFKAP_AddOffsetsToTxn)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
                 ", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
                 rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
                 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
                 done ? "already done" : "not done yet",
                 rd_kafka_err2name(err));

        if (rd_atomic32_add(&multi_find_req_cnt, 1) != 1) {
                /* Bounce broker 3: the down/up event in turn triggers
                 * the coord_req_fsm(). */
                rd_kafka_mock_broker_set_down(mcluster, 3);
                rd_kafka_mock_broker_set_up(mcluster, 3);

                /* Remove the bounced broker's latency so that it
                 * reconnects quickly. A delayed ApiVersionRequest would
                 * otherwise delay the -> UP transition that is needed to
                 * trigger the coord_reqs. */
                rd_kafka_mock_broker_set_rtt(mcluster, 3, 0);

                /* Push the counter past the "done" threshold so that
                 * this is only performed once. */
                rd_atomic32_add(&multi_find_req_cnt, 10000);
        } else {
                /* Bounce broker 2: the down/up event in turn triggers
                 * the coord_req_fsm(). */
                rd_kafka_mock_broker_set_down(mcluster, 2);
                rd_kafka_mock_broker_set_up(mcluster, 2);
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing
* the same coord_req_t, but the first one received will destroy
* the coord_req_t object and make the subsequent FindCoordinatorResponses
* reference a freed object.
*
* What we want to achieve is this sequence:
* 1. AddOffsetsToTxnRequest + Response which..
* 2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so..
* 3. Triggers a FindCoordinatorRequest
* 4. FindCoordinatorResponse from 3 is received ..
* 5. A TxnOffsetCommitRequest is sent from coord_req_fsm().
* 6. Another broker changing state to Up triggers coord reqs again, which..
* 7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm().
* 8. FindCoordinatorResponse from 5 is received, references the destroyed rko
* and crashes.
*/
static void do_test_txn_coord_req_multi_find(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic";
        int i;

        SUB_TEST();

        /* Reset the interceptor's progress counter and install the
         * interceptor callback before the producer is created. */
        rd_atomic32_init(&multi_find_req_cnt, 0);

        on_response_received_cb = multi_find_on_response_received_cb;
        rk                      = create_txn_producer(&mcluster, txnid, 3,
                                 /* Need connections to all brokers so we
                                  * can trigger coord_req_fsm events
                                  * by toggling connections. */
                                 "enable.sparse.connections", "false",
                                 /* Set up on_response_received interceptor */
                                 "on_response_received", "", NULL);

        /* Let broker 1 be both txn and group coordinator
         * so that the group coordinator connection is up when it is time
         * send the TxnOffsetCommitRequest. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
        rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);

        /* Set broker 1, 2, and 3 as leaders for a partition each and
         * later produce to all three partitions so we know there's a
         * connection to all brokers. */
        TEST_CALL_ERR__(rd_kafka_mock_topic_create(mcluster, topic, 3, 1));
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);

        /* Broker down is not a test-failing error */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* One message per partition, i.e., one per broker. */
        for (i = 0; i < 3; i++) {
                err = rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(i),
                    RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
                TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
        }

        test_flush(rk, 5000);

        /*
         * send_offsets_to_transaction() will query for the group coordinator,
         * we need to make those requests slow so that multiple requests are
         * sent.
         */
        for (i = 1; i <= 3; i++)
                rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);

        /*
         * Send offsets to transaction
         */

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;

        cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);

        error =
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);

        TEST_SAY("send_offsets_to_transaction() %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
                    rd_kafka_error_string(error));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Clear delay */
        for (i = 1; i <= 3; i++)
                rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);

        rd_sleep(5);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        /* The interceptor pushes the counter past 10000 once it has
         * performed its second broker down/up toggle. */
        TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
                    "on_response_received interceptor did not trigger "
                    "properly");

        rd_kafka_destroy(rk);

        /* Uninstall the interceptor and restore the file-global error
         * filter so that neither carries over to other sub-tests. */
        on_response_received_cb = NULL;
        allowed_error           = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
/**
* @brief ESC-4410: adding producer partitions gradually will trigger multiple
* AddPartitionsToTxn requests. Due to a bug the third partition to be
* registered would hang in PEND_TXN state.
*
* Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
* at the same time, followed by a need for a third:
*
* 1. Set coordinator broker rtt high (to give us time to produce).
* 2. Produce to partition 0, will trigger first AddPartitionsToTxn.
* 3. Produce to partition 1, will trigger second AddPartitionsToTxn.
* 4. Wait for second AddPartitionsToTxn response.
* 5. Produce to partition 2, should trigger AddPartitionsToTxn, but bug
* causes it to be stale in pending state.
*/
/* Number of AddPartitionsToTxn responses seen by
 * multi_addparts_response_received_cb(). */
static rd_atomic32_t multi_addparts_resp_cnt;

/**
 * @brief on_response_received interceptor that logs and counts
 *        AddPartitionsToTxn responses; all other responses are ignored.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR, always.
 */
static rd_kafka_resp_err_t
multi_addparts_response_received_cb(rd_kafka_t *rk,
                                    int sockfd,
                                    const char *brokername,
                                    int32_t brokerid,
                                    int16_t ApiKey,
                                    int16_t ApiVersion,
                                    int32_t CorrId,
                                    size_t size,
                                    int64_t rtt,
                                    rd_kafka_resp_err_t err,
                                    void *ic_opaque) {

        if (ApiKey != RD_KAFKAP_AddPartitionsToTxn)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* The count is logged as read before this response is added. */
        TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
                 ", ApiKey %hd, CorrId %d, rtt %.2fms, count %" PRId32
                 ": %s\n",
                 rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
                 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
                 rd_atomic32_get(&multi_addparts_resp_cnt),
                 rd_kafka_err2name(err));

        rd_atomic32_add(&multi_addparts_resp_cnt, 1);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
 * @brief Test body for the multiple-outstanding AddPartitionsToTxn scenario:
 * after a seed transaction, produces to partitions 0, 1 and 2 while the
 * transaction coordinator has a 1000ms rtt, waiting for two
 * AddPartitionsToTxn responses before producing to partition 2,
 * and then commits the transaction.
 */
static void do_test_txn_addparts_req_multi(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
const char *txnid = "txnid", *topic = "mytopic";
int32_t txn_coord = 2;
SUB_TEST();
/* The response interceptor counts AddPartitionsToTxn responses. */
rd_atomic32_init(&multi_addparts_resp_cnt, 0);
on_response_received_cb = multi_addparts_response_received_cb;
rk = create_txn_producer(&mcluster, txnid, 3, "linger.ms", "0",
"message.timeout.ms", "9000",
/* Set up on_response_received interceptor */
"on_response_received", "", NULL);
/* Let broker 2 (txn_coord) be txn coordinator. */
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);
rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
/* Set partition leaders to non-txn-coord broker so they wont
* be affected by rtt delay */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
/*
* Run one transaction first to let the client familiarize with
* the topic, this avoids metadata lookups, etc, when the real
* test is run.
*/
TEST_SAY("Running seed transaction\n");
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_VALUE("seed", 4),
RD_KAFKA_V_END));
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
/*
* Now perform test transaction with rtt delays
*/
TEST_SAY("Running test transaction\n");
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Reset counter */
rd_atomic32_set(&multi_addparts_resp_cnt, 0);
/* Add latency to txn coordinator so we can pace our produce() calls */
rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);
/* Produce to partition 0 */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Wait half the coordinator rtt before producing to the next partition. */
rd_usleep(500 * 1000, NULL);
/* Produce to partition 1 */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(1),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_SAY("Waiting for two AddPartitionsToTxnResponse\n");
/* Poll the interceptor's counter every 10ms; there is no upper bound
* on the wait in this loop itself. */
while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
rd_usleep(10 * 1000, NULL);
TEST_SAY("%" PRId32 " AddPartitionsToTxnResponses seen\n",
rd_atomic32_get(&multi_addparts_resp_cnt));
/* Produce to partition 2, this message will hang in
* queue if the bug is not fixed. */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(2),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Allow some extra time for things to settle before committing
* transaction. */
rd_usleep(1000 * 1000, NULL);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10 * 1000));
/* All done */
rd_kafka_destroy(rk);
/* Unhook the interceptor callback again. */
on_response_received_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
*
* There are two things to test;
* - OffsetFetch triggered by committed() (and similar code paths)
* - OffsetFetch triggered by assign()
*
* In both phases six UNSTABLE_OFFSET_COMMIT errors are queued on the mock
* cluster for OffsetFetch before the call under test is made.
*/
static void do_test_unstable_offset_commit(void) {
rd_kafka_t *rk, *c;
rd_kafka_conf_t *c_conf;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
const char *topic = "srctopic4";
const int msgcnt = 100;
/* Offset committed by the consumer: the midpoint of the produced range. */
const int64_t offset_to_commit = msgcnt / 2;
int i;
int remains = 0;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
/* Set up a consumer that talks to the same mock cluster. */
test_conf_init(&c_conf, NULL, 0);
test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
test_conf_set(c_conf, "bootstrap.servers",
rd_kafka_mock_cluster_bootstraps(mcluster));
test_conf_set(c_conf, "enable.partition.eof", "true");
test_conf_set(c_conf, "auto.offset.reset", "error");
c = test_create_consumer("mygroup", NULL, c_conf, NULL);
rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
/* Produce some messages to the topic so that the consumer has
* something to read. */
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt, NULL, 0,
&remains);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
/* Commit offset */
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
offset_to_commit;
TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0 /*sync*/));
rd_kafka_topic_partition_list_destroy(offsets);
/* Retrieve offsets by calling committed().
*
* Have OffsetFetch fail and retry, on the first iteration
* the API timeout is higher than the amount of time the retries will
* take and thus succeed, and on the second iteration the timeout
* will be lower and thus fail. */
for (i = 0; i < 2; i++) {
rd_kafka_resp_err_t err;
rd_kafka_resp_err_t exp_err =
i == 0 ? RD_KAFKA_RESP_ERR_NO_ERROR
: RD_KAFKA_RESP_ERR__TIMED_OUT;
/* 5s when success is expected, 200ms when a timeout is expected. */
int timeout_ms = exp_err ? 200 : 5 * 1000;
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_OffsetFetch,
1 + 5, /* first request + some retries */
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, topic, 0);
err = rd_kafka_committed(c, offsets, timeout_ms);
TEST_SAY("#%d: committed() returned %s (expected %s)\n", i,
rd_kafka_err2name(err), rd_kafka_err2name(exp_err));
TEST_ASSERT(err == exp_err,
"#%d: Expected committed() to return %s, not %s", i,
rd_kafka_err2name(exp_err), rd_kafka_err2name(err));
TEST_ASSERT(offsets->cnt == 1,
"Expected 1 committed offset, not %d",
offsets->cnt);
/* On success the committed offset must match; on timeout the
* list entry must still hold a negative (unset) offset. */
if (!exp_err)
TEST_ASSERT(offsets->elems[0].offset ==
offset_to_commit,
"Expected committed offset %" PRId64
", "
"not %" PRId64,
offset_to_commit, offsets->elems[0].offset);
else
TEST_ASSERT(offsets->elems[0].offset < 0,
"Expected no committed offset, "
"not %" PRId64,
offsets->elems[0].offset);
rd_kafka_topic_partition_list_destroy(offsets);
}
TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n");
/* Assign partition 0 with RD_KAFKA_OFFSET_STORED as start offset. */
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
RD_KAFKA_OFFSET_STORED;
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_OffsetFetch,
1 + 5, /* first request + some retries */
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
test_consumer_incremental_assign("assign", c, offsets);
rd_kafka_topic_partition_list_destroy(offsets);
/* Expect one EOF and msgcnt / 2 messages. */
test_consumer_poll_exact("consume", c, 0, 1 /*eof*/, 0, msgcnt / 2,
rd_true /*exact counts*/, NULL);
/* All done */
rd_kafka_destroy(c);
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief If a message times out locally before being attempted to send
 *        and commit_transaction() is called, the transaction must not succeed.
 *        https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
 *
 * Flow: the transaction coordinator (broker 1) and the partition leader
 * (broker 2) are both brought down, one message is produced and
 * commit_transaction() is expected to fail with an error that requires
 * abort. The brokers are then brought back up, the transaction is aborted
 * and a second transaction is run which is expected to succeed.
 */
static void do_test_commit_after_msg_timeout(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id, leader_id;
        rd_kafka_resp_err_t err;
        rd_kafka_error_t *error;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int remains                  = 0;

        SUB_TEST_QUICK();

        /* Assign coordinator and leader to two different brokers */
        coord_id  = 1;
        leader_id = 2;

        rk = create_txn_producer(&mcluster, transactional_id, 3,
                                 "message.timeout.ms", "5000",
                                 "transaction.timeout.ms", "10000", NULL);

        /* Broker down is not a test-failing error */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Both the leader and the coordinator are taken down, so say so. */
        TEST_SAY("Bringing down leader %" PRId32 " and coordinator %" PRId32
                 "\n",
                 leader_id, coord_id);
        rd_kafka_mock_broker_set_down(mcluster, leader_id);
        rd_kafka_mock_broker_set_down(mcluster, coord_id);

        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);

        error = rd_kafka_commit_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "expected commit_transaction() to fail");
        TEST_SAY_ERROR(error, "commit_transaction() failed (as expected): ");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected txn_requires_abort error");
        rd_kafka_error_destroy(error);

        /* Bring the brokers up so the abort can complete */
        rd_kafka_mock_broker_set_up(mcluster, coord_id);
        rd_kafka_mock_broker_set_up(mcluster, leader_id);

        TEST_SAY("Aborting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not flushed\n", remains);

        /* The second transaction expects successful delivery reports. */
        TEST_SAY("Attempting second transaction, which should succeed\n");
        test_curr->is_fatal_cb = error_is_fatal_cb;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR_NO_ERROR;
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);

        rd_kafka_destroy(rk);

        /* Restore the shared test state touched by this test. */
        allowed_error          = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}
/**
* @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump
* during an ongoing transaction.
* The transaction should instead enter the abortable state.
*
* After the injected errors, produce() is expected to fail with ERR__STATE
* and commit_transaction() with a non-fatal error that requires abort;
* a follow-up transaction must then succeed.
*/
static void do_test_out_of_order_seq(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
int32_t txn_coord = 1, leader = 2;
const char *txnid = "myTxnId";
test_timing_t timing;
rd_kafka_resp_err_t err;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);
rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);
/* Delivery report errors are ignored in this test and no
* is_fatal_cb is installed. */
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = NULL;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce one seeding message first to get the leader up and running */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
/* Let partition leader have a latency of 2 seconds
* so that we can have multiple messages in-flight. */
rd_kafka_mock_broker_set_rtt(mcluster, leader, 2 * 1000);
/* Produce a message, let it fail with different errors,
* ending with OUT_OF_ORDER which previously triggered an
* Epoch bump. */
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 3,
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);
/* Produce three messages that will be delayed
* and have errors injected.*/
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Now sleep a short while so that the messages are processed
* by the broker and errors are returned. */
TEST_SAY("Sleeping..\n");
rd_sleep(5);
/* Remove the leader latency again. */
rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);
/* Produce a fifth message, should fail with ERR__STATE since
* the transaction should have entered the abortable state. */
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
"Expected produce() to fail with ERR__STATE, not %s",
rd_kafka_err2name(err));
TEST_SAY("produce() failed as expected: %s\n", rd_kafka_err2str(err));
/* Commit the transaction, should fail with abortable error. */
TIMING_START(&timing, "commit_transaction(-1)");
error = rd_kafka_commit_transaction(rk, -1);
TIMING_STOP(&timing);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(!rd_kafka_error_is_fatal(error),
"Did not expect fatal error");
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"Expected abortable error");
rd_kafka_error_destroy(error);
/* Abort the transaction */
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/* Run a new transaction without errors to verify that the
* producer can recover. */
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief Verify lossless delivery if topic disappears from Metadata for
 *        a while.
 *
 * If a topic is removed from metadata in between transactions, the producer
 * will remove its partition state for the topic's partitions.
 * If later the same topic comes back (same topic instance, not a new creation)
 * then the producer must restore the previously used msgid/BaseSequence
 * in case the same Epoch is still used, or messages will be silently lost
 * as they would seem like legit duplicates to the broker.
 *
 * Reproduction:
 *  1. produce msgs to topic, commit transaction.
 *  2. remove topic from metadata
 *  3. make sure client updates its metadata, which removes the partition
 *     objects.
 *  4. restore the topic in metadata
 *  5. produce new msgs to topic, commit transaction.
 *  6. consume topic. All messages should be accounted for.
 *
 * Each transaction produces exactly 3 * 2 * partition_cnt messages,
 * spread evenly over partitions 0..partition_cnt-1.
 */
static void do_test_topic_disappears_for_awhile(void) {
        rd_kafka_t *rk, *c;
        rd_kafka_conf_t *c_conf;
        rd_kafka_mock_cluster_t *mcluster;
        const char *topic = "mytopic";
        const char *txnid = "myTxnId";
        test_timing_t timing;
        int i;
        int msgcnt              = 0;
        const int partition_cnt = 10;

        SUB_TEST_QUICK();

        rk = create_txn_producer(
            &mcluster, txnid, 1, "batch.num.messages", "3", "linger.ms", "100",
            "topic.metadata.refresh.interval.ms", "2000", NULL);

        rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        for (i = 0; i < 2; i++) {
                int cnt                = 3 * 2 * partition_cnt;
                rd_bool_t remove_topic = (i % 2) == 0;

                /*
                 * Start a transaction
                 */
                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                /* Inside the body cnt runs from its initial value - 1
                 * down to 0, so cnt % partition_cnt is always a valid,
                 * non-negative partition id. Looping while cnt-- >= 0
                 * would run one extra time with cnt == -1 and produce
                 * to partition -1. */
                while (cnt-- > 0) {
                        TEST_CALL_ERR__(rd_kafka_producev(
                            rk, RD_KAFKA_V_TOPIC(topic),
                            RD_KAFKA_V_PARTITION(cnt % partition_cnt),
                            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
                        msgcnt++;
                }

                /* Commit the transaction */
                TIMING_START(&timing, "commit_transaction(-1)");
                TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                TIMING_STOP(&timing);

                if (remove_topic) {
                        /* Make it seem the topic is removed, refresh metadata,
                         * and then make the topic available again. */
                        const rd_kafka_metadata_t *md;

                        TEST_SAY("Marking topic as non-existent\n");

                        rd_kafka_mock_topic_set_error(
                            mcluster, topic,
                            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

                        TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, NULL, &md,
                                                          tmout_multip(5000)));

                        rd_kafka_metadata_destroy(md);

                        rd_sleep(2);

                        TEST_SAY("Bringing topic back to life\n");
                        rd_kafka_mock_topic_set_error(
                            mcluster, topic, RD_KAFKA_RESP_ERR_NO_ERROR);
                }
        }

        TEST_SAY("Verifying messages by consumtion\n");
        test_conf_init(&c_conf, NULL, 0);
        test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
        test_conf_set(c_conf, "bootstrap.servers",
                      rd_kafka_mock_cluster_bootstraps(mcluster));
        test_conf_set(c_conf, "enable.partition.eof", "true");
        test_conf_set(c_conf, "auto.offset.reset", "earliest");
        c = test_create_consumer("mygroup", NULL, c_conf, NULL);

        /* Expect one EOF per partition and every produced message. */
        test_consumer_subscribe(c, topic);
        test_consumer_poll_exact("consume", c, 0, partition_cnt, 0, msgcnt,
                                 rd_true /*exact*/, NULL);
        rd_kafka_destroy(c);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
/**
* @brief Test that group coordinator requests can handle an
* untimely disconnect.
*
* The transaction manager makes use of librdkafka coord_req to commit
* transaction offsets to the group coordinator.
* If the connection to the given group coordinator is not up the
* coord_req code will request a connection once, but if this connection fails
* there will be no new attempts and the coord_req will idle until either
* destroyed or the connection is retried for other reasons.
* This in turn stalls the send_offsets_to_transaction() call until the
* transaction times out.
*
* There are two variants to this test based on switch_coord:
* - True - Switches the coordinator during the downtime.
* The client should detect this and send the request to the
* new coordinator.
* - False - The coordinator remains on the down broker. Client will reconnect
* when down broker comes up again.
*/
/* Arguments handed to the delayed_up_cb() background thread. */
struct some_state {
/* Mock cluster to operate on. */
rd_kafka_mock_cluster_t *mcluster;
/* If true, move the group coordinator to broker_id;
* otherwise bring broker_id back up. */
rd_bool_t switch_coord;
/* Broker to switch the coordinator to, or to bring up. */
int32_t broker_id;
/* Group id used when switching the group coordinator. */
const char *grpid;
};
/**
 * @brief Thread main: after a 3 second delay either moves the group
 *        coordinator to state->broker_id (switch_coord set) or brings
 *        broker state->broker_id back up (switch_coord not set).
 *
 * @param arg Pointer to a struct some_state.
 *
 * @returns 0, always.
 */
static int delayed_up_cb(void *arg) {
        struct some_state *ctx = arg;

        rd_sleep(3);

        if (!ctx->switch_coord) {
                TEST_SAY("Bringing up group coordinator %" PRId32 "..\n",
                         ctx->broker_id);
                rd_kafka_mock_broker_set_up(ctx->mcluster, ctx->broker_id);
                return 0;
        }

        TEST_SAY("Switching group coordinator to %" PRId32 "\n",
                 ctx->broker_id);
        rd_kafka_mock_coordinator_set(ctx->mcluster, "group", ctx->grpid,
                                      ctx->broker_id);

        return 0;
}
/**
 * @brief Test body: calls send_offsets_to_transaction() while the group
 * coordinator (broker 2) is down and a background thread restores
 * coordinator availability after a delay.
 *
 * @param switch_coord If true the background thread moves the group
 * coordinator to broker 3, else it brings broker 2 back up.
 */
static void do_test_disconnected_group_coord(rd_bool_t switch_coord) {
const char *topic = "mytopic";
const char *txnid = "myTxnId";
const char *grpid = "myGrpId";
const int partition_cnt = 1;
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
struct some_state state = RD_ZERO_INIT;
test_timing_t timing;
thrd_t thrd;
int ret;
SUB_TEST_QUICK("switch_coord=%s", RD_STR_ToF(switch_coord));
/* Transport errors are allowed since a broker is taken down. */
test_curr->is_fatal_cb = error_is_fatal_cb;
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
rk = create_txn_producer(&mcluster, txnid, 3, NULL);
rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);
/* Broker 1: txn coordinator
* Broker 2: group coordinator
* Broker 3: partition leader & backup coord if switch_coord=true */
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
rd_kafka_mock_coordinator_set(mcluster, "group", grpid, 2);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3);
/* Bring down group coordinator so there are no undesired
* connections to it. */
rd_kafka_mock_broker_set_down(mcluster, 2);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
rd_sleep(1);
/* Run a background thread that after 3s, which should be enough
* to perform the first failed connection attempt, makes the
* group coordinator available again. */
state.switch_coord = switch_coord;
state.mcluster = mcluster;
state.grpid = grpid;
state.broker_id = switch_coord ? 3 : 2;
if (thrd_create(&thrd, delayed_up_cb, &state) != thrd_success)
TEST_FAIL("Failed to create thread");
TEST_SAY("Calling send_offsets_to_transaction()\n");
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 1;
cgmetadata = rd_kafka_consumer_group_metadata_new(grpid);
/* The call uses an infinite timeout but must complete within 10s. */
TIMING_START(&timing, "send_offsets_to_transaction(-1)");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
TIMING_STOP(&timing);
TIMING_ASSERT(&timing, 0, 10 * 1000 /*10s*/);
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/* Wait for the background thread; state lives on this stack frame. */
thrd_join(thrd, &ret);
/* Commit the transaction */
TIMING_START(&timing, "commit_transaction(-1)");
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
TIMING_STOP(&timing);
rd_kafka_destroy(rk);
/* Restore the shared test state touched by this test. */
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->is_fatal_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief Test that a NULL coordinator is not fatal when
* the transactional producer reconnects to the txn coordinator
* and the first thing it does is a FindCoordinatorRequest that
* fails with COORDINATOR_NOT_AVAILABLE, setting coordinator to NULL.
*
* Broker 1 is both transaction coordinator and partition leader here.
* The final commit_transaction() is expected to return an error.
*/
static void do_test_txn_coordinator_null_not_fatal(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
int32_t coord_id = 1;
const char *topic = "test";
const char *transactional_id = "txnid";
int msgcnt = 1;
int remains = 0;
SUB_TEST_QUICK();
/* Broker down is not a test-failing error */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
test_curr->is_fatal_cb = error_is_fatal_cb;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
/* One second is the minimum transaction timeout */
rk = create_txn_producer(&mcluster, transactional_id, 1,
"transaction.timeout.ms", "1000", NULL);
err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);
/* Start transactioning */
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Makes the produce request timeout. */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 3000);
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt, NULL, 0, &remains);
/* This value is linked to transaction.timeout.ms, needs enough time
* so the message times out and a DrainBump sequence is started. */
/* The flush() return value is not checked here. */
rd_kafka_flush(rk, 1000);
/* To trigger the error the COORDINATOR_NOT_AVAILABLE response
* must come AFTER idempotent state has changed to WaitTransport
* but BEFORE it changes to WaitPID. To make it more likely
* rd_kafka_txn_coord_timer_start timeout can be changed to 5 ms
* in rd_kafka_txn_coord_query, when unable to query for
* transaction coordinator.
*/
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10);
/* Coordinator down starts the FindCoordinatorRequest loop. */
TEST_SAY("Bringing down coordinator %" PRId32 "\n", coord_id);
rd_kafka_mock_broker_set_down(mcluster, coord_id);
/* Coordinator down for some time. */
rd_usleep(100 * 1000, NULL);
/* When it comes up, the error is triggered, if the preconditions
* happen. */
TEST_SAY("Bringing up coordinator %" PRId32 "\n", coord_id);
rd_kafka_mock_broker_set_up(mcluster, coord_id);
/* Make sure DRs are received */
rd_kafka_flush(rk, 1000);
error = rd_kafka_commit_transaction(rk, -1);
TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
/* Needs to wait some time before closing to make sure it doesn't go
* into TERMINATING state before error is triggered. */
rd_usleep(1000 * 1000, NULL);
rd_kafka_destroy(rk);
/* Restore the shared test state touched by this test. */
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->is_fatal_cb = NULL;
SUB_TEST_PASS();
}
/**
 * @brief Checks that init_transactions() honours its timeout argument and
 *        that an infinite timeout still terminates.
 *
 * The producer is created with an empty bootstrap.servers list and
 * transaction.timeout.ms=4000. A first call with a 1000ms timeout must
 * fail with _TIMED_OUT after 900..1500ms, and a second call with an
 * infinite timeout must fail with _TIMED_OUT after
 * 2 x transaction.timeout.ms (+-500ms).
 */
static void do_test_txn_resumable_init(void) {
        const char *transactional_id = "txnid";
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_error_t *error;
        test_timing_t duration;

        SUB_TEST();

        /* Producer configuration without any brokers to connect to. */
        test_conf_init(&conf, NULL, 20);
        test_conf_set(conf, "bootstrap.servers", "");
        test_conf_set(conf, "transactional.id", transactional_id);
        test_conf_set(conf, "transaction.timeout.ms", "4000");
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Step 1: an explicit, short timeout must be respected. */
        TIMING_START(&duration, "init_transactions(1000)");
        error = rd_kafka_init_transactions(rk, 1000);
        TIMING_STOP(&duration);

        if (error) {
                TEST_SAY("First init_transactions failed (as expected): %s\n",
                         rd_kafka_error_string(error));
        }
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected _TIMED_OUT, not %s",
                    error ? rd_kafka_error_string(error) : "success");
        rd_kafka_error_destroy(error);

        TIMING_ASSERT(&duration, 900, 1500);

        /* Step 2: an infinite timeout must still time out eventually. */
        TEST_SAY(
            "Performing second init_transactions() call now with an "
            "infinite timeout: "
            "should time out in 2 x transaction.timeout.ms\n");

        TIMING_START(&duration, "init_transactions(infinite)");
        error = rd_kafka_init_transactions(rk, -1);
        TIMING_STOP(&duration);

        if (error) {
                TEST_SAY("Second init_transactions failed (as expected): %s\n",
                         rd_kafka_error_string(error));
        }
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected _TIMED_OUT, not %s",
                    error ? rd_kafka_error_string(error) : "success");
        rd_kafka_error_destroy(error);

        TIMING_ASSERT(&duration, 2 * 4000 - 500, 2 * 4000 + 500);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
/**
 * @brief Keeps invoking \p call until it returns NULL (success).
 *
 * Every returned error is logged, asserted to be retriable and then
 * destroyed; a non-retriable error fails the assertion.
 *
 * @param call           Expression yielding an rd_kafka_error_t pointer,
 *                       or NULL on success.
 * @param intermed_calls Block of code run after each retriable failure of
 *                       \p call, prior to the 1 second sleep that precedes
 *                       the next attempt.
 */
#define RETRY_TXN_CALL__(call, intermed_calls)                                 \
        do {                                                                   \
                rd_kafka_error_t *_error;                                      \
                while ((_error = call) != NULL) {                              \
                        TEST_SAY_ERROR(_error, "%s: ", "" #call);              \
                        TEST_ASSERT(rd_kafka_error_is_retriable(_error),       \
                                    "Expected retriable error");               \
                        TEST_SAY("%s failed, retrying in 1 second\n",          \
                                 "" #call);                                    \
                        rd_kafka_error_destroy(_error);                        \
                        intermed_calls;                                        \
                        rd_sleep(1);                                           \
                }                                                              \
        } while (0)
/**
 * @brief Call \p call and expect it to fail with \p exp_err_code.
 *
 * The returned error object is logged, checked and destroyed.
 *
 * @param call         Expression yielding an rd_kafka_error_t pointer;
 *                     a NULL (success) result fails the assertion.
 * @param exp_err_code Expected error code. It is evaluated exactly once
 *                     and may be any expression (e.g. a conditional), as
 *                     it is captured in a local before being compared.
 */
#define TXN_CALL_EXPECT_ERROR__(call, exp_err_code)                            \
        do {                                                                   \
                rd_kafka_error_t *_error           = (call);                   \
                const rd_kafka_resp_err_t _exp_err = (exp_err_code);           \
                TEST_ASSERT(_error != NULL,                                    \
                            "%s: Expected %s error, got success", "" #call,    \
                            rd_kafka_err2name(_exp_err));                      \
                TEST_SAY_ERROR(_error, "%s: ", "" #call);                      \
                TEST_ASSERT(rd_kafka_error_code(_error) == _exp_err,           \
                            "%s: Expected %s error, got %s", "" #call,         \
                            rd_kafka_err2name(_exp_err),                       \
                            rd_kafka_error_name(_error));                      \
                rd_kafka_error_destroy(_error);                                \
        } while (0)
/**
* @brief Simple test to make sure short API timeouts can be safely resumed
* by calling the same API again.
*
* While a timed out call is being retried, the opposite transactional API
* is called in between and expected to fail with ERR__CONFLICT.
*
* @param do_commit Commit transaction if true, else abort transaction.
*/
static void do_test_txn_resumable_calls_timeout(rd_bool_t do_commit) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
int32_t coord_id = 1;
const char *topic = "test";
const char *transactional_id = "txnid";
int msgcnt = 1;
int remains = 0;
SUB_TEST("%s_transaction", do_commit ? "commit" : "abort");
rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);
err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
/* Broker coord_id is both transaction coordinator and partition leader. */
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);
TEST_SAY("Starting transaction\n");
TEST_SAY("Delaying first two InitProducerIdRequests by 500ms\n");
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_InitProducerId, 2,
RD_KAFKA_RESP_ERR_NO_ERROR, 500, RD_KAFKA_RESP_ERR_NO_ERROR, 500);
/* Retry init_transactions() with a 100ms timeout until it succeeds;
* after each retriable failure abort_transaction() must fail with
* ERR__CONFLICT. */
RETRY_TXN_CALL__(
rd_kafka_init_transactions(rk, 100),
TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
RD_KAFKA_RESP_ERR__CONFLICT));
RETRY_TXN_CALL__(rd_kafka_begin_transaction(rk), /*none*/);
TEST_SAY("Delaying ProduceRequests by 3000ms\n");
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 3000);
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt, NULL, 0, &remains);
TEST_SAY("Delaying SendOffsetsToTransaction by 400ms\n");
/* The delay is applied to the AddOffsetsToTxn request. */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_AddOffsetsToTxn, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 400);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
/* This is not a resumable call on timeout */
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
TEST_SAY("Delaying EndTxnRequests by 1200ms\n");
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_EndTxn, 1, RD_KAFKA_RESP_ERR_NO_ERROR,
1200);
/* Committing/aborting the transaction will also be delayed by the
* previous accumulated remaining delays. */
/* In each branch the opposite API is used as the concurrent call that
* must fail with ERR__CONFLICT between retries. */
if (do_commit) {
TEST_SAY("Committing transaction\n");
RETRY_TXN_CALL__(
rd_kafka_commit_transaction(rk, 100),
TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
RD_KAFKA_RESP_ERR__CONFLICT));
} else {
TEST_SAY("Aborting transaction\n");
RETRY_TXN_CALL__(
rd_kafka_abort_transaction(rk, 100),
TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, -1),
RD_KAFKA_RESP_ERR__CONFLICT));
}
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief Verify that resuming a timed out call returns the error that
 *        occurred after the timeout but before the resuming call.
 *
 * EndTxn is made to fail with INVALID_TXN_STATE after 2000ms. The first
 * commit/abort call uses a 500ms timeout and must fail with _TIMED_OUT;
 * after sleeping past the delayed EndTxn response the same API is called
 * again and must return a fatal INVALID_TXN_STATE error.
 *
 * @param do_commit Exercise commit_transaction() if true,
 *                  else abort_transaction().
 */
static void do_test_txn_resumable_calls_timeout_error(rd_bool_t do_commit) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        int32_t coord_id             = 1;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int msgcnt                   = 1;
        int remains                  = 0;
        rd_kafka_error_t *error;

        SUB_TEST_QUICK("%s_transaction", do_commit ? "commit" : "abort");

        rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        /* Broker coord_id is both txn coordinator and partition leader. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);

        TEST_SAY("Starting transaction\n");

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt, NULL, 0, &remains);

        TEST_SAY("Fail EndTxn fatally after 2000ms\n");
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_EndTxn, 1,
            RD_KAFKA_RESP_ERR_INVALID_TXN_STATE, 2000);

        if (do_commit) {
                TEST_SAY("Committing transaction\n");

                TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, 500),
                                        RD_KAFKA_RESP_ERR__TIMED_OUT);

                /* Sleep so that the background EndTxn fails locally and sets
                 * an error result. */
                rd_sleep(3);

                error = rd_kafka_commit_transaction(rk, -1);

        } else {
                TEST_SAY("Aborting transaction\n");

                /* The abort variant must exercise abort_transaction(),
                 * both for the timed out call and for the resuming call. */
                TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, 500),
                                        RD_KAFKA_RESP_ERR__TIMED_OUT);

                /* Sleep so that the background EndTxn fails locally and sets
                 * an error result. */
                rd_sleep(3);

                error = rd_kafka_abort_transaction(rk, -1);
        }

        TEST_ASSERT(error != NULL && rd_kafka_error_is_fatal(error),
                    "Expected fatal error, not %s",
                    rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) ==
                        RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                    "Expected error INVALID_TXN_STATE, got %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
/**
* @brief Concurrent transaction API calls are not permitted.
* This test makes sure they're properly enforced.
*
* For each transactional API, call it with a 5s timeout, and during that time
* from another thread call transactional APIs, one by one, and verify that
* we get an ERR__CONFLICT error back in the second thread.
*
* We use a mutex for synchronization, the main thread will hold the lock
* when not calling an API but release it just prior to calling.
* The other thread will acquire the lock, sleep, and hold the lock while
* calling the concurrent API that should fail immediately, releasing the lock
* when done.
*
*/
/**
* @brief State shared between the main test thread and the concurrent thread
*        started by do_test_txn_concurrent_operations().
*/
struct _txn_concurrent_state {
/** Name of the transactional API the main thread is about to call or is
* currently calling. Set to NULL by the main thread to tell the concurrent
* thread to exit. */
const char *api;
/** Held by the main thread except while it is inside a transactional API
* call; held by the concurrent thread while it issues its conflicting calls. */
mtx_t lock;
/** Producer instance used by both threads. */
rd_kafka_t *rk;
/** Test context of the main thread, copied to the concurrent thread's
* test_curr so that the TEST_..() macros work there. */
struct test *test;
};
static int txn_concurrent_thread_main(void *arg) {
struct _txn_concurrent_state *state = arg;
static const char *apis[] = {
"init_transactions", "begin_transaction",
"send_offsets_to_transaction", "commit_transaction",
"abort_transaction", NULL};
rd_kafka_t *rk = state->rk;
const char *main_api = NULL;
int i;
/* Update TLS variable so TEST_..() macros work */
test_curr = state->test;
while (1) {
const char *api = NULL;
const int timeout_ms = 10000;
rd_kafka_error_t *error = NULL;
rd_kafka_resp_err_t exp_err;
test_timing_t duration;
/* Wait for other thread's txn call to start, then sleep a bit
* to increase the chance of that call has really begun. */
mtx_lock(&state->lock);
if (state->api && state->api == main_api) {
/* Main thread is still blocking on the last API call */
TEST_SAY("Waiting for main thread to finish %s()\n",
main_api);
mtx_unlock(&state->lock);
rd_sleep(1);
continue;
} else if (!(main_api = state->api)) {
mtx_unlock(&state->lock);
break;
}
rd_sleep(1);
for (i = 0; (api = apis[i]) != NULL; i++) {
TEST_SAY(
"Triggering concurrent %s() call while "
"main is in %s() call\n",
api, main_api);
TIMING_START(&duration, "%s", api);
if (!strcmp(api, "init_transactions"))
error =
rd_kafka_init_transactions(rk, timeout_ms);
else if (!strcmp(api, "begin_transaction"))
error = rd_kafka_begin_transaction(rk);
else if (!strcmp(api, "send_offsets_to_transaction")) {
rd_kafka_topic_partition_list_t *offsets =
rd_kafka_topic_partition_list_new(1);
rd_kafka_consumer_group_metadata_t *cgmetadata =
rd_kafka_consumer_group_metadata_new(
"mygroupid");
rd_kafka_topic_partition_list_add(
offsets, "srctopic4", 0)
->offset = 12;
error = rd_kafka_send_offsets_to_transaction(
rk, offsets, cgmetadata, -1);
rd_kafka_consumer_group_metadata_destroy(
cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
} else if (!strcmp(api, "commit_transaction"))
error =
rd_kafka_commit_transaction(rk, timeout_ms);
else if (!strcmp(api, "abort_transaction"))
error =
rd_kafka_abort_transaction(rk, timeout_ms);
else
TEST_FAIL("Unknown API: %s", api);
TIMING_STOP(&duration);
TEST_SAY_ERROR(error, "Conflicting %s() call: ", api);
TEST_ASSERT(error,
"Expected conflicting %s() call to fail",
api);
exp_err = !strcmp(api, main_api)
? RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
: RD_KAFKA_RESP_ERR__CONFLICT;
TEST_ASSERT(rd_kafka_error_code(error) == exp_err,
"Conflicting %s(): Expected %s, not %s",
api, rd_kafka_err2str(exp_err),
rd_kafka_error_name(error));
TEST_ASSERT(
rd_kafka_error_is_retriable(error),
"Conflicting %s(): Expected retriable error", api);
rd_kafka_error_destroy(error);
/* These calls should fail immediately */
TIMING_ASSERT(&duration, 0, 100);
}
mtx_unlock(&state->lock);
}
return 0;
}
/**
 * @brief Verify that conflicting concurrent transactional API calls are
 *        rejected while the main thread is inside a transactional API call.
 *
 * The main thread calls init_transactions(), send_offsets_to_transaction()
 * and then commit_transaction() or abort_transaction() against a mock
 * broker with a 3.5s RTT, while txn_concurrent_thread_main() issues the
 * conflicting calls from a second thread.
 *
 * @param do_commit End the transaction with commit_transaction() if true,
 *                  else with abort_transaction().
 */
static void do_test_txn_concurrent_operations(rd_bool_t do_commit) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id = 1;
        rd_kafka_resp_err_t err;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int remains                  = 0;
        thrd_t thrd;
        struct _txn_concurrent_state state = RD_ZERO_INIT;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST("%s", do_commit ? "commit" : "abort");

        test_timeout_set(90);

        /* socket.connection.setup.timeout.ms must be at least 2*RTT of the
         * mock broker: the first ApiVersion request will fail, since we make
         * the request with v3 and the mock broker's MaxVersion is 2, so the
         * request is retried with v0.
         * 15s is more than 4x the 3.5s RTT set below, leaving some buffer. */
        rk = create_txn_producer(&mcluster, transactional_id, 1,
                                 "socket.connection.setup.timeout.ms", "15000",
                                 NULL);

        /* Set broker RTT to 3.5s so that the background thread has ample
         * time to call its conflicting APIs.
         * This value must be less than socket.connection.setup.timeout.ms/2. */
        rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 3500);

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        /* Set up shared state between us and the concurrent thread */
        mtx_init(&state.lock, mtx_plain);
        state.test = test_curr;
        state.rk   = rk;

        /* We release the lock only while calling the TXN API */
        mtx_lock(&state.lock);

        /* Spin up concurrent thread */
        if (thrd_create(&thrd, txn_concurrent_thread_main, (void *)&state) !=
            thrd_success)
                TEST_FAIL("Failed to create thread");

/* Publish the name of the API we are about to call and release the lock so
 * that the concurrent thread can start issuing its conflicting calls. */
#define _start_call(callname)                                                  \
        do {                                                                   \
                state.api = callname;                                          \
                mtx_unlock(&state.lock);                                       \
        } while (0)
/* Re-acquire the lock: blocks until the concurrent thread has finished its
 * current round of conflicting calls. */
#define _end_call() mtx_lock(&state.lock)

        _start_call("init_transactions");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
        _end_call();

        /* This call doesn't block, so can't really be tested concurrently. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10,
                                  NULL, 0, &remains);

        _start_call("send_offsets_to_transaction");
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);
        _end_call();

        if (do_commit) {
                _start_call("commit_transaction");
                TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                _end_call();
        } else {
                _start_call("abort_transaction");
                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                _end_call();
        }

        /* Signal completion to background thread */
        state.api = NULL;
        mtx_unlock(&state.lock);

        /* Don't tear down the shared state unless the thread is known to
         * have terminated. */
        if (thrd_join(thrd, NULL) != thrd_success)
                TEST_FAIL("Failed to join thread");

        rd_kafka_destroy(rk);

        mtx_destroy(&state.lock);

/* The helper macros are local to this test: don't leak them to the rest of
 * the file. */
#undef _start_call
#undef _end_call

        SUB_TEST_PASS();
}
/**
* @brief KIP-360: Test that fatal idempotence errors trigger abortable
* transaction errors, but let the broker-side abort of the
* transaction fail with a fencing error.
* Should raise a fatal error.
*
* @param error_code Which error code EndTxn should fail with.
* Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
* or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
*/
static void do_test_txn_fenced_abort(rd_kafka_resp_err_t error_code) {
        const char *transactional_id = "myTxnId";
        const int32_t coord_id       = 2;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_t *rk;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t fatal_err;
        char fatal_errstr[512];
        size_t errors_cnt;

        SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));

        rk = create_txn_producer(&mcluster, transactional_id, 3,
                                 "batch.num.messages", "1", NULL);

        /* Pin the transaction coordinator to a known broker so that
         * request errors can be injected on it below. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, coord_id);

        /* Fencing errors and failed deliveries are expected in this test */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = error_is_fatal_cb;
        allowed_error            = RD_KAFKA_RESP_ERR__FENCED;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /* Begin the transaction and get one message through cleanly */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Next EndTxn (the abort) on the coordinator fails with
         * \p error_code .. */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_EndTxn, 1, error_code, 0);

        /* .. and so would the next InitProducerId (PID reinit). */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_InitProducerId, 1, error_code, 0);

        /* The next ProduceRequest fails with a fatal idempotence error */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Aborting must now fail, and the failure must be fatal */
        error = rd_kafka_abort_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY_ERROR(error, "abort_transaction() failed: ");
        TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
        rd_kafka_error_destroy(error);

        /* The producer instance itself must also report a fatal error */
        fatal_err =
            rd_kafka_fatal_error(rk, fatal_errstr, sizeof(fatal_errstr));
        TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
        TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err),
                 fatal_errstr);

        /* The number of errors still queued on the mock broker tells us
         * which requests the producer sent: the single EndTxn error must
         * have been consumed.. */
        if (rd_kafka_mock_broker_error_stack_cnt(mcluster, coord_id,
                                                 RD_KAFKAP_EndTxn,
                                                 &errors_cnt) !=
            RD_KAFKA_RESP_ERR_NO_ERROR)
                TEST_FAIL(
                    "Broker error count should succeed for API %s"
                    " on broker %" PRId32,
                    rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), coord_id);

        TEST_ASSERT(errors_cnt == 0,
                    "Expected error count 0 for API %s, found %zu",
                    rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), errors_cnt);

        /* .. while the InitProducerId error must still be queued. */
        if (rd_kafka_mock_broker_error_stack_cnt(mcluster, coord_id,
                                                 RD_KAFKAP_InitProducerId,
                                                 &errors_cnt) !=
            RD_KAFKA_RESP_ERR_NO_ERROR)
                TEST_FAIL(
                    "Broker error count should succeed for API %s"
                    " on broker %" PRId32,
                    rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), coord_id);

        TEST_ASSERT(errors_cnt == 1,
                    "Expected error count 1 for API %s, found %zu",
                    rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), errors_cnt);

        /* Clean up and restore the global error filter */
        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
/**
* @brief Test that the TxnOffsetCommit op doesn't retry without waiting
* if the coordinator is found but not available, causing too frequent retries.
*/
static void
do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_bool_t times_out) {
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_t *rk;
        rd_kafka_topic_partition_list_t *commit_offsets;
        rd_kafka_consumer_group_metadata_t *group_md;
        rd_kafka_resp_err_t err;
        rd_kafka_error_t *error;
        int timeout;

        SUB_TEST_QUICK("times_out=%s", RD_STR_ToF(times_out));

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Produce one message and wait for it to be delivered */
        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        test_flush(rk, 5000);

        /* Make the next four TxnOffsetCommit requests fail with
         * COORDINATOR_NOT_AVAILABLE. */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_TxnOffsetCommit, 4,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE);

        commit_offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(commit_offsets, "srctopic4", 3)
            ->offset = 1;

        group_md = rd_kafka_consumer_group_metadata_new("mygroupid");

        /* With a retry delay of 500ms the four failed attempts add up to
         * at least 2000ms before the call can succeed: a 500ms timeout
         * must thus expire first while 4000ms must be enough. */
        if (times_out)
                timeout = 500;
        else
                timeout = 4000;

        error = rd_kafka_send_offsets_to_transaction(rk, commit_offsets,
                                                     group_md, timeout);

        rd_kafka_consumer_group_metadata_destroy(group_md);
        rd_kafka_topic_partition_list_destroy(commit_offsets);

        if (!times_out) {
                /* All injected errors were retried within the timeout */
                TEST_ASSERT(rd_kafka_error_code(error) ==
                                RD_KAFKA_RESP_ERR_NO_ERROR,
                            "expected \"Success\", found: %s",
                            rd_kafka_err2str(rd_kafka_error_code(error)));
        } else {
                /* The call gave up while the coordinator was still
                 * reported as unavailable */
                TEST_ASSERT(rd_kafka_error_code(error) ==
                                RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                            "expected %s, got: %s",
                            rd_kafka_err2name(
                                RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE),
                            rd_kafka_err2str(rd_kafka_error_code(error)));
        }

        rd_kafka_error_destroy(error);

        /* Tear down the producer */
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
int main_0105_transactions_mock(int argc, char **argv) {
if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
return 0;
}
do_test_txn_recoverable_errors();
do_test_txn_fatal_idempo_errors();
do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);
do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);
do_test_txn_req_cnt();
do_test_txn_requires_abort_errors();
do_test_txn_slow_reinit(rd_false);
do_test_txn_slow_reinit(rd_true);
/* Just do a subset of tests in quick mode */
if (test_quick)
return 0;
do_test_txn_endtxn_errors();
do_test_txn_endtxn_infinite();
do_test_txn_endtxn_timeout();
do_test_txn_endtxn_timeout_inflight();
/* Bring down the coordinator */
do_test_txn_broker_down_in_txn(rd_true);
/* Bring down partition leader */
do_test_txn_broker_down_in_txn(rd_false);
do_test_txns_not_supported();
do_test_txns_send_offsets_concurrent_is_retried();
do_test_txns_send_offsets_non_eligible();
do_test_txn_coord_req_destroy();
do_test_txn_coord_req_multi_find();
do_test_txn_addparts_req_multi();
do_test_txns_no_timeout_crash();
do_test_txn_auth_failure(
RD_KAFKAP_InitProducerId,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
do_test_txn_auth_failure(
RD_KAFKAP_FindCoordinator,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
do_test_txn_flush_timeout();
do_test_unstable_offset_commit();
do_test_commit_after_msg_timeout();
do_test_txn_switch_coordinator();
do_test_txn_switch_coordinator_refresh();
do_test_out_of_order_seq();
do_test_topic_disappears_for_awhile();
do_test_disconnected_group_coord(rd_false);
do_test_disconnected_group_coord(rd_true);
do_test_txn_coordinator_null_not_fatal();
do_test_txn_resumable_calls_timeout(rd_true);
do_test_txn_resumable_calls_timeout(rd_false);
do_test_txn_resumable_calls_timeout_error(rd_true);
do_test_txn_resumable_calls_timeout_error(rd_false);
do_test_txn_resumable_init();
do_test_txn_concurrent_operations(rd_true /*commit*/);
do_test_txn_concurrent_operations(rd_false /*abort*/);
do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);
do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);
do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_true);
do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_false);
return 0;
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。