代码拉取完成,页面将自动刷新
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2016-2022, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "testcpp.h"
#if WITH_RAPIDJSON
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <assert.h>
#include <sstream>
#include <string>
#include <map>
#include <rapidjson/document.h>
#include <rapidjson/schema.h>
#include <rapidjson/filereadstream.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/error/en.h>
#include <rapidjson/prettywriter.h>
/**
* @name Consumer Transactions.
*
* - Uses the TransactionProducerCli Java application to produce messages
* that are part of abort and commit transactions in various combinations
* and tests that librdkafka consumes them as expected. Refer to
* TransactionProducerCli.java for scenarios covered.
*/
class TestEventCb : public RdKafka::EventCb {
public:
static bool should_capture_stats;
static bool has_captured_stats;
static int64_t partition_0_hi_offset;
static int64_t partition_0_ls_offset;
static std::string topic;
void event_cb(RdKafka::Event &event) {
switch (event.type()) {
case RdKafka::Event::EVENT_STATS:
if (should_capture_stats) {
partition_0_hi_offset = -1;
partition_0_ls_offset = -1;
has_captured_stats = true;
should_capture_stats = false;
char path[256];
/* Parse JSON to validate */
rapidjson::Document d;
if (d.Parse(event.str().c_str()).HasParseError())
Test::Fail(tostr() << "Failed to parse stats JSON: "
<< rapidjson::GetParseError_En(d.GetParseError())
<< " at " << d.GetErrorOffset());
rd_snprintf(path, sizeof(path), "/topics/%s/partitions/0",
topic.c_str());
rapidjson::Pointer jpath((const char *)path);
rapidjson::Value *pp = rapidjson::GetValueByPointer(d, jpath);
if (pp == NULL)
return;
TEST_ASSERT(pp->HasMember("hi_offset"), "hi_offset not found in stats");
TEST_ASSERT(pp->HasMember("ls_offset"), "ls_offset not found in stats");
partition_0_hi_offset = (*pp)["hi_offset"].GetInt();
partition_0_ls_offset = (*pp)["ls_offset"].GetInt();
}
break;
case RdKafka::Event::EVENT_LOG:
std::cerr << event.str() << "\n";
break;
default:
break;
}
}
};
bool TestEventCb::should_capture_stats;
bool TestEventCb::has_captured_stats;
int64_t TestEventCb::partition_0_hi_offset;
int64_t TestEventCb::partition_0_ls_offset;
std::string TestEventCb::topic;
static TestEventCb ex_event_cb;
static void execute_java_produce_cli(std::string &bootstrapServers,
const std::string &topic,
const std::string &testidstr,
const char **cmds,
size_t cmd_cnt) {
const std::string topicCmd = "topic," + topic;
const std::string testidCmd = "testid," + testidstr;
const char **argv;
size_t i = 0;
argv = (const char **)rd_alloca(sizeof(*argv) * (1 + 1 + 1 + cmd_cnt + 1));
argv[i++] = bootstrapServers.c_str();
argv[i++] = topicCmd.c_str();
argv[i++] = testidCmd.c_str();
for (size_t j = 0; j < cmd_cnt; j++)
argv[i++] = cmds[j];
argv[i] = NULL;
int pid = test_run_java("TransactionProducerCli", (const char **)argv);
test_waitpid(pid);
}
static std::vector<RdKafka::Message *>
consume_messages(RdKafka::KafkaConsumer *c, std::string topic, int partition) {
RdKafka::ErrorCode err;
/* Assign partitions */
std::vector<RdKafka::TopicPartition *> parts;
parts.push_back(RdKafka::TopicPartition::create(topic, partition));
if ((err = c->assign(parts)))
Test::Fail("assign failed: " + RdKafka::err2str(err));
RdKafka::TopicPartition::destroy(parts);
Test::Say(tostr() << "Consuming from topic " << topic << " partition "
<< partition << "\n");
std::vector<RdKafka::Message *> result = std::vector<RdKafka::Message *>();
while (true) {
RdKafka::Message *msg = c->consume(tmout_multip(1000));
switch (msg->err()) {
case RdKafka::ERR__TIMED_OUT:
delete msg;
continue;
case RdKafka::ERR__PARTITION_EOF:
delete msg;
break;
case RdKafka::ERR_NO_ERROR:
result.push_back(msg);
continue;
default:
Test::Fail("Error consuming from topic " + topic + ": " + msg->errstr());
delete msg;
break;
}
break;
}
Test::Say("Read all messages from topic: " + topic + "\n");
TestEventCb::should_capture_stats = true;
/* rely on the test timeout to prevent an infinite loop in
* the (unlikely) event that the statistics callback isn't
* called. */
while (!TestEventCb::has_captured_stats) {
RdKafka::Message *msg = c->consume(tmout_multip(500));
delete msg;
}
Test::Say("Captured consumer statistics event\n");
return result;
}
static void delete_messages(std::vector<RdKafka::Message *> &messages) {
for (size_t i = 0; i < messages.size(); ++i)
delete messages[i];
}
static std::string get_bootstrap_servers() {
RdKafka::Conf *conf;
std::string bootstrap_servers;
Test::conf_init(&conf, NULL, 40);
conf->get("bootstrap.servers", bootstrap_servers);
delete conf;
return bootstrap_servers;
}
static RdKafka::KafkaConsumer *create_consumer(std::string &topic_name,
const char *isolation_level) {
RdKafka::Conf *conf;
std::string errstr;
Test::conf_init(&conf, NULL, 40);
Test::conf_set(conf, "group.id", topic_name);
Test::conf_set(conf, "enable.auto.commit", "false");
Test::conf_set(conf, "auto.offset.reset", "earliest");
Test::conf_set(conf, "enable.partition.eof", "true");
Test::conf_set(conf, "isolation.level", isolation_level);
Test::conf_set(conf, "statistics.interval.ms", "1000");
conf->set("event_cb", &ex_event_cb, errstr);
TestEventCb::should_capture_stats = false;
TestEventCb::has_captured_stats = false;
RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
if (!c)
Test::Fail("Failed to create KafkaConsumer: " + errstr);
delete conf;
return c;
}
static std::vector<std::string> csv_split(const std::string &input) {
std::stringstream ss(input);
std::vector<std::string> res;
while (ss.good()) {
std::string substr;
std::getline(ss, substr, ',');
/* Trim */
substr.erase(0, substr.find_first_not_of(' '));
substr.erase(substr.find_last_not_of(' ') + 1);
res.push_back(substr);
}
return res;
}
enum TransactionType {
TransactionType_None,
TransactionType_BeginAbort,
TransactionType_BeginCommit,
TransactionType_BeginOpen,
TransactionType_ContinueAbort,
TransactionType_ContinueCommit,
TransactionType_ContinueOpen
};
static TransactionType TransactionType_from_string(std::string str) {
#define _CHKRET(NAME) \
if (!str.compare(#NAME)) \
return TransactionType_##NAME
_CHKRET(None);
_CHKRET(BeginAbort);
_CHKRET(BeginCommit);
_CHKRET(BeginOpen);
_CHKRET(ContinueAbort);
_CHKRET(ContinueCommit);
_CHKRET(ContinueOpen);
Test::Fail("Unknown TransactionType: " + str);
return TransactionType_None; /* NOTREACHED */
}
static void txn_producer_makeTestMessages(RdKafka::Producer *producer,
const std::string &topic,
const std::string &testidstr,
int partition,
int idStart,
int msgcount,
TransactionType tt,
bool do_flush) {
RdKafka::Error *error;
if (tt != TransactionType_None && tt != TransactionType_ContinueOpen &&
tt != TransactionType_ContinueCommit &&
tt != TransactionType_ContinueAbort) {
error = producer->begin_transaction();
if (error) {
Test::Fail("begin_transaction() failed: " + error->str());
delete error;
}
}
for (int i = 0; i < msgcount; i++) {
char key[] = {(char)((i + idStart) & 0xff)};
char payload[] = {0x10, 0x20, 0x30, 0x40};
RdKafka::ErrorCode err;
err = producer->produce(topic, partition, producer->RK_MSG_COPY, payload,
sizeof(payload), key, sizeof(key), 0, NULL);
if (err)
Test::Fail("produce() failed: " + RdKafka::err2str(err));
}
if (do_flush)
producer->flush(-1);
switch (tt) {
case TransactionType_BeginAbort:
case TransactionType_ContinueAbort:
error = producer->abort_transaction(30 * 1000);
if (error) {
Test::Fail("abort_transaction() failed: " + error->str());
delete error;
}
break;
case TransactionType_BeginCommit:
case TransactionType_ContinueCommit:
error = producer->commit_transaction(30 * 1000);
if (error) {
Test::Fail("commit_transaction() failed: " + error->str());
delete error;
}
break;
default:
break;
}
}
class txnDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &msg) {
switch (msg.err()) {
case RdKafka::ERR__PURGE_QUEUE:
case RdKafka::ERR__PURGE_INFLIGHT:
/* These are expected when transactions are aborted */
break;
case RdKafka::ERR_NO_ERROR:
break;
default:
Test::Fail("Delivery failed: " + msg.errstr());
break;
}
}
};
/**
* @brief Transactional producer, performing the commands in \p cmds.
* This is the librdkafka counterpart of
* java/TransactionProducerCli.java
*/
static void txn_producer(const std::string &brokers,
const std::string &topic,
const std::string &testidstr,
const char **cmds,
size_t cmd_cnt) {
RdKafka::Conf *conf;
txnDeliveryReportCb txn_dr;
Test::conf_init(&conf, NULL, 0);
Test::conf_set(conf, "bootstrap.servers", brokers);
std::map<std::string, RdKafka::Producer *> producers;
for (size_t i = 0; i < cmd_cnt; i++) {
std::string cmdstr = std::string(cmds[i]);
Test::Say(_C_CLR "rdkafka txn producer command: " + cmdstr + "\n");
std::vector<std::string> cmd = csv_split(cmdstr);
if (!cmd[0].compare("sleep")) {
rd_usleep(atoi(cmd[1].c_str()) * 1000, NULL);
} else if (!cmd[0].compare("exit")) {
break; /* We can't really simulate the Java exit behaviour
* from in-process. */
} else if (cmd[0].find("producer") == 0) {
TransactionType txntype = TransactionType_from_string(cmd[4]);
std::map<std::string, RdKafka::Producer *>::iterator it =
producers.find(cmd[0]);
RdKafka::Producer *producer;
if (it == producers.end()) {
/* Create producer if it doesn't exist */
std::string errstr;
Test::Say(tostr() << "Creating producer " << cmd[0]
<< " with transactiontype " << txntype << " '"
<< cmd[4] << "'\n");
/* Config */
Test::conf_set(conf, "enable.idempotence", "true");
if (txntype != TransactionType_None)
Test::conf_set(conf, "transactional.id",
"test-transactional-id-c-" + testidstr + "-" + cmd[0]);
else
Test::conf_set(conf, "transactional.id", "");
Test::conf_set(conf, "linger.ms", "5"); /* ensure batching */
conf->set("dr_cb", &txn_dr, errstr);
/* Create producer */
producer = RdKafka::Producer::create(conf, errstr);
if (!producer)
Test::Fail("Failed to create producer " + cmd[0] + ": " + errstr);
/* Init transactions if producer is transactional */
if (txntype != TransactionType_None) {
RdKafka::Error *error = producer->init_transactions(20 * 1000);
if (error) {
Test::Fail("init_transactions() failed: " + error->str());
delete error;
}
}
producers[cmd[0]] = producer;
} else {
producer = it->second;
}
txn_producer_makeTestMessages(
producer, /* producer */
topic, /* topic */
testidstr, /* testid */
atoi(cmd[1].c_str()), /* partition */
(int)strtol(cmd[2].c_str(), NULL, 0), /* idStart */
atoi(cmd[3].c_str()), /* msg count */
txntype, /* TransactionType */
!cmd[5].compare("DoFlush") /* Flush */);
} else {
Test::Fail("Unknown command: " + cmd[0]);
}
}
delete conf;
for (std::map<std::string, RdKafka::Producer *>::iterator it =
producers.begin();
it != producers.end(); it++)
delete it->second;
}
static void do_test_consumer_txn_test(bool use_java_producer) {
std::string errstr;
std::string topic_name;
RdKafka::KafkaConsumer *c;
std::vector<RdKafka::Message *> msgs;
std::string testidstr = test_str_id_generate_tmp();
std::string bootstrap_servers = get_bootstrap_servers();
Test::Say(tostr() << _C_BLU "[ Consumer transaction tests using "
<< (use_java_producer ? "java" : "librdkafka")
<< " producer with testid " << testidstr << "]\n" _C_CLR);
#define run_producer(CMDS...) \
do { \
const char *_cmds[] = {CMDS}; \
size_t _cmd_cnt = sizeof(_cmds) / sizeof(*_cmds); \
if (use_java_producer) \
execute_java_produce_cli(bootstrap_servers, topic_name, testidstr, \
_cmds, _cmd_cnt); \
else \
txn_producer(bootstrap_servers, topic_name, testidstr, _cmds, _cmd_cnt); \
} while (0)
if (test_quick) {
Test::Say("Skipping consumer_txn tests 0->4 due to quick mode\n");
goto test5;
}
Test::Say(_C_BLU "Test 0 - basic commit + abort\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x0, 5, BeginCommit, DoFlush",
"producer1, -1, 0x10, 5, BeginAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 5,
"Consumed unexpected number of messages. "
"Expected 5, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
c->close();
delete c;
#define expect_msgcnt(msgcnt) \
TEST_ASSERT(msgs.size() == msgcnt, "Expected %d messages, got %d", \
(int)msgs.size(), msgcnt)
#define expect_key(msgidx, value) \
do { \
TEST_ASSERT(msgs.size() > msgidx, \
"Expected at least %d message(s), only got %d", msgidx + 1, \
(int)msgs.size()); \
TEST_ASSERT(msgs[msgidx]->key_len() == 1, \
"Expected msg #%d key to be of size 1, not %d\n", msgidx, \
(int)msgs[msgidx]->key_len()); \
TEST_ASSERT(value == (int)msgs[msgidx]->key()->c_str()[0], \
"Expected msg #%d key 0x%x, not 0x%x", msgidx, value, \
(int)msgs[msgidx]->key()->c_str()[0]); \
} while (0)
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
expect_msgcnt(10);
expect_key(0, 0x0);
expect_key(4, 0x4);
expect_key(5, 0x10);
expect_key(9, 0x14);
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 0.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-0.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x0, 5, BeginCommit, DontFlush",
"producer1, -1, 0x10, 5, BeginAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 5,
"Consumed unexpected number of messages. "
"Expected 5, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x10 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x14 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 0.2\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-0.2", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x10, 5, BeginAbort, DoFlush",
"producer1, -1, 0x30, 5, BeginCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 5,
"Consumed unexpected number of messages. "
"Expected 5, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x30 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x34 == msgs[4]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x30 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 1 - mixed with non-transactional.\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
TestEventCb::topic = topic_name;
run_producer("producer3, -1, 0x10, 5, None, DoFlush",
"producer1, -1, 0x50, 5, BeginCommit, DoFlush",
"producer1, -1, 0x80, 5, BeginAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(TestEventCb::partition_0_ls_offset != -1 &&
TestEventCb::partition_0_ls_offset ==
TestEventCb::partition_0_hi_offset,
"Expected hi_offset to equal ls_offset but "
"got hi_offset: %" PRId64 ", ls_offset: %" PRId64,
TestEventCb::partition_0_hi_offset,
TestEventCb::partition_0_ls_offset);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x50 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x54 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 1.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-1.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x30, 5, BeginAbort, DoFlush",
"producer3, -1, 0x40, 5, None, DoFlush",
"producer1, -1, 0x60, 5, BeginCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x40 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x44 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x60 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x64 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 1.2\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-1.2", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x10, 5, BeginCommit, DoFlush",
"producer1, -1, 0x20, 5, BeginAbort, DoFlush",
"producer3, -1, 0x30, 5, None, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x30 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 2 - rapid abort / committing.\n" _C_CLR);
// note: aborted records never seem to make it to the broker when not flushed.
topic_name = Test::mk_topic_name("0098-consumer_txn-2", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x10, 1, BeginAbort, DontFlush",
"producer1, -1, 0x20, 1, BeginCommit, DontFlush",
"producer1, -1, 0x30, 1, BeginAbort, DontFlush",
"producer1, -1, 0x40, 1, BeginCommit, DontFlush",
"producer1, -1, 0x50, 1, BeginAbort, DontFlush",
"producer1, -1, 0x60, 1, BeginCommit, DontFlush",
"producer1, -1, 0x70, 1, BeginAbort, DontFlush",
"producer1, -1, 0x80, 1, BeginCommit, DontFlush",
"producer1, -1, 0x90, 1, BeginAbort, DontFlush",
"producer1, -1, 0xa0, 1, BeginCommit, DoFlush",
"producer3, -1, 0xb0, 1, None, DontFlush",
"producer3, -1, 0xc0, 1, None, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 7,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x20 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x40 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[2]->key_len() >= 1 &&
0x60 == (unsigned char)msgs[2]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[3]->key_len() >= 1 &&
0x80 == (unsigned char)msgs[3]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 &&
0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 &&
0xb0 == (unsigned char)msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[6]->key_len() >= 1 &&
0xc0 == (unsigned char)msgs[6]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 2.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-2.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x10, 1, BeginAbort, DoFlush",
"producer1, -1, 0x20, 1, BeginCommit, DoFlush",
"producer1, -1, 0x30, 1, BeginAbort, DoFlush",
"producer1, -1, 0x40, 1, BeginCommit, DoFlush",
"producer1, -1, 0x50, 1, BeginAbort, DoFlush",
"producer1, -1, 0x60, 1, BeginCommit, DoFlush",
"producer1, -1, 0x70, 1, BeginAbort, DoFlush",
"producer1, -1, 0x80, 1, BeginCommit, DoFlush",
"producer1, -1, 0x90, 1, BeginAbort, DoFlush",
"producer1, -1, 0xa0, 1, BeginCommit, DoFlush",
"producer3, -1, 0xb0, 1, None, DoFlush",
"producer3, -1, 0xc0, 1, None, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 7,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x20 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x40 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[2]->key_len() >= 1 &&
0x60 == (unsigned char)msgs[2]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[3]->key_len() >= 1 &&
0x80 == (unsigned char)msgs[3]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 &&
0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 &&
0xb0 == (unsigned char)msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[6]->key_len() >= 1 &&
0xc0 == (unsigned char)msgs[6]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 12,
"Consumed unexpected number of messages. "
"Expected 12, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x10 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x20 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[2]->key_len() >= 1 &&
0x30 == (unsigned char)msgs[2]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[3]->key_len() >= 1 &&
0x40 == (unsigned char)msgs[3]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 &&
0x50 == (unsigned char)msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 &&
0x60 == (unsigned char)msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[6]->key_len() >= 1 &&
0x70 == (unsigned char)msgs[6]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 3 - cross partition (simple).\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-3", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 2, 3);
run_producer("producer1, 0, 0x10, 3, BeginOpen, DoFlush",
"producer1, 1, 0x20, 3, ContinueOpen, DoFlush",
"producer1, 0, 0x30, 3, ContinueCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 6,
"Consumed unexpected number of messages. "
"Expected 6, got: %d",
(int)msgs.size());
delete_messages(msgs);
msgs = consume_messages(c, topic_name, 1);
TEST_ASSERT(msgs.size() == 3,
"Consumed unexpected number of messages. "
"Expected 3, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 6,
"Consumed unexpected number of messages. "
"Expected 6, got: %d",
(int)msgs.size());
delete_messages(msgs);
msgs = consume_messages(c, topic_name, 1);
TEST_ASSERT(msgs.size() == 3,
"Consumed unexpected number of messages. "
"Expected 3, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 3.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-3.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 2, 3);
run_producer("producer1, 0, 0x55, 1, BeginCommit, DoFlush",
"producer1, 0, 0x10, 3, BeginOpen, DoFlush",
"producer1, 1, 0x20, 3, ContinueOpen, DoFlush",
"producer1, 0, 0x30, 3, ContinueAbort, DoFlush",
"producer3, 0, 0x00, 1, None, DoFlush",
"producer1, 1, 0x44, 1, BeginCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 2,
"Consumed unexpected number of messages. "
"Expected 2, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x55 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x00 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
msgs = consume_messages(c, topic_name, 1);
TEST_ASSERT(msgs.size() == 1,
"Consumed unexpected number of messages. "
"Expected 1, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x44 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 4 - simultaneous transactions (simple).\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-4", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
"producer2, 0, 0x30, 3, BeginOpen, DoFlush",
"producer1, 0, 0x40, 3, ContinueCommit, DoFlush",
"producer2, 0, 0x50, 3, ContinueAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 7,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 13, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 4.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-4.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
"producer2, 0, 0x30, 3, BeginOpen, DoFlush",
"producer1, 0, 0x40, 3, ContinueAbort, DoFlush",
"producer2, 0, 0x50, 3, ContinueCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 7,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 13, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 4.2\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-4.2", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
"producer2, 0, 0x30, 3, BeginOpen, DoFlush",
"producer1, 0, 0x40, 3, ContinueCommit, DoFlush",
"producer2, 0, 0x50, 3, ContinueCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 13, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 4.3\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-4.3", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
"producer2, 0, 0x30, 3, BeginOpen, DoFlush",
"producer1, 0, 0x40, 3, ContinueAbort, DoFlush",
"producer2, 0, 0x50, 3, ContinueAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 1,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 13, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 5 - split transaction across message sets.\n" _C_CLR);
test5:
topic_name = Test::mk_topic_name("0098-consumer_txn-5", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, 0, 0x10, 2, BeginOpen, DontFlush", "sleep,200",
"producer1, 0, 0x20, 2, ContinueAbort, DontFlush",
"producer1, 0, 0x30, 2, BeginOpen, DontFlush", "sleep,200",
"producer1, 0, 0x40, 2, ContinueCommit, DontFlush",
"producer1, 0, 0x50, 2, BeginOpen, DontFlush", "sleep,200",
"producer1, 0, 0x60, 2, ContinueAbort, DontFlush",
"producer1, 0, 0xa0, 2, BeginOpen, DontFlush", "sleep,200",
"producer1, 0, 0xb0, 2, ContinueCommit, DontFlush",
"producer3, 0, 0x70, 1, None, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 9,
"Consumed unexpected number of messages. "
"Expected 9, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x30 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x31 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[2]->key_len() >= 1 &&
0x40 == (unsigned char)msgs[2]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[3]->key_len() >= 1 &&
0x41 == (unsigned char)msgs[3]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 &&
0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 &&
0xa1 == (unsigned char)msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[6]->key_len() >= 1 &&
0xb0 == (unsigned char)msgs[6]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[7]->key_len() >= 1 &&
0xb1 == (unsigned char)msgs[7]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[8]->key_len() >= 1 &&
0x70 == (unsigned char)msgs[8]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 6 - transaction left open\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
TestEventCb::topic = topic_name;
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
// prevent abort control message from being written.
"exit,0");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 1,
"Consumed unexpected number of messages. "
"Expected 1, got: %d",
(int)msgs.size());
TEST_ASSERT(TestEventCb::partition_0_ls_offset + 3 ==
TestEventCb::partition_0_hi_offset,
"Expected hi_offset to be 3 greater than ls_offset "
"but got hi_offset: %" PRId64 ", ls_offset: %" PRId64,
TestEventCb::partition_0_hi_offset,
TestEventCb::partition_0_ls_offset);
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
}
#endif
extern "C" {
int main_0098_consumer_txn(int argc, char **argv) {
if (test_needs_auth()) {
Test::Skip(
"Authentication or security configuration "
"required on client: not supported in "
"Java transactional producer: skipping tests\n");
return 0;
}
#if WITH_RAPIDJSON
do_test_consumer_txn_test(true /* with java producer */);
do_test_consumer_txn_test(false /* with librdkafka producer */);
#else
Test::Skip("RapidJSON >=1.1.0 not available\n");
#endif
return 0;
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。