1 Star 0 Fork 0

Kenvins/librdkafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
0098-consumer-txn.cpp 41.97 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2016-2022, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "testcpp.h"
#if WITH_RAPIDJSON
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <assert.h>
#include <sstream>
#include <string>
#include <map>
#include <rapidjson/document.h>
#include <rapidjson/schema.h>
#include <rapidjson/filereadstream.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/error/en.h>
#include <rapidjson/prettywriter.h>
/**
* @name Consumer Transactions.
*
* - Uses the TransactionProducerCli Java application to produce messages
* that are part of abort and commit transactions in various combinations
* and tests that librdkafka consumes them as expected. Refer to
* TransactionProducerCli.java for scenarios covered.
*/
class TestEventCb : public RdKafka::EventCb {
public:
static bool should_capture_stats;
static bool has_captured_stats;
static int64_t partition_0_hi_offset;
static int64_t partition_0_ls_offset;
static std::string topic;
void event_cb(RdKafka::Event &event) {
switch (event.type()) {
case RdKafka::Event::EVENT_STATS:
if (should_capture_stats) {
partition_0_hi_offset = -1;
partition_0_ls_offset = -1;
has_captured_stats = true;
should_capture_stats = false;
char path[256];
/* Parse JSON to validate */
rapidjson::Document d;
if (d.Parse(event.str().c_str()).HasParseError())
Test::Fail(tostr() << "Failed to parse stats JSON: "
<< rapidjson::GetParseError_En(d.GetParseError())
<< " at " << d.GetErrorOffset());
rd_snprintf(path, sizeof(path), "/topics/%s/partitions/0",
topic.c_str());
rapidjson::Pointer jpath((const char *)path);
rapidjson::Value *pp = rapidjson::GetValueByPointer(d, jpath);
if (pp == NULL)
return;
TEST_ASSERT(pp->HasMember("hi_offset"), "hi_offset not found in stats");
TEST_ASSERT(pp->HasMember("ls_offset"), "ls_offset not found in stats");
partition_0_hi_offset = (*pp)["hi_offset"].GetInt();
partition_0_ls_offset = (*pp)["ls_offset"].GetInt();
}
break;
case RdKafka::Event::EVENT_LOG:
std::cerr << event.str() << "\n";
break;
default:
break;
}
}
};
bool TestEventCb::should_capture_stats;
bool TestEventCb::has_captured_stats;
int64_t TestEventCb::partition_0_hi_offset;
int64_t TestEventCb::partition_0_ls_offset;
std::string TestEventCb::topic;
static TestEventCb ex_event_cb;
static void execute_java_produce_cli(std::string &bootstrapServers,
const std::string &topic,
const std::string &testidstr,
const char **cmds,
size_t cmd_cnt) {
const std::string topicCmd = "topic," + topic;
const std::string testidCmd = "testid," + testidstr;
const char **argv;
size_t i = 0;
argv = (const char **)rd_alloca(sizeof(*argv) * (1 + 1 + 1 + cmd_cnt + 1));
argv[i++] = bootstrapServers.c_str();
argv[i++] = topicCmd.c_str();
argv[i++] = testidCmd.c_str();
for (size_t j = 0; j < cmd_cnt; j++)
argv[i++] = cmds[j];
argv[i] = NULL;
int pid = test_run_java("TransactionProducerCli", (const char **)argv);
test_waitpid(pid);
}
static std::vector<RdKafka::Message *>
consume_messages(RdKafka::KafkaConsumer *c, std::string topic, int partition) {
RdKafka::ErrorCode err;
/* Assign partitions */
std::vector<RdKafka::TopicPartition *> parts;
parts.push_back(RdKafka::TopicPartition::create(topic, partition));
if ((err = c->assign(parts)))
Test::Fail("assign failed: " + RdKafka::err2str(err));
RdKafka::TopicPartition::destroy(parts);
Test::Say(tostr() << "Consuming from topic " << topic << " partition "
<< partition << "\n");
std::vector<RdKafka::Message *> result = std::vector<RdKafka::Message *>();
while (true) {
RdKafka::Message *msg = c->consume(tmout_multip(1000));
switch (msg->err()) {
case RdKafka::ERR__TIMED_OUT:
delete msg;
continue;
case RdKafka::ERR__PARTITION_EOF:
delete msg;
break;
case RdKafka::ERR_NO_ERROR:
result.push_back(msg);
continue;
default:
Test::Fail("Error consuming from topic " + topic + ": " + msg->errstr());
delete msg;
break;
}
break;
}
Test::Say("Read all messages from topic: " + topic + "\n");
TestEventCb::should_capture_stats = true;
/* rely on the test timeout to prevent an infinite loop in
* the (unlikely) event that the statistics callback isn't
* called. */
while (!TestEventCb::has_captured_stats) {
RdKafka::Message *msg = c->consume(tmout_multip(500));
delete msg;
}
Test::Say("Captured consumer statistics event\n");
return result;
}
static void delete_messages(std::vector<RdKafka::Message *> &messages) {
for (size_t i = 0; i < messages.size(); ++i)
delete messages[i];
}
static std::string get_bootstrap_servers() {
RdKafka::Conf *conf;
std::string bootstrap_servers;
Test::conf_init(&conf, NULL, 40);
conf->get("bootstrap.servers", bootstrap_servers);
delete conf;
return bootstrap_servers;
}
static RdKafka::KafkaConsumer *create_consumer(std::string &topic_name,
const char *isolation_level) {
RdKafka::Conf *conf;
std::string errstr;
Test::conf_init(&conf, NULL, 40);
Test::conf_set(conf, "group.id", topic_name);
Test::conf_set(conf, "enable.auto.commit", "false");
Test::conf_set(conf, "auto.offset.reset", "earliest");
Test::conf_set(conf, "enable.partition.eof", "true");
Test::conf_set(conf, "isolation.level", isolation_level);
Test::conf_set(conf, "statistics.interval.ms", "1000");
conf->set("event_cb", &ex_event_cb, errstr);
TestEventCb::should_capture_stats = false;
TestEventCb::has_captured_stats = false;
RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
if (!c)
Test::Fail("Failed to create KafkaConsumer: " + errstr);
delete conf;
return c;
}
static std::vector<std::string> csv_split(const std::string &input) {
std::stringstream ss(input);
std::vector<std::string> res;
while (ss.good()) {
std::string substr;
std::getline(ss, substr, ',');
/* Trim */
substr.erase(0, substr.find_first_not_of(' '));
substr.erase(substr.find_last_not_of(' ') + 1);
res.push_back(substr);
}
return res;
}
enum TransactionType {
TransactionType_None,
TransactionType_BeginAbort,
TransactionType_BeginCommit,
TransactionType_BeginOpen,
TransactionType_ContinueAbort,
TransactionType_ContinueCommit,
TransactionType_ContinueOpen
};
static TransactionType TransactionType_from_string(std::string str) {
#define _CHKRET(NAME) \
if (!str.compare(#NAME)) \
return TransactionType_##NAME
_CHKRET(None);
_CHKRET(BeginAbort);
_CHKRET(BeginCommit);
_CHKRET(BeginOpen);
_CHKRET(ContinueAbort);
_CHKRET(ContinueCommit);
_CHKRET(ContinueOpen);
Test::Fail("Unknown TransactionType: " + str);
return TransactionType_None; /* NOTREACHED */
}
static void txn_producer_makeTestMessages(RdKafka::Producer *producer,
const std::string &topic,
const std::string &testidstr,
int partition,
int idStart,
int msgcount,
TransactionType tt,
bool do_flush) {
RdKafka::Error *error;
if (tt != TransactionType_None && tt != TransactionType_ContinueOpen &&
tt != TransactionType_ContinueCommit &&
tt != TransactionType_ContinueAbort) {
error = producer->begin_transaction();
if (error) {
Test::Fail("begin_transaction() failed: " + error->str());
delete error;
}
}
for (int i = 0; i < msgcount; i++) {
char key[] = {(char)((i + idStart) & 0xff)};
char payload[] = {0x10, 0x20, 0x30, 0x40};
RdKafka::ErrorCode err;
err = producer->produce(topic, partition, producer->RK_MSG_COPY, payload,
sizeof(payload), key, sizeof(key), 0, NULL);
if (err)
Test::Fail("produce() failed: " + RdKafka::err2str(err));
}
if (do_flush)
producer->flush(-1);
switch (tt) {
case TransactionType_BeginAbort:
case TransactionType_ContinueAbort:
error = producer->abort_transaction(30 * 1000);
if (error) {
Test::Fail("abort_transaction() failed: " + error->str());
delete error;
}
break;
case TransactionType_BeginCommit:
case TransactionType_ContinueCommit:
error = producer->commit_transaction(30 * 1000);
if (error) {
Test::Fail("commit_transaction() failed: " + error->str());
delete error;
}
break;
default:
break;
}
}
class txnDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &msg) {
switch (msg.err()) {
case RdKafka::ERR__PURGE_QUEUE:
case RdKafka::ERR__PURGE_INFLIGHT:
/* These are expected when transactions are aborted */
break;
case RdKafka::ERR_NO_ERROR:
break;
default:
Test::Fail("Delivery failed: " + msg.errstr());
break;
}
}
};
/**
* @brief Transactional producer, performing the commands in \p cmds.
* This is the librdkafka counterpart of
* java/TransactionProducerCli.java
*/
static void txn_producer(const std::string &brokers,
const std::string &topic,
const std::string &testidstr,
const char **cmds,
size_t cmd_cnt) {
RdKafka::Conf *conf;
txnDeliveryReportCb txn_dr;
Test::conf_init(&conf, NULL, 0);
Test::conf_set(conf, "bootstrap.servers", brokers);
std::map<std::string, RdKafka::Producer *> producers;
for (size_t i = 0; i < cmd_cnt; i++) {
std::string cmdstr = std::string(cmds[i]);
Test::Say(_C_CLR "rdkafka txn producer command: " + cmdstr + "\n");
std::vector<std::string> cmd = csv_split(cmdstr);
if (!cmd[0].compare("sleep")) {
rd_usleep(atoi(cmd[1].c_str()) * 1000, NULL);
} else if (!cmd[0].compare("exit")) {
break; /* We can't really simulate the Java exit behaviour
* from in-process. */
} else if (cmd[0].find("producer") == 0) {
TransactionType txntype = TransactionType_from_string(cmd[4]);
std::map<std::string, RdKafka::Producer *>::iterator it =
producers.find(cmd[0]);
RdKafka::Producer *producer;
if (it == producers.end()) {
/* Create producer if it doesn't exist */
std::string errstr;
Test::Say(tostr() << "Creating producer " << cmd[0]
<< " with transactiontype " << txntype << " '"
<< cmd[4] << "'\n");
/* Config */
Test::conf_set(conf, "enable.idempotence", "true");
if (txntype != TransactionType_None)
Test::conf_set(conf, "transactional.id",
"test-transactional-id-c-" + testidstr + "-" + cmd[0]);
else
Test::conf_set(conf, "transactional.id", "");
Test::conf_set(conf, "linger.ms", "5"); /* ensure batching */
conf->set("dr_cb", &txn_dr, errstr);
/* Create producer */
producer = RdKafka::Producer::create(conf, errstr);
if (!producer)
Test::Fail("Failed to create producer " + cmd[0] + ": " + errstr);
/* Init transactions if producer is transactional */
if (txntype != TransactionType_None) {
RdKafka::Error *error = producer->init_transactions(20 * 1000);
if (error) {
Test::Fail("init_transactions() failed: " + error->str());
delete error;
}
}
producers[cmd[0]] = producer;
} else {
producer = it->second;
}
txn_producer_makeTestMessages(
producer, /* producer */
topic, /* topic */
testidstr, /* testid */
atoi(cmd[1].c_str()), /* partition */
(int)strtol(cmd[2].c_str(), NULL, 0), /* idStart */
atoi(cmd[3].c_str()), /* msg count */
txntype, /* TransactionType */
!cmd[5].compare("DoFlush") /* Flush */);
} else {
Test::Fail("Unknown command: " + cmd[0]);
}
}
delete conf;
for (std::map<std::string, RdKafka::Producer *>::iterator it =
producers.begin();
it != producers.end(); it++)
delete it->second;
}
static void do_test_consumer_txn_test(bool use_java_producer) {
std::string errstr;
std::string topic_name;
RdKafka::KafkaConsumer *c;
std::vector<RdKafka::Message *> msgs;
std::string testidstr = test_str_id_generate_tmp();
std::string bootstrap_servers = get_bootstrap_servers();
Test::Say(tostr() << _C_BLU "[ Consumer transaction tests using "
<< (use_java_producer ? "java" : "librdkafka")
<< " producer with testid " << testidstr << "]\n" _C_CLR);
#define run_producer(CMDS...) \
do { \
const char *_cmds[] = {CMDS}; \
size_t _cmd_cnt = sizeof(_cmds) / sizeof(*_cmds); \
if (use_java_producer) \
execute_java_produce_cli(bootstrap_servers, topic_name, testidstr, \
_cmds, _cmd_cnt); \
else \
txn_producer(bootstrap_servers, topic_name, testidstr, _cmds, _cmd_cnt); \
} while (0)
if (test_quick) {
Test::Say("Skipping consumer_txn tests 0->4 due to quick mode\n");
goto test5;
}
Test::Say(_C_BLU "Test 0 - basic commit + abort\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x0, 5, BeginCommit, DoFlush",
"producer1, -1, 0x10, 5, BeginAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 5,
"Consumed unexpected number of messages. "
"Expected 5, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
c->close();
delete c;
#define expect_msgcnt(msgcnt) \
TEST_ASSERT(msgs.size() == msgcnt, "Expected %d messages, got %d", \
(int)msgs.size(), msgcnt)
#define expect_key(msgidx, value) \
do { \
TEST_ASSERT(msgs.size() > msgidx, \
"Expected at least %d message(s), only got %d", msgidx + 1, \
(int)msgs.size()); \
TEST_ASSERT(msgs[msgidx]->key_len() == 1, \
"Expected msg #%d key to be of size 1, not %d\n", msgidx, \
(int)msgs[msgidx]->key_len()); \
TEST_ASSERT(value == (int)msgs[msgidx]->key()->c_str()[0], \
"Expected msg #%d key 0x%x, not 0x%x", msgidx, value, \
(int)msgs[msgidx]->key()->c_str()[0]); \
} while (0)
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
expect_msgcnt(10);
expect_key(0, 0x0);
expect_key(4, 0x4);
expect_key(5, 0x10);
expect_key(9, 0x14);
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 0.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-0.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x0, 5, BeginCommit, DontFlush",
"producer1, -1, 0x10, 5, BeginAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 5,
"Consumed unexpected number of messages. "
"Expected 5, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x10 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x14 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 0.2\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-0.2", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x10, 5, BeginAbort, DoFlush",
"producer1, -1, 0x30, 5, BeginCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 5,
"Consumed unexpected number of messages. "
"Expected 5, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x30 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x34 == msgs[4]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x30 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 1 - mixed with non-transactional.\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
TestEventCb::topic = topic_name;
run_producer("producer3, -1, 0x10, 5, None, DoFlush",
"producer1, -1, 0x50, 5, BeginCommit, DoFlush",
"producer1, -1, 0x80, 5, BeginAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(TestEventCb::partition_0_ls_offset != -1 &&
TestEventCb::partition_0_ls_offset ==
TestEventCb::partition_0_hi_offset,
"Expected hi_offset to equal ls_offset but "
"got hi_offset: %" PRId64 ", ls_offset: %" PRId64,
TestEventCb::partition_0_hi_offset,
TestEventCb::partition_0_ls_offset);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x50 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x54 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 1.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-1.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x30, 5, BeginAbort, DoFlush",
"producer3, -1, 0x40, 5, None, DoFlush",
"producer1, -1, 0x60, 5, BeginCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x40 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x44 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x60 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x64 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 1.2\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-1.2", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x10, 5, BeginCommit, DoFlush",
"producer1, -1, 0x20, 5, BeginAbort, DoFlush",
"producer3, -1, 0x30, 5, None, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 10,
"Consumed unexpected number of messages. "
"Expected 10, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x30 == msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 2 - rapid abort / committing.\n" _C_CLR);
// note: aborted records never seem to make it to the broker when not flushed.
topic_name = Test::mk_topic_name("0098-consumer_txn-2", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x10, 1, BeginAbort, DontFlush",
"producer1, -1, 0x20, 1, BeginCommit, DontFlush",
"producer1, -1, 0x30, 1, BeginAbort, DontFlush",
"producer1, -1, 0x40, 1, BeginCommit, DontFlush",
"producer1, -1, 0x50, 1, BeginAbort, DontFlush",
"producer1, -1, 0x60, 1, BeginCommit, DontFlush",
"producer1, -1, 0x70, 1, BeginAbort, DontFlush",
"producer1, -1, 0x80, 1, BeginCommit, DontFlush",
"producer1, -1, 0x90, 1, BeginAbort, DontFlush",
"producer1, -1, 0xa0, 1, BeginCommit, DoFlush",
"producer3, -1, 0xb0, 1, None, DontFlush",
"producer3, -1, 0xc0, 1, None, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 7,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x20 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x40 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[2]->key_len() >= 1 &&
0x60 == (unsigned char)msgs[2]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[3]->key_len() >= 1 &&
0x80 == (unsigned char)msgs[3]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 &&
0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 &&
0xb0 == (unsigned char)msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[6]->key_len() >= 1 &&
0xc0 == (unsigned char)msgs[6]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 2.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-2.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, -1, 0x10, 1, BeginAbort, DoFlush",
"producer1, -1, 0x20, 1, BeginCommit, DoFlush",
"producer1, -1, 0x30, 1, BeginAbort, DoFlush",
"producer1, -1, 0x40, 1, BeginCommit, DoFlush",
"producer1, -1, 0x50, 1, BeginAbort, DoFlush",
"producer1, -1, 0x60, 1, BeginCommit, DoFlush",
"producer1, -1, 0x70, 1, BeginAbort, DoFlush",
"producer1, -1, 0x80, 1, BeginCommit, DoFlush",
"producer1, -1, 0x90, 1, BeginAbort, DoFlush",
"producer1, -1, 0xa0, 1, BeginCommit, DoFlush",
"producer3, -1, 0xb0, 1, None, DoFlush",
"producer3, -1, 0xc0, 1, None, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 7,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x20 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x40 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[2]->key_len() >= 1 &&
0x60 == (unsigned char)msgs[2]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[3]->key_len() >= 1 &&
0x80 == (unsigned char)msgs[3]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 &&
0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 &&
0xb0 == (unsigned char)msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[6]->key_len() >= 1 &&
0xc0 == (unsigned char)msgs[6]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 12,
"Consumed unexpected number of messages. "
"Expected 12, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x10 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x20 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[2]->key_len() >= 1 &&
0x30 == (unsigned char)msgs[2]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[3]->key_len() >= 1 &&
0x40 == (unsigned char)msgs[3]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 &&
0x50 == (unsigned char)msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 &&
0x60 == (unsigned char)msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[6]->key_len() >= 1 &&
0x70 == (unsigned char)msgs[6]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 3 - cross partition (simple).\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-3", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 2, 3);
run_producer("producer1, 0, 0x10, 3, BeginOpen, DoFlush",
"producer1, 1, 0x20, 3, ContinueOpen, DoFlush",
"producer1, 0, 0x30, 3, ContinueCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 6,
"Consumed unexpected number of messages. "
"Expected 6, got: %d",
(int)msgs.size());
delete_messages(msgs);
msgs = consume_messages(c, topic_name, 1);
TEST_ASSERT(msgs.size() == 3,
"Consumed unexpected number of messages. "
"Expected 3, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 6,
"Consumed unexpected number of messages. "
"Expected 6, got: %d",
(int)msgs.size());
delete_messages(msgs);
msgs = consume_messages(c, topic_name, 1);
TEST_ASSERT(msgs.size() == 3,
"Consumed unexpected number of messages. "
"Expected 3, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 3.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-3.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 2, 3);
run_producer("producer1, 0, 0x55, 1, BeginCommit, DoFlush",
"producer1, 0, 0x10, 3, BeginOpen, DoFlush",
"producer1, 1, 0x20, 3, ContinueOpen, DoFlush",
"producer1, 0, 0x30, 3, ContinueAbort, DoFlush",
"producer3, 0, 0x00, 1, None, DoFlush",
"producer1, 1, 0x44, 1, BeginCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 2,
"Consumed unexpected number of messages. "
"Expected 2, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x55 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x00 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
msgs = consume_messages(c, topic_name, 1);
TEST_ASSERT(msgs.size() == 1,
"Consumed unexpected number of messages. "
"Expected 1, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x44 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 4 - simultaneous transactions (simple).\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-4", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
"producer2, 0, 0x30, 3, BeginOpen, DoFlush",
"producer1, 0, 0x40, 3, ContinueCommit, DoFlush",
"producer2, 0, 0x50, 3, ContinueAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 7,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 13, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 4.1\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-4.1", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
"producer2, 0, 0x30, 3, BeginOpen, DoFlush",
"producer1, 0, 0x40, 3, ContinueAbort, DoFlush",
"producer2, 0, 0x50, 3, ContinueCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 7,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 13, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 4.2\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-4.2", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
"producer2, 0, 0x30, 3, BeginOpen, DoFlush",
"producer1, 0, 0x40, 3, ContinueCommit, DoFlush",
"producer2, 0, 0x50, 3, ContinueCommit, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 13, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 4.3\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-4.3", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
"producer2, 0, 0x30, 3, BeginOpen, DoFlush",
"producer1, 0, 0x40, 3, ContinueAbort, DoFlush",
"producer2, 0, 0x50, 3, ContinueAbort, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 1,
"Consumed unexpected number of messages. "
"Expected 7, got: %d",
(int)msgs.size());
delete_messages(msgs);
c->close();
delete c;
c = create_consumer(topic_name, "READ_UNCOMMITTED");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 13,
"Consumed unexpected number of messages. "
"Expected 13, got: %d",
(int)msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 5 - split transaction across message sets.\n" _C_CLR);
test5:
topic_name = Test::mk_topic_name("0098-consumer_txn-5", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
run_producer("producer1, 0, 0x10, 2, BeginOpen, DontFlush", "sleep,200",
"producer1, 0, 0x20, 2, ContinueAbort, DontFlush",
"producer1, 0, 0x30, 2, BeginOpen, DontFlush", "sleep,200",
"producer1, 0, 0x40, 2, ContinueCommit, DontFlush",
"producer1, 0, 0x50, 2, BeginOpen, DontFlush", "sleep,200",
"producer1, 0, 0x60, 2, ContinueAbort, DontFlush",
"producer1, 0, 0xa0, 2, BeginOpen, DontFlush", "sleep,200",
"producer1, 0, 0xb0, 2, ContinueCommit, DontFlush",
"producer3, 0, 0x70, 1, None, DoFlush");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 9,
"Consumed unexpected number of messages. "
"Expected 9, got: %d",
(int)msgs.size());
TEST_ASSERT(msgs[0]->key_len() >= 1 &&
0x30 == (unsigned char)msgs[0]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[1]->key_len() >= 1 &&
0x31 == (unsigned char)msgs[1]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[2]->key_len() >= 1 &&
0x40 == (unsigned char)msgs[2]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[3]->key_len() >= 1 &&
0x41 == (unsigned char)msgs[3]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[4]->key_len() >= 1 &&
0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[5]->key_len() >= 1 &&
0xa1 == (unsigned char)msgs[5]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[6]->key_len() >= 1 &&
0xb0 == (unsigned char)msgs[6]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[7]->key_len() >= 1 &&
0xb1 == (unsigned char)msgs[7]->key()->c_str()[0],
"Unexpected key");
TEST_ASSERT(msgs[8]->key_len() >= 1 &&
0x70 == (unsigned char)msgs[8]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
Test::Say(_C_BLU "Test 6 - transaction left open\n" _C_CLR);
topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1);
c = create_consumer(topic_name, "READ_COMMITTED");
Test::create_topic(c, topic_name.c_str(), 1, 3);
TestEventCb::topic = topic_name;
run_producer("producer3, 0, 0x10, 1, None, DoFlush",
"producer1, 0, 0x20, 3, BeginOpen, DoFlush",
// prevent abort control message from being written.
"exit,0");
msgs = consume_messages(c, topic_name, 0);
TEST_ASSERT(msgs.size() == 1,
"Consumed unexpected number of messages. "
"Expected 1, got: %d",
(int)msgs.size());
TEST_ASSERT(TestEventCb::partition_0_ls_offset + 3 ==
TestEventCb::partition_0_hi_offset,
"Expected hi_offset to be 3 greater than ls_offset "
"but got hi_offset: %" PRId64 ", ls_offset: %" PRId64,
TestEventCb::partition_0_hi_offset,
TestEventCb::partition_0_ls_offset);
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
delete c;
}
#endif
extern "C" {
int main_0098_consumer_txn(int argc, char **argv) {
if (test_needs_auth()) {
Test::Skip(
"Authentication or security configuration "
"required on client: not supported in "
"Java transactional producer: skipping tests\n");
return 0;
}
#if WITH_RAPIDJSON
do_test_consumer_txn_test(true /* with java producer */);
do_test_consumer_txn_test(false /* with librdkafka producer */);
#else
Test::Skip("RapidJSON >=1.1.0 not available\n");
#endif
return 0;
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/kenvins/librdkafka.git
git@gitee.com:kenvins/librdkafka.git
kenvins
librdkafka
librdkafka
master

搜索帮助