/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2019-2022, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
#include "rdkafka.h"
#include "../src/rdkafka_proto.h"
#include "../src/rdstring.h"
#include "../src/rdunittest.h"
#include <stdarg.h>
/**
* @name Producer transaction tests using the mock cluster
*
*/
static int allowed_error;
/**
* @brief Decide which errors reported through the error_cb should cause the test to fail.
*/
static int
error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
if (err == allowed_error ||
/* If transport errors are allowed then it is likely
* that we'll also see ALL_BROKERS_DOWN. */
(allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
TEST_SAY("Ignoring allowed error: %s: %s\n",
rd_kafka_err2name(err), reason);
return 0;
}
return 1;
}
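/**
* Illustrative only: the typical sub-test pattern for tolerating a
* specific error class (the same sequence appears in several tests
* below). allowed_error, test_curr and error_is_fatal_cb are the names
* defined in this file / test.h; the elided steps are placeholders.
*
* @code
*   test_curr->is_fatal_cb = error_is_fatal_cb;
*   allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
*   // ... test steps that are expected to hit transport errors ...
*   allowed_error          = RD_KAFKA_RESP_ERR_NO_ERROR;
*   test_curr->is_fatal_cb = NULL;
* @endcode
*/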
static rd_kafka_resp_err_t (*on_response_received_cb)(rd_kafka_t *rk,
int sockfd,
const char *brokername,
int32_t brokerid,
int16_t ApiKey,
int16_t ApiVersion,
int32_t CorrId,
size_t size,
int64_t rtt,
rd_kafka_resp_err_t err,
void *ic_opaque);
/**
* @brief on_response_received interceptor that forwards the call to the
* sub-test's on_response_received_cb function, if set.
*/
static rd_kafka_resp_err_t
on_response_received_trampoline(rd_kafka_t *rk,
int sockfd,
const char *brokername,
int32_t brokerid,
int16_t ApiKey,
int16_t ApiVersion,
int32_t CorrId,
size_t size,
int64_t rtt,
rd_kafka_resp_err_t err,
void *ic_opaque) {
TEST_ASSERT(on_response_received_cb != NULL, "on_response_received_cb not set");
return on_response_received_cb(rk, sockfd, brokername, brokerid, ApiKey,
ApiVersion, CorrId, size, rtt, err,
ic_opaque);
}
/**
* @brief on_new interceptor to add an on_response_received interceptor.
*/
static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk,
const rd_kafka_conf_t *conf,
void *ic_opaque,
char *errstr,
size_t errstr_size) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
if (on_response_received_cb)
err = rd_kafka_interceptor_add_on_response_received(
rk, "on_response_received", on_response_received_trampoline,
ic_opaque);
return err;
}
/**
* @brief Create a transactional producer and a mock cluster.
*
* The var-arg list is a NULL-terminated list of
* (const char *key, const char *value) config properties.
*
* Special keys:
* "on_response_received", "" - enable the on_response_received_cb
* interceptor,
* which must be assigned prior to
* calling create_tnx_producer().
*/
static RD_SENTINEL rd_kafka_t *
create_txn_producer(rd_kafka_mock_cluster_t **mclusterp,
const char *transactional_id,
int broker_cnt,
...) {
rd_kafka_conf_t *conf;
rd_kafka_t *rk;
char numstr[8];
va_list ap;
const char *key;
rd_bool_t add_interceptors = rd_false;
rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);
test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "transactional.id", transactional_id);
/* When mock brokers are set to down state they're still binding
* the port, just not listening to it, which makes connection attempts
* stall until socket.connection.setup.timeout.ms expires.
* To speed up detection of brokers being down we reduce this timeout
* to just a couple of seconds. */
test_conf_set(conf, "socket.connection.setup.timeout.ms", "5000");
/* Speed up reconnects */
test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
test_conf_set(conf, "test.mock.num.brokers", numstr);
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
test_curr->ignore_dr_err = rd_false;
va_start(ap, broker_cnt);
while ((key = va_arg(ap, const char *))) {
if (!strcmp(key, "on_response_received")) {
add_interceptors = rd_true;
(void)va_arg(ap, const char *);
} else {
test_conf_set(conf, key, va_arg(ap, const char *));
}
}
va_end(ap);
/* Add on_.. interceptors */
if (add_interceptors)
rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
on_new_producer, NULL);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
if (mclusterp) {
*mclusterp = rd_kafka_handle_mock_cluster(rk);
TEST_ASSERT(*mclusterp, "failed to create mock cluster");
/* Create some of the common consumer "input" topics
* that we must be able to commit to with
* send_offsets_to_transaction().
* The number in the topic name denotes the number of partitions. */
TEST_CALL_ERR__(
rd_kafka_mock_topic_create(*mclusterp, "srctopic4", 4, 1));
TEST_CALL_ERR__(rd_kafka_mock_topic_create(
*mclusterp, "srctopic64", 64, 1));
}
return rk;
}
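/**
* Illustrative usage only (not referenced by any test): shows the
* var-arg config-pair convention and the "on_response_received"
* special key documented above. The callback name my_response_cb is
* hypothetical.
*
* @code
*   rd_kafka_t *rk;
*   rd_kafka_mock_cluster_t *mcluster;
*
*   // Plain usage: 3 mock brokers plus extra config pairs.
*   rk = create_txn_producer(&mcluster, "myTxnId", 3,
*                            "batch.num.messages", "1", NULL);
*
*   // With the response interceptor: assign the callback first,
*   // then pass the special key (its value is ignored).
*   on_response_received_cb = my_response_cb;
*   rk = create_txn_producer(&mcluster, "myTxnId", 3,
*                            "on_response_received", "", NULL);
* @endcode
*/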
/**
* @brief Test recoverable errors using mock broker error injections
* and code coverage checks.
*/
static void do_test_txn_recoverable_errors(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
const char *groupid = "myGroupId";
const char *txnid = "myTxnId";
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
/* Make sure transaction and group coordinators are different.
* This verifies that AddOffsetsToTxnRequest is sent to the
* transaction coordinator and TxnOffsetCommitRequest to the
* group coordinator. */
rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);
/*
* Inject some InitProducerId errors that cause retries
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_InitProducerId, 3,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
(void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
(void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce a message without error first */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
rd_kafka_flush(rk, -1);
/*
* Produce a message, let it fail with a non-idempo/non-txn
* retryable error
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Make sure messages are produced */
rd_kafka_flush(rk, -1);
/*
* Send some arbitrary offsets, first with some failures, then
* succeed.
*/
offsets = rd_kafka_topic_partition_list_new(4);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 39)->offset =
999999111;
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
999;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 19)->offset =
123456789;
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/*
* Commit transaction, first with some failures, then succeed.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_EndTxn, 3,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief KIP-360: Test that fatal idempotence errors trigger abortable
* transaction errors and that the producer can recover.
*/
static void do_test_txn_fatal_idempo_errors(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
const char *txnid = "myTxnId";
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = error_is_fatal_cb;
allowed_error = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce a message without error first */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Produce a message, let it fail with a fatal idempo error. */
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Commit the transaction, should fail */
error = rd_kafka_commit_transaction(rk, -1);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(!rd_kafka_error_is_fatal(error),
"Did not expect fatal error");
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"Expected abortable error");
rd_kafka_error_destroy(error);
/* Abort the transaction */
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/* Run a new transaction without errors to verify that the
* producer can recover. */
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
/* All done */
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
SUB_TEST_PASS();
}
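/*
* The recovery flow exercised above is the same one a transactional
* application is expected to implement (sketch only, error checks
* trimmed): an abortable error from commit_transaction() is handled
* by aborting and then running a new transaction.
*
*   error = rd_kafka_commit_transaction(rk, -1);
*   if (error && rd_kafka_error_txn_requires_abort(error)) {
*           rd_kafka_error_destroy(error);
*           rd_kafka_abort_transaction(rk, -1);
*           // begin_transaction(), re-produce and commit again.
*   }
*/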
/**
* @brief KIP-360: Test that fatal idempotence errors trigger abortable
* transaction errors, but let the broker-side bumping of the
* producer PID take longer than the remaining transaction timeout
* which should raise a retriable error from abort_transaction().
*
* @param with_sleep After the first abort, sleep longer than it takes to
* re-init the PID so that the internal state transitions
* automatically.
*/
static void do_test_txn_slow_reinit(rd_bool_t with_sleep) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
int32_t txn_coord = 2;
const char *txnid = "myTxnId";
test_timing_t timing;
SUB_TEST("%s sleep", with_sleep ? "with" : "without");
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = NULL;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce a message without error first */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
/* Set transaction coordinator latency higher than
* the abort_transaction() call timeout so that the automatic
* PID re-init takes longer than abort_transaction(). */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 10000 /*10s*/);
/* Produce a message, let it fail with a fatal idempo error. */
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Commit the transaction, should fail */
TIMING_START(&timing, "commit_transaction(-1)");
error = rd_kafka_commit_transaction(rk, -1);
TIMING_STOP(&timing);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(!rd_kafka_error_is_fatal(error),
"Did not expect fatal error");
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"Expected abortable error");
rd_kafka_error_destroy(error);
/* Abort the transaction, should fail with retriable (timeout) error */
TIMING_START(&timing, "abort_transaction(100)");
error = rd_kafka_abort_transaction(rk, 100);
TIMING_STOP(&timing);
TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");
TEST_SAY("First abort_transaction() failed: %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(!rd_kafka_error_is_fatal(error),
"Did not expect fatal error");
TEST_ASSERT(rd_kafka_error_is_retriable(error),
"Expected retriable error");
rd_kafka_error_destroy(error);
if (with_sleep)
rd_sleep(12);
/* Retry abort, should now finish. */
TEST_SAY("Retrying abort\n");
TIMING_START(&timing, "abort_transaction(-1)");
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
TIMING_STOP(&timing);
/* Run a new transaction without errors to verify that the
* producer can recover. */
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
/* All done */
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
SUB_TEST_PASS();
}
/**
* @brief KIP-360: Test that fatal idempotence errors trigger abortable
* transaction errors, but let the broker-side bumping of the
* producer PID fail with a fencing error.
* Should raise a fatal error.
*
* @param error_code Which error code InitProducerIdRequest should fail with.
* Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
* or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
*/
static void do_test_txn_fenced_reinit(rd_kafka_resp_err_t error_code) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
int32_t txn_coord = 2;
const char *txnid = "myTxnId";
char errstr[512];
rd_kafka_resp_err_t fatal_err;
SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = error_is_fatal_cb;
allowed_error = RD_KAFKA_RESP_ERR__FENCED;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce a message without error first */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
/* Fail the PID reinit */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0);
/* Produce a message, let it fail with a fatal idempo error. */
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
/* Abort the transaction, should fail with a fatal error */
error = rd_kafka_abort_transaction(rk, -1);
TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");
TEST_SAY("abort_transaction() failed: %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
rd_kafka_error_destroy(error);
fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err), errstr);
/* All done */
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
SUB_TEST_PASS();
}
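/*
* Sketch of the application-level handling of the fatal error raised
* above (do_test_txn_endtxn_errors() below does this for real): a
* fatal error leaves destroying the producer and creating a new one
* as the only option.
*
*   if (rd_kafka_error_is_fatal(error)) {
*           rd_kafka_error_destroy(error);
*           rd_kafka_destroy(rk);
*           // ... create and re-initialize a new transactional producer ...
*   }
*/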
/**
* @brief Test EndTxn errors.
*/
static void do_test_txn_endtxn_errors(void) {
rd_kafka_t *rk = NULL;
rd_kafka_mock_cluster_t *mcluster = NULL;
rd_kafka_resp_err_t err;
struct {
size_t error_cnt;
rd_kafka_resp_err_t errors[4];
rd_kafka_resp_err_t exp_err;
rd_bool_t exp_retriable;
rd_bool_t exp_abortable;
rd_bool_t exp_fatal;
rd_bool_t exp_successful_abort;
} scenario[] = {
/* This list of errors is from the EndTxnResponse handler in
* AK clients/.../TransactionManager.java */
{
/* #0 */
2,
{RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE},
/* Should auto-recover */
RD_KAFKA_RESP_ERR_NO_ERROR,
},
{
/* #1 */
2,
{RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR},
/* Should auto-recover */
RD_KAFKA_RESP_ERR_NO_ERROR,
},
{
/* #2 */
1,
{RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS},
/* Should auto-recover */
RD_KAFKA_RESP_ERR_NO_ERROR,
},
{
/* #3 */
3,
{RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS},
/* Should auto-recover */
RD_KAFKA_RESP_ERR_NO_ERROR,
},
{
/* #4: the abort is auto-recovering thru epoch bump */
1,
{RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID},
RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
rd_false /* !retriable */,
rd_true /* abortable */,
rd_false /* !fatal */,
rd_true /* successful abort */
},
{
/* #5: the abort is auto-recovering thru epoch bump */
1,
{RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING},
RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
rd_false /* !retriable */,
rd_true /* abortable */,
rd_false /* !fatal */,
rd_true /* successful abort */
},
{
/* #6 */
1,
{RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH},
/* This error is normalized */
RD_KAFKA_RESP_ERR__FENCED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
{
/* #7 */
1,
{RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
/* This error is normalized */
RD_KAFKA_RESP_ERR__FENCED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
{
/* #8 */
1,
{RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED},
RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
{
/* #9 */
1,
{RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED},
RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
rd_false /* !retriable */,
rd_true /* abortable */,
rd_false /* !fatal */
},
{
/* #10 */
/* Any other error should raise an abortable error */
1,
{RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE},
RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
rd_false /* !retriable */,
rd_true /* abortable */,
rd_false /* !fatal */,
},
{
/* #11 */
1,
{RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
/* This error is normalized */
RD_KAFKA_RESP_ERR__FENCED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
{0},
};
int i;
SUB_TEST_QUICK();
for (i = 0; scenario[i].error_cnt > 0; i++) {
int j;
/* For each scenario, test:
* commit_transaction()
* flush() + commit_transaction()
* abort_transaction()
* flush() + abort_transaction()
*/
for (j = 0; j < (2 + 2); j++) {
rd_bool_t commit = j < 2;
rd_bool_t with_flush = j & 1;
rd_bool_t exp_successful_abort =
!commit && scenario[i].exp_successful_abort;
const char *commit_str =
commit ? (with_flush ? "commit&flush" : "commit")
: (with_flush ? "abort&flush" : "abort");
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
rd_kafka_error_t *error;
test_timing_t t_call;
TEST_SAY("Testing scenario #%d %s with %" PRIusz
" injected erorrs, expecting %s\n",
i, commit_str, scenario[i].error_cnt,
exp_successful_abort
? "successful abort"
: rd_kafka_err2name(scenario[i].exp_err));
if (!rk) {
const char *txnid = "myTxnId";
rk = create_txn_producer(&mcluster, txnid, 3,
NULL);
TEST_CALL_ERROR__(
rd_kafka_init_transactions(rk, 5000));
}
/*
* Start transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Transaction aborts will cause DR errors:
* ignore them. */
test_curr->ignore_dr_err = !commit;
/*
* Produce a message.
*/
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s",
rd_kafka_err2str(err));
if (with_flush)
test_flush(rk, -1);
/*
* Send some arbitrary offsets.
*/
offsets = rd_kafka_topic_partition_list_new(4);
rd_kafka_topic_partition_list_add(offsets, "srctopic4",
3)
->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctopic64",
60)
->offset = 99999;
cgmetadata =
rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/*
* Commit transaction, first with some failures,
* then succeed.
*/
rd_kafka_mock_push_request_errors_array(
mcluster, RD_KAFKAP_EndTxn, scenario[i].error_cnt,
scenario[i].errors);
TIMING_START(&t_call, "%s", commit_str);
if (commit)
error = rd_kafka_commit_transaction(
rk, tmout_multip(5000));
else
error = rd_kafka_abort_transaction(
rk, tmout_multip(5000));
TIMING_STOP(&t_call);
if (error)
TEST_SAY(
"Scenario #%d %s failed: %s: %s "
"(retriable=%s, req_abort=%s, "
"fatal=%s)\n",
i, commit_str, rd_kafka_error_name(error),
rd_kafka_error_string(error),
RD_STR_ToF(
rd_kafka_error_is_retriable(error)),
RD_STR_ToF(
rd_kafka_error_txn_requires_abort(
error)),
RD_STR_ToF(rd_kafka_error_is_fatal(error)));
else
TEST_SAY("Scenario #%d %s succeeded\n", i,
commit_str);
if (!scenario[i].exp_err || exp_successful_abort) {
TEST_ASSERT(!error,
"Expected #%d %s to succeed, "
"got %s",
i, commit_str,
rd_kafka_error_string(error));
continue;
}
TEST_ASSERT(error != NULL, "Expected #%d %s to fail", i,
commit_str);
TEST_ASSERT(scenario[i].exp_err ==
rd_kafka_error_code(error),
"Scenario #%d: expected %s, not %s", i,
rd_kafka_err2name(scenario[i].exp_err),
rd_kafka_error_name(error));
TEST_ASSERT(
scenario[i].exp_retriable ==
(rd_bool_t)rd_kafka_error_is_retriable(error),
"Scenario #%d: retriable mismatch", i);
TEST_ASSERT(
scenario[i].exp_abortable ==
(rd_bool_t)rd_kafka_error_txn_requires_abort(
error),
"Scenario #%d: abortable mismatch", i);
TEST_ASSERT(
scenario[i].exp_fatal ==
(rd_bool_t)rd_kafka_error_is_fatal(error),
"Scenario #%d: fatal mismatch", i);
/* Handle errors according to the error flags */
if (rd_kafka_error_is_fatal(error)) {
TEST_SAY("Fatal error, destroying producer\n");
rd_kafka_error_destroy(error);
rd_kafka_destroy(rk);
rk = NULL; /* Will be re-created on the next
* loop iteration. */
} else if (rd_kafka_error_txn_requires_abort(error)) {
rd_kafka_error_destroy(error);
TEST_SAY(
"Abortable error, "
"aborting transaction\n");
TEST_CALL_ERROR__(
rd_kafka_abort_transaction(rk, -1));
} else if (rd_kafka_error_is_retriable(error)) {
rd_kafka_error_destroy(error);
TEST_SAY("Retriable error, retrying %s once\n",
commit_str);
if (commit)
TEST_CALL_ERROR__(
rd_kafka_commit_transaction(rk,
5000));
else
TEST_CALL_ERROR__(
rd_kafka_abort_transaction(rk,
5000));
} else {
TEST_FAIL(
"Scenario #%d %s: "
"Permanent error without enough "
"hints to proceed: %s\n",
i, commit_str,
rd_kafka_error_string(error));
}
}
}
/* All done */
if (rk)
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test that the commit/abort works properly with infinite timeout.
*/
static void do_test_txn_endtxn_infinite(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster = NULL;
const char *txnid = "myTxnId";
int i;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, NULL);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
for (i = 0; i < 2; i++) {
rd_bool_t commit = i == 0;
const char *commit_str = commit ? "commit" : "abort";
rd_kafka_error_t *error;
test_timing_t t_call;
/* Messages will fail as the transaction fails,
* ignore the DR error */
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
/*
* Commit/abort transaction, first with some retriable failures,
* then success.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_EndTxn, 10,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
rd_sleep(1);
TIMING_START(&t_call, "%s_transaction()", commit_str);
if (commit)
error = rd_kafka_commit_transaction(rk, -1);
else
error = rd_kafka_abort_transaction(rk, -1);
TIMING_STOP(&t_call);
TEST_SAY("%s returned %s\n", commit_str,
error ? rd_kafka_error_string(error) : "success");
TEST_ASSERT(!error, "Expected %s to succeed, got %s",
commit_str, rd_kafka_error_string(error));
}
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test that the commit/abort user timeout is honoured.
*/
static void do_test_txn_endtxn_timeout(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster = NULL;
const char *txnid = "myTxnId";
int i;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, NULL);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
for (i = 0; i < 2; i++) {
rd_bool_t commit = i == 0;
const char *commit_str = commit ? "commit" : "abort";
rd_kafka_error_t *error;
test_timing_t t_call;
/* Messages will fail as the transaction fails,
* ignore the DR error */
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
/*
* Commit/abort transaction, first with some retriable failures
* whose retries exceed the user timeout.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_EndTxn, 10,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
rd_sleep(1);
TIMING_START(&t_call, "%s_transaction()", commit_str);
if (commit)
error = rd_kafka_commit_transaction(rk, 100);
else
error = rd_kafka_abort_transaction(rk, 100);
TIMING_STOP(&t_call);
TEST_SAY_ERROR(error, "%s returned: ", commit_str);
TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);
TEST_ASSERT(
rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
"Expected %s to fail with timeout, not %s: %s", commit_str,
rd_kafka_error_name(error), rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_is_retriable(error),
"%s failure should raise a retriable error",
commit_str);
rd_kafka_error_destroy(error);
/* Now call it again with an infinite timeout, should work. */
TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
if (commit)
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
else
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
TIMING_STOP(&t_call);
}
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
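/*
* Sketch of the pattern verified above: a timed-out commit/abort
* returns a retriable error and the application may call the same
* function again, here with an infinite timeout.
*
*   error = rd_kafka_commit_transaction(rk, 100);
*   if (error && rd_kafka_error_is_retriable(error)) {
*           rd_kafka_error_destroy(error);
*           error = rd_kafka_commit_transaction(rk, -1);
*   }
*/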
/**
* @brief Test commit/abort inflight timeout behaviour, which should result
* in a retriable error.
*/
static void do_test_txn_endtxn_timeout_inflight(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster = NULL;
const char *txnid = "myTxnId";
int32_t coord_id = 1;
int i;
SUB_TEST();
allowed_error = RD_KAFKA_RESP_ERR__TIMED_OUT;
test_curr->is_fatal_cb = error_is_fatal_cb;
rk = create_txn_producer(&mcluster, txnid, 1, "transaction.timeout.ms",
"5000", NULL);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
for (i = 0; i < 2; i++) {
rd_bool_t commit = i == 0;
const char *commit_str = commit ? "commit" : "abort";
rd_kafka_error_t *error;
test_timing_t t_call;
/* Messages will fail as the transaction fails,
* ignore the DR error */
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
/* Let both the EndTxn request and its retry time out */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_EndTxn, 2,
RD_KAFKA_RESP_ERR_NO_ERROR, 10000,
RD_KAFKA_RESP_ERR_NO_ERROR, 10000);
rd_sleep(1);
TIMING_START(&t_call, "%s_transaction()", commit_str);
if (commit)
error = rd_kafka_commit_transaction(rk, 4000);
else
error = rd_kafka_abort_transaction(rk, 4000);
TIMING_STOP(&t_call);
TEST_SAY_ERROR(error, "%s returned: ", commit_str);
TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);
TEST_ASSERT(
rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
"Expected %s to fail with timeout, not %s: %s", commit_str,
rd_kafka_error_name(error), rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_is_retriable(error),
"%s failure should raise a retriable error",
commit_str);
rd_kafka_error_destroy(error);
/* Now call it again with an infinite timeout, should work. */
TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
if (commit)
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
else
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
TIMING_STOP(&t_call);
}
/* All done */
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->is_fatal_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief Test that EndTxn is properly sent for aborted transactions
* even if AddOffsetsToTxnRequest was retried.
* This is a check for a txn_req_cnt bug.
*/
static void do_test_txn_req_cnt(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
const char *txnid = "myTxnId";
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, NULL);
/* Messages will fail on abort(), ignore the DR error */
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/*
* Send some arbitrary offsets, first with some failures, then
* succeed.
*/
offsets = rd_kafka_topic_partition_list_new(2);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 40)->offset =
999999111;
rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_AddOffsetsToTxn,
2,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test abortable errors using mock broker error injections
* and code coverage checks.
*/
static void do_test_txn_requires_abort_errors(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
int r;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/*
* 1. Fail on produce
*/
TEST_SAY("1. Fail on produce\n");
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
/* Wait for messages to fail */
test_flush(rk, 5000);
/* Any other transactional API should now raise an error */
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
TEST_ASSERT(error, "expected error");
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"expected abortable error, not %s",
rd_kafka_error_string(error));
TEST_SAY("Error %s: %s\n", rd_kafka_error_name(error),
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/*
* 2. Restart transaction and fail on AddPartitionsToTxn
*/
TEST_SAY("2. Fail on AddPartitionsToTxn\n");
/* First refresh proper Metadata to clear the topic's auth error,
* otherwise the produce() below will fail immediately. */
r = test_get_partition_count(rk, "mytopic", 5000);
TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
error = rd_kafka_commit_transaction(rk, 5000);
TEST_ASSERT(error, "commit_transaction should have failed");
TEST_SAY("commit_transaction() error %s: %s\n",
rd_kafka_error_name(error), rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/*
* 3. Restart transaction and fail on AddOffsetsToTxn
*/
TEST_SAY("3. Fail on AddOffsetsToTxn\n");
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddOffsetsToTxn, 1,
RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
TEST_ASSERT(error, "Expected send_offsets..() to fail");
TEST_ASSERT(rd_kafka_error_code(error) ==
RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
"expected send_offsets_to_transaction() to fail with "
"group auth error: not %s",
rd_kafka_error_name(error));
rd_kafka_error_destroy(error);
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
error = rd_kafka_commit_transaction(rk, 5000);
TEST_ASSERT(error, "commit_transaction should have failed");
rd_kafka_error_destroy(error);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test error handling and recovery when a broker goes down during
* an ongoing transaction.
*/
static void do_test_txn_broker_down_in_txn(rd_bool_t down_coord) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
int32_t coord_id, leader_id, down_id;
const char *down_what;
rd_kafka_resp_err_t err;
const char *topic = "test";
const char *transactional_id = "txnid";
int msgcnt = 1000;
int remains = 0;
/* Assign coordinator and leader to two different brokers */
coord_id = 1;
leader_id = 2;
if (down_coord) {
down_id = coord_id;
down_what = "coordinator";
} else {
down_id = leader_id;
down_what = "leader";
}
SUB_TEST_QUICK("Test %s down", down_what);
rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);
/* Broker down is not a test-failing error */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
test_curr->is_fatal_cb = error_is_fatal_cb;
err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
/* Start transactioning */
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt / 2, NULL, 0, &remains);
TEST_SAY("Bringing down %s %" PRId32 "\n", down_what, down_id);
rd_kafka_mock_broker_set_down(mcluster, down_id);
rd_kafka_flush(rk, 3000);
/* Produce remaining messages */
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
msgcnt / 2, msgcnt / 2, NULL, 0, &remains);
rd_sleep(2);
TEST_SAY("Bringing up %s %" PRId32 "\n", down_what, down_id);
rd_kafka_mock_broker_set_up(mcluster, down_id);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
rd_kafka_destroy(rk);
test_curr->is_fatal_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief Advance the coord_id to the next broker.
*/
static void set_next_coord(rd_kafka_mock_cluster_t *mcluster,
const char *transactional_id,
int broker_cnt,
int32_t *coord_idp) {
int32_t new_coord_id;
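        /* Round-robin to the next broker id in the range 1..broker_cnt. */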
new_coord_id = 1 + ((*coord_idp) % (broker_cnt));
TEST_SAY("Changing transaction coordinator from %" PRId32 " to %" PRId32
"\n",
*coord_idp, new_coord_id);
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
new_coord_id);
*coord_idp = new_coord_id;
}
/**
* @brief Switch coordinator during a transaction.
*
*/
static void do_test_txn_switch_coordinator(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
int32_t coord_id;
const char *topic = "test";
const char *transactional_id = "txnid";
const int broker_cnt = 5;
const int iterations = 20;
int i;
test_timeout_set(iterations * 10);
SUB_TEST("Test switching coordinators");
rk = create_txn_producer(&mcluster, transactional_id, broker_cnt, NULL);
coord_id = 1;
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
/* Start transactioning */
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
for (i = 0; i < iterations; i++) {
const int msgcnt = 100;
int remains = 0;
set_next_coord(mcluster, transactional_id, broker_cnt,
&coord_id);
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt / 2, NULL, 0);
if (!(i % 3))
set_next_coord(mcluster, transactional_id, broker_cnt,
&coord_id);
/* Produce remaining messages */
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
msgcnt / 2, msgcnt / 2, NULL, 0,
&remains);
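                /* Occasionally switch the coordinator once more before
                 * ending the transaction. */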
if ((i & 1) || !(i % 8))
set_next_coord(mcluster, transactional_id, broker_cnt,
&coord_id);
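                /* Commit on every fifth iteration and abort the rest;
                 * delivery report errors are tolerated for the aborted
                 * transactions. */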
if (!(i % 5)) {
test_curr->ignore_dr_err = rd_false;
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
} else {
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
}
}
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief Switch coordinator during a transaction when AddOffsetsToTxn
 *        requests are sent. #3571.
*/
static void do_test_txn_switch_coordinator_refresh(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = "test";
const char *transactional_id = "txnid";
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
SUB_TEST("Test switching coordinators (refresh)");
rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1);
/* Start transactioning */
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Switch the coordinator so that AddOffsetsToTxnRequest
* will respond with NOT_COORDINATOR. */
TEST_SAY("Switching to coordinator 2\n");
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
2);
/*
* Send some arbitrary offsets.
*/
offsets = rd_kafka_topic_partition_list_new(4);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 29)->offset =
99999;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
rk, offsets, cgmetadata, 20 * 1000));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/* Produce some messages */
test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL, 0);
/* And commit the transaction */
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test fatal error handling when transactions are not supported
* by the broker.
*/
static void do_test_txns_not_supported(void) {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
SUB_TEST_QUICK();
test_conf_init(&conf, NULL, 10);
test_conf_set(conf, "transactional.id", "myxnid");
test_conf_set(conf, "bootstrap.servers", ",");
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
/* Create mock cluster */
mcluster = rd_kafka_mock_cluster_new(rk, 3);
/* Disable InitProducerId */
rd_kafka_mock_set_apiversion(mcluster, 22 /*InitProducerId*/, -1, -1);
rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));
error = rd_kafka_init_transactions(rk, 5 * 1000);
TEST_SAY("init_transactions() returned %s: %s\n",
error ? rd_kafka_error_name(error) : "success",
error ? rd_kafka_error_string(error) : "success");
TEST_ASSERT(error, "Expected init_transactions() to fail");
TEST_ASSERT(rd_kafka_error_code(error) ==
RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
"Expected init_transactions() to fail with %s, not %s: %s",
rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
rd_kafka_error_name(error), rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test"),
RD_KAFKA_V_KEY("test", 4), RD_KAFKA_V_END);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
"Expected producev() to fail with %s, not %s",
rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
rd_kafka_err2name(err));
rd_kafka_mock_cluster_destroy(mcluster);
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief CONCURRENT_TRANSACTIONS on AddOffsets.. should be retried.
*/
static void do_test_txns_send_offsets_concurrent_is_retried(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
/* Wait for messages to be delivered */
test_flush(rk, 5000);
/*
* Have AddOffsetsToTxn fail but eventually succeed due to
* infinite retries.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddOffsetsToTxn,
1 + 5, /* first request + some retries */
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Verify that send_offsets_to_transaction() with no eligible offsets
* is handled properly - the call should succeed immediately and be
* repeatable.
*/
static void do_test_txns_send_offsets_non_eligible(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
/* Wait for messages to be delivered */
test_flush(rk, 5000);
/* Empty offsets list */
offsets = rd_kafka_topic_partition_list_new(0);
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
/* Now call it again, should also succeed. */
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Verify that request timeouts don't cause crash (#2913).
*/
static void do_test_txns_no_timeout_crash(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
SUB_TEST_QUICK();
rk =
create_txn_producer(&mcluster, "txnid", 3, "socket.timeout.ms",
"1000", "transaction.timeout.ms", "5000", NULL);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
test_flush(rk, -1);
/* Delay all broker connections */
if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
(err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
(err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
TEST_FAIL("Failed to set broker RTT: %s",
rd_kafka_err2str(err));
/* send_offsets..() should now time out */
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
TEST_ASSERT(error, "Expected send_offsets..() to fail");
TEST_SAY("send_offsets..() failed with %serror: %s\n",
rd_kafka_error_is_retriable(error) ? "retriable " : "",
rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
"expected send_offsets_to_transaction() to fail with "
"timeout, not %s",
rd_kafka_error_name(error));
TEST_ASSERT(rd_kafka_error_is_retriable(error),
"expected send_offsets_to_transaction() to fail with "
"a retriable error");
rd_kafka_error_destroy(error);
/* Reset delay and try again */
if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) ||
(err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) ||
(err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0)))
TEST_FAIL("Failed to reset broker RTT: %s",
rd_kafka_err2str(err));
TEST_SAY("Retrying send_offsets..()\n");
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s",
rd_kafka_error_string(error));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test auth failure handling.
*/
static void do_test_txn_auth_failure(int16_t ApiKey,
rd_kafka_resp_err_t ErrorCode) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s", rd_kafka_ApiKey2str(ApiKey),
rd_kafka_err2name(ErrorCode));
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
rd_kafka_mock_push_request_errors(mcluster, ApiKey, 1, ErrorCode);
error = rd_kafka_init_transactions(rk, 5000);
TEST_ASSERT(error, "Expected init_transactions() to fail");
TEST_SAY("init_transactions() failed: %s: %s\n",
rd_kafka_err2name(rd_kafka_error_code(error)),
rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode,
"Expected error %s, not %s", rd_kafka_err2name(ErrorCode),
rd_kafka_err2name(rd_kafka_error_code(error)));
TEST_ASSERT(rd_kafka_error_is_fatal(error),
"Expected error to be fatal");
TEST_ASSERT(!rd_kafka_error_is_retriable(error),
"Expected error to not be retriable");
rd_kafka_error_destroy(error);
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Issue #3041: Commit fails due to message flush() taking too long,
* eventually resulting in an unabortable error and failure to
* re-init the transactional producer.
*/
static void do_test_txn_flush_timeout(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
rd_kafka_error_t *error;
const char *txnid = "myTxnId";
const char *topic = "myTopic";
const int32_t coord_id = 2;
int msgcounter = 0;
rd_bool_t is_retry = rd_false;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, "message.timeout.ms",
"10000", "transaction.timeout.ms", "10000",
/* Speed up coordinator reconnect */
"reconnect.backoff.max.ms", "1000", NULL);
/* Broker down is not a test-failing error */
test_curr->is_fatal_cb = error_is_fatal_cb;
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
/* Set coordinator so we can disconnect it later */
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);
/*
* Init transactions
*/
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
retry:
if (!is_retry) {
/* First attempt should fail. */
test_curr->ignore_dr_err = rd_true;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
/* Assign invalid partition leaders for some partitions so
* that messages will not be delivered. */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);
} else {
/* The retry should succeed */
test_curr->ignore_dr_err = rd_false;
test_curr->exp_dr_err = is_retry
? RD_KAFKA_RESP_ERR_NO_ERROR
: RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
}
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/*
 * Produce some messages to specific partitions and some to random partitions.
*/
test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
&msgcounter);
test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
&msgcounter);
test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA, 0, 0, 100,
NULL, 10, &msgcounter);
/*
* Send some arbitrary offsets.
*/
offsets = rd_kafka_topic_partition_list_new(4);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 49)->offset =
999999111;
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
999;
rd_kafka_topic_partition_list_add(offsets, "srctopic64", 34)->offset =
123456789;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
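        /* Give the messages some time to be (or fail to be) delivered. */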
rd_sleep(2);
if (!is_retry) {
/* Now disconnect the coordinator. */
TEST_SAY("Disconnecting transaction coordinator %" PRId32 "\n",
coord_id);
rd_kafka_mock_broker_set_down(mcluster, coord_id);
}
/*
* Start committing.
*/
error = rd_kafka_commit_transaction(rk, -1);
if (!is_retry) {
TEST_ASSERT(error != NULL, "Expected commit to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
} else {
TEST_ASSERT(!error, "Expected commit to succeed, not: %s",
rd_kafka_error_string(error));
}
if (!is_retry) {
/*
* Bring the coordinator back up.
*/
rd_kafka_mock_broker_set_up(mcluster, coord_id);
rd_sleep(2);
/*
* Abort, and try again, this time without error.
*/
TEST_SAY("Aborting and retrying\n");
is_retry = rd_true;
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
goto retry;
}
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief ESC-4424: rko is reused in response handler after destroy in coord_req
* sender due to bad state.
*
* This is somewhat of a race condition so we need to perform a couple of
* iterations before it hits, usually 2 or 3, so we try at least 15 times.
*/
static void do_test_txn_coord_req_destroy(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
int i;
int errcnt = 0;
SUB_TEST();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
for (i = 0; i < 15; i++) {
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
test_timeout_set(10);
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/*
* Inject errors to trigger retries
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddPartitionsToTxn,
2, /* first request + number of internal retries */
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_AddOffsetsToTxn,
1, /* first request + number of internal retries */
RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 4,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
/* FIXME: When KIP-360 is supported, add this error:
* RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
/*
* Send offsets to transaction
*/
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)
->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
error = rd_kafka_send_offsets_to_transaction(rk, offsets,
cgmetadata, -1);
TEST_SAY("send_offsets_to_transaction() #%d: %s\n", i,
rd_kafka_error_string(error));
/* As we can't control the exact timing and sequence
* of requests this sometimes fails and sometimes succeeds,
* but we run the test enough times to trigger at least
* one failure. */
if (error) {
TEST_SAY(
"send_offsets_to_transaction() #%d "
"failed (expectedly): %s\n",
i, rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"Expected abortable error for #%d", i);
rd_kafka_error_destroy(error);
errcnt++;
}
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/* Allow time for internal retries */
rd_sleep(2);
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
}
TEST_ASSERT(errcnt > 0,
"Expected at least one send_offets_to_transaction() "
"failure");
/* All done */
rd_kafka_destroy(rk);
}
static rd_atomic32_t multi_find_req_cnt;
static rd_kafka_resp_err_t
multi_find_on_response_received_cb(rd_kafka_t *rk,
int sockfd,
const char *brokername,
int32_t brokerid,
int16_t ApiKey,
int16_t ApiVersion,
int32_t CorrId,
size_t size,
int64_t rtt,
rd_kafka_resp_err_t err,
void *ic_opaque) {
rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
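        /* The counter is bumped by 10000 once the second broker down/up
         * toggle has been performed, marking this interceptor as done. */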
rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;
if (ApiKey != RD_KAFKAP_AddOffsetsToTxn || done)
return RD_KAFKA_RESP_ERR_NO_ERROR;
TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
rtt != -1 ? (float)rtt / 1000.0 : 0.0,
done ? "already done" : "not done yet",
rd_kafka_err2name(err));
if (rd_atomic32_add(&multi_find_req_cnt, 1) == 1) {
/* Trigger a broker down/up event, which in turns
* triggers the coord_req_fsm(). */
rd_kafka_mock_broker_set_down(mcluster, 2);
rd_kafka_mock_broker_set_up(mcluster, 2);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/* Trigger a broker down/up event, which in turns
* triggers the coord_req_fsm(). */
rd_kafka_mock_broker_set_down(mcluster, 3);
rd_kafka_mock_broker_set_up(mcluster, 3);
/* Clear the downed broker's latency so that it reconnects
* quickly, otherwise the ApiVersionRequest will be delayed and
* this will in turn delay the -> UP transition that we need to
* trigger the coord_reqs. */
rd_kafka_mock_broker_set_rtt(mcluster, 3, 0);
/* Only do this down/up once */
rd_atomic32_add(&multi_find_req_cnt, 10000);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing
* the same coord_req_t, but the first one received will destroy
 *        the coord_req_t object and make the subsequent FindCoordinatorResponses
* reference a freed object.
*
* What we want to achieve is this sequence:
* 1. AddOffsetsToTxnRequest + Response which..
* 2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so..
* 3. Triggers a FindCoordinatorRequest
* 4. FindCoordinatorResponse from 3 is received ..
* 5. A TxnOffsetCommitRequest is sent from coord_req_fsm().
* 6. Another broker changing state to Up triggers coord reqs again, which..
* 7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm().
 *  8. FindCoordinatorResponse from 5 is received, references the destroyed rko
* and crashes.
*/
static void do_test_txn_coord_req_multi_find(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic";
int i;
SUB_TEST();
rd_atomic32_init(&multi_find_req_cnt, 0);
on_response_received_cb = multi_find_on_response_received_cb;
rk = create_txn_producer(&mcluster, txnid, 3,
/* Need connections to all brokers so we
* can trigger coord_req_fsm events
* by toggling connections. */
"enable.sparse.connections", "false",
/* Set up on_response_received interceptor */
"on_response_received", "", NULL);
/* Let broker 1 be both txn and group coordinator
* so that the group coordinator connection is up when it is time
* send the TxnOffsetCommitRequest. */
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
/* Set broker 1, 2, and 3 as leaders for a partition each and
* later produce to both partitions so we know there's a connection
* to all brokers. */
rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);
/* Broker down is not a test-failing error */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
test_curr->is_fatal_cb = error_is_fatal_cb;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
for (i = 0; i < 3; i++) {
err = rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(i),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
}
test_flush(rk, 5000);
/*
* send_offsets_to_transaction() will query for the group coordinator,
* we need to make those requests slow so that multiple requests are
* sent.
*/
for (i = 1; i <= 3; i++)
rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);
/*
* Send offsets to transaction
*/
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);
error =
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
TEST_SAY("send_offsets_to_transaction() %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
rd_kafka_error_string(error));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
/* Clear delay */
for (i = 1; i <= 3; i++)
rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);
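        /* Let the delayed responses and the triggered coord requests
         * complete before committing. */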
rd_sleep(5);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
/* All done */
TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
"on_request_sent interceptor did not trigger properly");
rd_kafka_destroy(rk);
on_response_received_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief ESC-4410: adding producer partitions gradually will trigger multiple
* AddPartitionsToTxn requests. Due to a bug the third partition to be
* registered would hang in PEND_TXN state.
*
* Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
* at the same time, followed by a need for a third:
*
* 1. Set coordinator broker rtt high (to give us time to produce).
* 2. Produce to partition 0, will trigger first AddPartitionsToTxn.
* 3. Produce to partition 1, will trigger second AddPartitionsToTxn.
* 4. Wait for second AddPartitionsToTxn response.
* 5. Produce to partition 2, should trigger AddPartitionsToTxn, but bug
 *    causes it to stall in the pending state.
*/
static rd_atomic32_t multi_addparts_resp_cnt;
static rd_kafka_resp_err_t
multi_addparts_response_received_cb(rd_kafka_t *rk,
int sockfd,
const char *brokername,
int32_t brokerid,
int16_t ApiKey,
int16_t ApiVersion,
int32_t CorrId,
size_t size,
int64_t rtt,
rd_kafka_resp_err_t err,
void *ic_opaque) {
if (ApiKey == RD_KAFKAP_AddPartitionsToTxn) {
TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
", ApiKey %hd, CorrId %d, rtt %.2fms, count %" PRId32
": %s\n",
rd_kafka_name(rk), brokername, brokerid, ApiKey,
CorrId, rtt != -1 ? (float)rtt / 1000.0 : 0.0,
rd_atomic32_get(&multi_addparts_resp_cnt),
rd_kafka_err2name(err));
rd_atomic32_add(&multi_addparts_resp_cnt, 1);
}
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
static void do_test_txn_addparts_req_multi(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
const char *txnid = "txnid", *topic = "mytopic";
int32_t txn_coord = 2;
SUB_TEST();
rd_atomic32_init(&multi_addparts_resp_cnt, 0);
on_response_received_cb = multi_addparts_response_received_cb;
rk = create_txn_producer(&mcluster, txnid, 3, "linger.ms", "0",
"message.timeout.ms", "9000",
/* Set up on_response_received interceptor */
"on_response_received", "", NULL);
        /* Let broker 2 be the txn coordinator. */
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);
rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
        /* Set partition leaders to the non-txn-coord broker so they won't
         * be affected by the rtt delay */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
/*
* Run one transaction first to let the client familiarize with
* the topic, this avoids metadata lookups, etc, when the real
* test is run.
*/
TEST_SAY("Running seed transaction\n");
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_VALUE("seed", 4),
RD_KAFKA_V_END));
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
/*
* Now perform test transaction with rtt delays
*/
TEST_SAY("Running test transaction\n");
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Reset counter */
rd_atomic32_set(&multi_addparts_resp_cnt, 0);
/* Add latency to txn coordinator so we can pace our produce() calls */
rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);
/* Produce to partition 0 */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
rd_usleep(500 * 1000, NULL);
/* Produce to partition 1 */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(1),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_SAY("Waiting for two AddPartitionsToTxnResponse\n");
while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
rd_usleep(10 * 1000, NULL);
TEST_SAY("%" PRId32 " AddPartitionsToTxnResponses seen\n",
rd_atomic32_get(&multi_addparts_resp_cnt));
/* Produce to partition 2, this message will hang in
* queue if the bug is not fixed. */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(2),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Allow some extra time for things to settle before committing
* transaction. */
rd_usleep(1000 * 1000, NULL);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10 * 1000));
/* All done */
rd_kafka_destroy(rk);
on_response_received_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
*
 * There are two things to test:
* - OffsetFetch triggered by committed() (and similar code paths)
* - OffsetFetch triggered by assign()
*/
static void do_test_unstable_offset_commit(void) {
rd_kafka_t *rk, *c;
rd_kafka_conf_t *c_conf;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
const char *topic = "srctopic4";
const int msgcnt = 100;
const int64_t offset_to_commit = msgcnt / 2;
int i;
int remains = 0;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_conf_init(&c_conf, NULL, 0);
test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
test_conf_set(c_conf, "bootstrap.servers",
rd_kafka_mock_cluster_bootstraps(mcluster));
test_conf_set(c_conf, "enable.partition.eof", "true");
test_conf_set(c_conf, "auto.offset.reset", "error");
c = test_create_consumer("mygroup", NULL, c_conf, NULL);
rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
/* Produce some messages to the topic so that the consumer has
* something to read. */
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt, NULL, 0,
&remains);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
/* Commit offset */
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
offset_to_commit;
TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0 /*sync*/));
rd_kafka_topic_partition_list_destroy(offsets);
/* Retrieve offsets by calling committed().
*
         * Have OffsetFetch fail and retry: on the first iteration
         * the API timeout is higher than the time the retries will
         * take, so the call succeeds; on the second iteration the timeout
         * is lower, so the call fails. */
for (i = 0; i < 2; i++) {
rd_kafka_resp_err_t err;
rd_kafka_resp_err_t exp_err =
i == 0 ? RD_KAFKA_RESP_ERR_NO_ERROR
: RD_KAFKA_RESP_ERR__TIMED_OUT;
int timeout_ms = exp_err ? 200 : 5 * 1000;
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_OffsetFetch,
1 + 5, /* first request + some retries */
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, topic, 0);
err = rd_kafka_committed(c, offsets, timeout_ms);
TEST_SAY("#%d: committed() returned %s (expected %s)\n", i,
rd_kafka_err2name(err), rd_kafka_err2name(exp_err));
TEST_ASSERT(err == exp_err,
"#%d: Expected committed() to return %s, not %s", i,
rd_kafka_err2name(exp_err), rd_kafka_err2name(err));
TEST_ASSERT(offsets->cnt == 1,
"Expected 1 committed offset, not %d",
offsets->cnt);
if (!exp_err)
TEST_ASSERT(offsets->elems[0].offset ==
offset_to_commit,
"Expected committed offset %" PRId64
", "
"not %" PRId64,
offset_to_commit, offsets->elems[0].offset);
else
TEST_ASSERT(offsets->elems[0].offset < 0,
"Expected no committed offset, "
"not %" PRId64,
offsets->elems[0].offset);
rd_kafka_topic_partition_list_destroy(offsets);
}
TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n");
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
RD_KAFKA_OFFSET_STORED;
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_OffsetFetch,
1 + 5, /* first request + some retries */
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
test_consumer_incremental_assign("assign", c, offsets);
rd_kafka_topic_partition_list_destroy(offsets);
test_consumer_poll_exact("consume", c, 0, 1 /*eof*/, 0, msgcnt / 2,
rd_true /*exact counts*/, NULL);
/* All done */
rd_kafka_destroy(c);
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief If a message times out locally before a send attempt is made
 *        and commit_transaction() is then called, the transaction must not
 *        succeed.
* https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
*/
static void do_test_commit_after_msg_timeout(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
int32_t coord_id, leader_id;
rd_kafka_resp_err_t err;
rd_kafka_error_t *error;
const char *topic = "test";
const char *transactional_id = "txnid";
int remains = 0;
SUB_TEST_QUICK();
/* Assign coordinator and leader to two different brokers */
coord_id = 1;
leader_id = 2;
rk = create_txn_producer(&mcluster, transactional_id, 3,
"message.timeout.ms", "5000",
"transaction.timeout.ms", "10000", NULL);
/* Broker down is not a test-failing error */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
test_curr->is_fatal_cb = error_is_fatal_cb;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
/* Start transactioning */
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_SAY("Bringing down %" PRId32 "\n", leader_id);
rd_kafka_mock_broker_set_down(mcluster, leader_id);
rd_kafka_mock_broker_set_down(mcluster, coord_id);
test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
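        /* With both brokers down the message cannot be sent and will time
         * out locally (message.timeout.ms=5000). */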
error = rd_kafka_commit_transaction(rk, -1);
TEST_ASSERT(error != NULL, "expected commit_transaciton() to fail");
TEST_SAY_ERROR(error, "commit_transaction() failed (as expected): ");
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"Expected txn_requires_abort error");
rd_kafka_error_destroy(error);
/* Bring the brokers up so the abort can complete */
rd_kafka_mock_broker_set_up(mcluster, coord_id);
rd_kafka_mock_broker_set_up(mcluster, leader_id);
TEST_SAY("Aborting transaction\n");
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
TEST_ASSERT(remains == 0, "%d message(s) were not flushed\n", remains);
TEST_SAY("Attempting second transaction, which should succeed\n");
test_curr->is_fatal_cb = error_is_fatal_cb;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->is_fatal_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump
* during an ongoing transaction.
* The transaction should instead enter the abortable state.
*/
static void do_test_out_of_order_seq(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
int32_t txn_coord = 1, leader = 2;
const char *txnid = "myTxnId";
test_timing_t timing;
rd_kafka_resp_err_t err;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);
rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = NULL;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce one seeding message first to get the leader up and running */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
/* Let partition leader have a latency of 2 seconds
* so that we can have multiple messages in-flight. */
rd_kafka_mock_broker_set_rtt(mcluster, leader, 2 * 1000);
        /* Produce a message, let it fail with different errors,
* ending with OUT_OF_ORDER which previously triggered an
* Epoch bump. */
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 3,
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);
/* Produce three messages that will be delayed
* and have errors injected.*/
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
/* Now sleep a short while so that the messages are processed
* by the broker and errors are returned. */
TEST_SAY("Sleeping..\n");
rd_sleep(5);
rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);
/* Produce a fifth message, should fail with ERR__STATE since
* the transaction should have entered the abortable state. */
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
"Expected produce() to fail with ERR__STATE, not %s",
rd_kafka_err2name(err));
TEST_SAY("produce() failed as expected: %s\n", rd_kafka_err2str(err));
/* Commit the transaction, should fail with abortable error. */
TIMING_START(&timing, "commit_transaction(-1)");
error = rd_kafka_commit_transaction(rk, -1);
TIMING_STOP(&timing);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(!rd_kafka_error_is_fatal(error),
"Did not expect fatal error");
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"Expected abortable error");
rd_kafka_error_destroy(error);
/* Abort the transaction */
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/* Run a new transaction without errors to verify that the
* producer can recover. */
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief Verify lossless delivery if a topic disappears from Metadata for a while.
*
 * If a topic is removed from metadata in between transactions, the producer
 * will remove its partition state for the topic's partitions.
 * If the same topic later comes back (the same topic instance, not a new
 * creation), the producer must restore the previously used msgid/BaseSequence
 * in case the same Epoch is still used, or messages will be silently lost
 * as they would look like legitimate duplicates to the broker.
*
* Reproduction:
* 1. produce msgs to topic, commit transaction.
* 2. remove topic from metadata
* 3. make sure client updates its metadata, which removes the partition
* objects.
* 4. restore the topic in metadata
* 5. produce new msgs to topic, commit transaction.
* 6. consume topic. All messages should be accounted for.
*/
static void do_test_topic_disappears_for_awhile(void) {
rd_kafka_t *rk, *c;
rd_kafka_conf_t *c_conf;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = "mytopic";
const char *txnid = "myTxnId";
test_timing_t timing;
int i;
int msgcnt = 0;
const int partition_cnt = 10;
SUB_TEST_QUICK();
rk = create_txn_producer(
&mcluster, txnid, 1, "batch.num.messages", "3", "linger.ms", "100",
"topic.metadata.refresh.interval.ms", "2000", NULL);
rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
for (i = 0; i < 2; i++) {
int cnt = 3 * 2 * partition_cnt;
rd_bool_t remove_topic = (i % 2) == 0;
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
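                /* Produce messages round-robin across all partitions. */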
while (cnt-- >= 0) {
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_PARTITION(cnt % partition_cnt),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
msgcnt++;
}
/* Commit the transaction */
TIMING_START(&timing, "commit_transaction(-1)");
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
TIMING_STOP(&timing);
if (remove_topic) {
/* Make it seem the topic is removed, refresh metadata,
* and then make the topic available again. */
const rd_kafka_metadata_t *md;
TEST_SAY("Marking topic as non-existent\n");
rd_kafka_mock_topic_set_error(
mcluster, topic,
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, NULL, &md,
tmout_multip(5000)));
rd_kafka_metadata_destroy(md);
rd_sleep(2);
TEST_SAY("Bringing topic back to life\n");
rd_kafka_mock_topic_set_error(
mcluster, topic, RD_KAFKA_RESP_ERR_NO_ERROR);
}
}
TEST_SAY("Verifying messages by consumtion\n");
test_conf_init(&c_conf, NULL, 0);
test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
test_conf_set(c_conf, "bootstrap.servers",
rd_kafka_mock_cluster_bootstraps(mcluster));
test_conf_set(c_conf, "enable.partition.eof", "true");
test_conf_set(c_conf, "auto.offset.reset", "earliest");
c = test_create_consumer("mygroup", NULL, c_conf, NULL);
test_consumer_subscribe(c, topic);
test_consumer_poll_exact("consume", c, 0, partition_cnt, 0, msgcnt,
rd_true /*exact*/, NULL);
rd_kafka_destroy(c);
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Test that group coordinator requests can handle an
* untimely disconnect.
*
* The transaction manager makes use of librdkafka coord_req to commit
* transaction offsets to the group coordinator.
* If the connection to the given group coordinator is not up the
* coord_req code will request a connection once, but if this connection fails
* there will be no new attempts and the coord_req will idle until either
* destroyed or the connection is retried for other reasons.
* This in turn stalls the send_offsets_to_transaction() call until the
* transaction times out.
*
* There are two variants to this test based on switch_coord:
* - True - Switches the coordinator during the downtime.
* The client should detect this and send the request to the
* new coordinator.
* - False - The coordinator remains on the down broker. Client will reconnect
* when down broker comes up again.
*/
struct some_state {
rd_kafka_mock_cluster_t *mcluster;
rd_bool_t switch_coord;
int32_t broker_id;
const char *grpid;
};
static int delayed_up_cb(void *arg) {
struct some_state *state = arg;
rd_sleep(3);
if (state->switch_coord) {
TEST_SAY("Switching group coordinator to %" PRId32 "\n",
state->broker_id);
rd_kafka_mock_coordinator_set(state->mcluster, "group",
state->grpid, state->broker_id);
} else {
TEST_SAY("Bringing up group coordinator %" PRId32 "..\n",
state->broker_id);
rd_kafka_mock_broker_set_up(state->mcluster, state->broker_id);
}
return 0;
}
static void do_test_disconnected_group_coord(rd_bool_t switch_coord) {
const char *topic = "mytopic";
const char *txnid = "myTxnId";
const char *grpid = "myGrpId";
const int partition_cnt = 1;
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
struct some_state state = RD_ZERO_INIT;
test_timing_t timing;
thrd_t thrd;
int ret;
SUB_TEST_QUICK("switch_coord=%s", RD_STR_ToF(switch_coord));
test_curr->is_fatal_cb = error_is_fatal_cb;
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
rk = create_txn_producer(&mcluster, txnid, 3, NULL);
rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);
/* Broker 1: txn coordinator
* Broker 2: group coordinator
* Broker 3: partition leader & backup coord if switch_coord=true */
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
rd_kafka_mock_coordinator_set(mcluster, "group", grpid, 2);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3);
/* Bring down group coordinator so there are no undesired
* connections to it. */
rd_kafka_mock_broker_set_down(mcluster, 2);
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
rd_sleep(1);
        /* Run a background thread that, after 3s (which should be enough
         * for the first failed connection attempt), makes the
         * group coordinator available again. */
state.switch_coord = switch_coord;
state.mcluster = mcluster;
state.grpid = grpid;
state.broker_id = switch_coord ? 3 : 2;
if (thrd_create(&thrd, delayed_up_cb, &state) != thrd_success)
TEST_FAIL("Failed to create thread");
TEST_SAY("Calling send_offsets_to_transaction()\n");
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 1;
cgmetadata = rd_kafka_consumer_group_metadata_new(grpid);
TIMING_START(&timing, "send_offsets_to_transaction(-1)");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
TIMING_STOP(&timing);
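        /* The call should complete shortly after the coordinator becomes
         * available (~3s), well within 10s. */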
TIMING_ASSERT(&timing, 0, 10 * 1000 /*10s*/);
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
thrd_join(thrd, &ret);
/* Commit the transaction */
TIMING_START(&timing, "commit_transaction(-1)");
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
TIMING_STOP(&timing);
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->is_fatal_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief Test that a NULL coordinator is not fatal when
* the transactional producer reconnects to the txn coordinator
* and the first thing it does is a FindCoordinatorRequest that
* fails with COORDINATOR_NOT_AVAILABLE, setting coordinator to NULL.
*/
static void do_test_txn_coordinator_null_not_fatal(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;
int32_t coord_id = 1;
const char *topic = "test";
const char *transactional_id = "txnid";
int msgcnt = 1;
int remains = 0;
SUB_TEST_QUICK();
/* Broker down is not a test-failing error */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
test_curr->is_fatal_cb = error_is_fatal_cb;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
/* One second is the minimum transaction timeout */
rk = create_txn_producer(&mcluster, transactional_id, 1,
"transaction.timeout.ms", "1000", NULL);
err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);
/* Start transactioning */
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        /* Make the produce request time out. */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 3000);
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt, NULL, 0, &remains);
        /* This value is linked to transaction.timeout.ms: it needs to be
         * long enough for the message to time out and a DrainBump sequence
         * to start. */
rd_kafka_flush(rk, 1000);
/* To trigger the error the COORDINATOR_NOT_AVAILABLE response
* must come AFTER idempotent state has changed to WaitTransport
         * but BEFORE it changes to WaitPID. To make it more likely,
         * the rd_kafka_txn_coord_timer_start timeout can be changed to 5 ms
         * in rd_kafka_txn_coord_query when unable to query for the
         * transaction coordinator.
*/
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10);
/* Coordinator down starts the FindCoordinatorRequest loop. */
TEST_SAY("Bringing down coordinator %" PRId32 "\n", coord_id);
rd_kafka_mock_broker_set_down(mcluster, coord_id);
/* Coordinator down for some time. */
rd_usleep(100 * 1000, NULL);
/* When it comes up, the error is triggered, if the preconditions
* happen. */
TEST_SAY("Bringing up coordinator %" PRId32 "\n", coord_id);
rd_kafka_mock_broker_set_up(mcluster, coord_id);
/* Make sure DRs are received */
rd_kafka_flush(rk, 1000);
error = rd_kafka_commit_transaction(rk, -1);
TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
        /* Wait some time before closing to make sure the client doesn't go
         * into TERMINATING state before the error is triggered. */
rd_usleep(1000 * 1000, NULL);
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
test_curr->is_fatal_cb = NULL;
SUB_TEST_PASS();
}
/**
* @brief Simple test to make sure the init_transactions() timeout is honoured
* and also not infinite.
*/
static void do_test_txn_resumable_init(void) {
rd_kafka_t *rk;
const char *transactional_id = "txnid";
        rd_kafka_error_t *error;
        test_timing_t duration;
        rd_kafka_conf_t *conf;
        SUB_TEST();
test_conf_init(&conf, NULL, 20);
test_conf_set(conf, "bootstrap.servers", "");
test_conf_set(conf, "transactional.id", transactional_id);
test_conf_set(conf, "transaction.timeout.ms", "4000");
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
/* First make sure a lower timeout is honoured. */
TIMING_START(&duration, "init_transactions(1000)");
error = rd_kafka_init_transactions(rk, 1000);
TIMING_STOP(&duration);
if (error)
TEST_SAY("First init_transactions failed (as expected): %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
"Expected _TIMED_OUT, not %s",
error ? rd_kafka_error_string(error) : "success");
rd_kafka_error_destroy(error);
TIMING_ASSERT(&duration, 900, 1500);
TEST_SAY(
"Performing second init_transactions() call now with an "
"infinite timeout: "
"should time out in 2 x transaction.timeout.ms\n");
TIMING_START(&duration, "init_transactions(infinite)");
error = rd_kafka_init_transactions(rk, -1);
TIMING_STOP(&duration);
if (error)
TEST_SAY("Second init_transactions failed (as expected): %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
"Expected _TIMED_OUT, not %s",
error ? rd_kafka_error_string(error) : "success");
rd_kafka_error_destroy(error);
TIMING_ASSERT(&duration, 2 * 4000 - 500, 2 * 4000 + 500);
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Retries a transaction call until it succeeds or returns a
* non-retriable error - which will cause the test to fail.
*
* @param intermed_calls Is a block of code that will be called after each
* retriable failure of \p call.
*/
#define RETRY_TXN_CALL__(call, intermed_calls) \
do { \
rd_kafka_error_t *_error = call; \
if (!_error) \
break; \
TEST_SAY_ERROR(_error, "%s: ", "" #call); \
TEST_ASSERT(rd_kafka_error_is_retriable(_error), \
"Expected retriable error"); \
TEST_SAY("%s failed, retrying in 1 second\n", "" #call); \
rd_kafka_error_destroy(_error); \
intermed_calls; \
rd_sleep(1); \
} while (1)
/**
* @brief Call \p call and expect it to fail with \p exp_err_code.
*/
#define TXN_CALL_EXPECT_ERROR__(call, exp_err_code) \
do { \
rd_kafka_error_t *_error = call; \
TEST_ASSERT(_error != NULL, \
"%s: Expected %s error, got success", "" #call, \
rd_kafka_err2name(exp_err_code)); \
TEST_SAY_ERROR(_error, "%s: ", "" #call); \
TEST_ASSERT(rd_kafka_error_code(_error) == exp_err_code, \
"%s: Expected %s error, got %s", "" #call, \
rd_kafka_err2name(exp_err_code), \
rd_kafka_error_name(_error)); \
rd_kafka_error_destroy(_error); \
} while (0)
/**
* @brief Simple test to make sure short API timeouts can be safely resumed
* by calling the same API again.
*
* @param do_commit Commit transaction if true, else abort transaction.
*/
static void do_test_txn_resumable_calls_timeout(rd_bool_t do_commit) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
int32_t coord_id = 1;
const char *topic = "test";
const char *transactional_id = "txnid";
int msgcnt = 1;
int remains = 0;
SUB_TEST("%s_transaction", do_commit ? "commit" : "abort");
rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);
err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);
TEST_SAY("Starting transaction\n");
TEST_SAY("Delaying first two InitProducerIdRequests by 500ms\n");
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_InitProducerId, 2,
RD_KAFKA_RESP_ERR_NO_ERROR, 500, RD_KAFKA_RESP_ERR_NO_ERROR, 500);
RETRY_TXN_CALL__(
rd_kafka_init_transactions(rk, 100),
TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
RD_KAFKA_RESP_ERR__CONFLICT));
RETRY_TXN_CALL__(rd_kafka_begin_transaction(rk), /*none*/);
TEST_SAY("Delaying ProduceRequests by 3000ms\n");
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 3000);
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt, NULL, 0, &remains);
TEST_SAY("Delaying SendOffsetsToTransaction by 400ms\n");
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_AddOffsetsToTxn, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 400);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
/* This is not a resumable call on timeout */
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
TEST_SAY("Delaying EndTxnRequests by 1200ms\n");
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_EndTxn, 1, RD_KAFKA_RESP_ERR_NO_ERROR,
1200);
/* Committing/aborting the transaction will also be delayed by the
* previous accumulated remaining delays. */
if (do_commit) {
TEST_SAY("Committing transaction\n");
RETRY_TXN_CALL__(
rd_kafka_commit_transaction(rk, 100),
TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
RD_KAFKA_RESP_ERR__CONFLICT));
} else {
TEST_SAY("Aborting transaction\n");
RETRY_TXN_CALL__(
rd_kafka_abort_transaction(rk, 100),
TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, -1),
RD_KAFKA_RESP_ERR__CONFLICT));
}
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
 * @brief Verify that resuming a timed-out call surfaces an error that
 *        occurred after the timeout but before the resuming call.
*/
static void do_test_txn_resumable_calls_timeout_error(rd_bool_t do_commit) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_resp_err_t err;
int32_t coord_id = 1;
const char *topic = "test";
const char *transactional_id = "txnid";
int msgcnt = 1;
int remains = 0;
rd_kafka_error_t *error;
SUB_TEST_QUICK("%s_transaction", do_commit ? "commit" : "abort");
rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);
err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
coord_id);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);
TEST_SAY("Starting transaction\n");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt, NULL, 0, &remains);
TEST_SAY("Fail EndTxn fatally after 2000ms\n");
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_EndTxn, 1,
RD_KAFKA_RESP_ERR_INVALID_TXN_STATE, 2000);
if (do_commit) {
TEST_SAY("Committing transaction\n");
TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, 500),
RD_KAFKA_RESP_ERR__TIMED_OUT);
/* Sleep so that the background EndTxn fails locally and sets
* an error result. */
rd_sleep(3);
error = rd_kafka_commit_transaction(rk, -1);
} else {
TEST_SAY("Aborting transaction\n");
TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, 500),
RD_KAFKA_RESP_ERR__TIMED_OUT);
/* Sleep so that the background EndTxn fails locally and sets
* an error result. */
rd_sleep(3);
error = rd_kafka_abort_transaction(rk, -1);
}
TEST_ASSERT(error != NULL && rd_kafka_error_is_fatal(error),
"Expected fatal error, not %s",
rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_code(error) ==
RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
"Expected error INVALID_TXN_STATE, got %s",
rd_kafka_error_name(error));
rd_kafka_error_destroy(error);
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
/**
* @brief Concurrent transaction API calls are not permitted.
*        This test makes sure that restriction is properly enforced.
*
* For each transactional API, call it with a long (blocking) timeout, and
* during that time call the transactional APIs one by one from another
* thread, verifying that the second thread gets an ERR__CONFLICT error back.
*
* We use a mutex for synchronization: the main thread holds the lock when
* it is not calling an API and releases it just prior to calling.
* The other thread acquires the lock, sleeps, and then holds the lock while
* calling the concurrent API that should fail immediately, releasing the
* lock when done.
*
*/
struct _txn_concurrent_state {
const char *api;
mtx_t lock;
rd_kafka_t *rk;
struct test *test;
};
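/**
* @brief Thread main for the concurrent caller: waits for the main thread
*        to enter a transactional API call, then issues each transactional
*        API in turn and verifies that it fails immediately with the
*        expected conflict error.
*/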
static int txn_concurrent_thread_main(void *arg) {
struct _txn_concurrent_state *state = arg;
static const char *apis[] = {
"init_transactions", "begin_transaction",
"send_offsets_to_transaction", "commit_transaction",
"abort_transaction", NULL};
rd_kafka_t *rk = state->rk;
const char *main_api = NULL;
int i;
/* Update TLS variable so TEST_..() macros work */
test_curr = state->test;
while (1) {
const char *api = NULL;
const int timeout_ms = 10000;
rd_kafka_error_t *error = NULL;
rd_kafka_resp_err_t exp_err;
test_timing_t duration;
/* Wait for the other thread's txn call to start, then sleep a bit
 * to increase the chance that the call has really begun. */
mtx_lock(&state->lock);
if (state->api && state->api == main_api) {
/* Main thread is still blocking on the last API call */
TEST_SAY("Waiting for main thread to finish %s()\n",
main_api);
mtx_unlock(&state->lock);
rd_sleep(1);
continue;
} else if (!(main_api = state->api)) {
mtx_unlock(&state->lock);
break;
}
rd_sleep(1);
for (i = 0; (api = apis[i]) != NULL; i++) {
TEST_SAY(
"Triggering concurrent %s() call while "
"main is in %s() call\n",
api, main_api);
TIMING_START(&duration, "%s", api);
if (!strcmp(api, "init_transactions"))
error =
rd_kafka_init_transactions(rk, timeout_ms);
else if (!strcmp(api, "begin_transaction"))
error = rd_kafka_begin_transaction(rk);
else if (!strcmp(api, "send_offsets_to_transaction")) {
rd_kafka_topic_partition_list_t *offsets =
rd_kafka_topic_partition_list_new(1);
rd_kafka_consumer_group_metadata_t *cgmetadata =
rd_kafka_consumer_group_metadata_new(
"mygroupid");
rd_kafka_topic_partition_list_add(
offsets, "srctopic4", 0)
->offset = 12;
error = rd_kafka_send_offsets_to_transaction(
rk, offsets, cgmetadata, -1);
rd_kafka_consumer_group_metadata_destroy(
cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
} else if (!strcmp(api, "commit_transaction"))
error =
rd_kafka_commit_transaction(rk, timeout_ms);
else if (!strcmp(api, "abort_transaction"))
error =
rd_kafka_abort_transaction(rk, timeout_ms);
else
TEST_FAIL("Unknown API: %s", api);
TIMING_STOP(&duration);
TEST_SAY_ERROR(error, "Conflicting %s() call: ", api);
TEST_ASSERT(error,
"Expected conflicting %s() call to fail",
api);
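/* A conflicting call to the same API as the one in progress is
 * reported as ERR__PREV_IN_PROGRESS, any other API as ERR__CONFLICT. */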
exp_err = !strcmp(api, main_api)
? RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
: RD_KAFKA_RESP_ERR__CONFLICT;
TEST_ASSERT(rd_kafka_error_code(error) == exp_err,
"Conflicting %s(): Expected %s, not %s",
api, rd_kafka_err2str(exp_err),
rd_kafka_error_name(error));
TEST_ASSERT(
rd_kafka_error_is_retriable(error),
"Conflicting %s(): Expected retriable error", api);
rd_kafka_error_destroy(error);
/* These calls should fail immediately */
TIMING_ASSERT(&duration, 0, 100);
}
mtx_unlock(&state->lock);
}
return 0;
}
static void do_test_txn_concurrent_operations(rd_bool_t do_commit) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
int32_t coord_id = 1;
rd_kafka_resp_err_t err;
const char *topic = "test";
const char *transactional_id = "txnid";
int remains = 0;
thrd_t thrd;
struct _txn_concurrent_state state = RD_ZERO_INIT;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
SUB_TEST("%s", do_commit ? "commit" : "abort");
test_timeout_set(90);
/* We need to override the value of socket.connection.setup.timeout.ms
* to be at least 2*RTT of the mock broker. This is because the first
* ApiVersion request will fail, since we make the request with v3, and
* the mock broker's MaxVersion is 2, so the request is retried with v0.
* We use the value 3*RTT to add some buffer.
*/
rk = create_txn_producer(&mcluster, transactional_id, 1,
"socket.connection.setup.timeout.ms", "15000",
NULL);
/* Set broker RTT to 3.5s so that the background thread has ample
* time to call its conflicting APIs.
* This value must be less than socket.connection.setup.timeout.ms/2. */
rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 3500);
err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
/* Set up shared state between us and the concurrent thread */
mtx_init(&state.lock, mtx_plain);
state.test = test_curr;
state.rk = rk;
/* We release the lock only while calling the TXN API */
mtx_lock(&state.lock);
/* Spin up concurrent thread */
if (thrd_create(&thrd, txn_concurrent_thread_main, (void *)&state) !=
thrd_success)
TEST_FAIL("Failed to create thread");
#define _start_call(callname) \
do { \
state.api = callname; \
mtx_unlock(&state.lock); \
} while (0)
#define _end_call() mtx_lock(&state.lock)
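/* Each blocking transactional call below is bracketed by
 * _start_call(name), which publishes the API name and releases the lock so
 * the concurrent thread can issue its conflicting calls while this thread
 * is blocked inside the API, and _end_call(), which re-acquires the lock. */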
_start_call("init_transactions");
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
_end_call();
/* This call doesn't block, so can't really be tested concurrently. */
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10,
NULL, 0, &remains);
_start_call("send_offsets_to_transaction");
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(
rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
_end_call();
if (do_commit) {
_start_call("commit_transaction");
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
_end_call();
} else {
_start_call("abort_transaction");
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
_end_call();
}
/* Signal completion to background thread */
state.api = NULL;
mtx_unlock(&state.lock);
thrd_join(thrd, NULL);
rd_kafka_destroy(rk);
mtx_destroy(&state.lock);
SUB_TEST_PASS();
}
/**
* @brief KIP-360: Test that a fatal idempotence error triggers an abortable
*        transaction error, but make the broker-side abort of the
*        transaction fail with a fencing error.
*        This should raise a fatal error.
*
* @param error_code Which error code EndTxn should fail with.
* Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
* or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
*/
static void do_test_txn_fenced_abort(rd_kafka_resp_err_t error_code) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
int32_t txn_coord = 2;
const char *txnid = "myTxnId";
char errstr[512];
rd_kafka_resp_err_t fatal_err;
size_t errors_cnt;
SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = error_is_fatal_cb;
allowed_error = RD_KAFKA_RESP_ERR__FENCED;
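/* The producer will be fenced in this test: allow ERR__FENCED so the
 * error callback does not fail the test when it is raised. */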
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce a message without error first */
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
/* Fail abort transaction */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, txn_coord, RD_KAFKAP_EndTxn, 1, error_code, 0);
/* Fail the PID reinit */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0);
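/* Note: the error-stack check at the end of the test verifies that this
 * InitProducerId error is never consumed, i.e. the producer must not
 * attempt a PID re-init after the fencing error. */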
/* Produce a message, let it fail with a fatal idempo error. */
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
TEST_CALL_ERR__(rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
test_flush(rk, -1);
/* Abort the transaction, should fail with a fatal error */
error = rd_kafka_abort_transaction(rk, -1);
TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");
TEST_SAY_ERROR(error, "abort_transaction() failed: ");
TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
rd_kafka_error_destroy(error);
fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err), errstr);
/* Verify that the producer sent the expected number of EndTxn requests
* by inspecting the mock broker error stack,
* which should now be empty. */
if (rd_kafka_mock_broker_error_stack_cnt(
mcluster, txn_coord, RD_KAFKAP_EndTxn, &errors_cnt)) {
TEST_FAIL(
"Broker error count should succeed for API %s"
" on broker %" PRId32,
rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), txn_coord);
}
/* Check that all the pushed RD_KAFKAP_EndTxn errors have been consumed */
TEST_ASSERT(errors_cnt == 0,
"Expected error count 0 for API %s, found %zu",
rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), errors_cnt);
if (rd_kafka_mock_broker_error_stack_cnt(
mcluster, txn_coord, RD_KAFKAP_InitProducerId, &errors_cnt)) {
TEST_FAIL(
"Broker error count should succeed for API %s"
" on broker %" PRId32,
rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), txn_coord);
}
/* Check that none of the pushed RD_KAFKAP_InitProducerId errors have been
 * consumed */
TEST_ASSERT(errors_cnt == 1,
"Expected error count 1 for API %s, found %zu",
rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), errors_cnt);
/* All done */
rd_kafka_destroy(rk);
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
SUB_TEST_PASS();
}
/**
* @brief Test that the TxnOffsetCommit op waits between retries when the
* coordinator is found but not available, instead of retrying too frequently.
*/
static void
do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_bool_t times_out) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
rd_kafka_error_t *error;
int timeout;
SUB_TEST_QUICK("times_out=%s", RD_STR_ToF(times_out));
rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
test_curr->ignore_dr_err = rd_true;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
/* Wait for messages to be delivered */
test_flush(rk, 5000);
/*
* Fail TxnOffsetCommit with COORDINATOR_NOT_AVAILABLE
* repeatedly.
*/
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_TxnOffsetCommit, 4,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE);
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 1;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
/* The retry delay is 500ms, with 4 retries it should take at least
* 2000ms for this call to succeed. */
timeout = times_out ? 500 : 4000;
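/* 500ms is shorter than a single retry interval, so the call must fail
 * with the coordinator error; 4000ms leaves room for all four retries
 * and the final successful attempt. */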
error = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata,
timeout);
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
if (times_out) {
TEST_ASSERT(rd_kafka_error_code(error) ==
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
"expected %s, got: %s",
rd_kafka_err2name(
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE),
rd_kafka_err2str(rd_kafka_error_code(error)));
} else {
TEST_ASSERT(rd_kafka_error_code(error) ==
RD_KAFKA_RESP_ERR_NO_ERROR,
"expected \"Success\", found: %s",
rd_kafka_err2str(rd_kafka_error_code(error)));
}
rd_kafka_error_destroy(error);
/* All done */
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
int main_0105_transactions_mock(int argc, char **argv) {
if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
return 0;
}
do_test_txn_recoverable_errors();
do_test_txn_fatal_idempo_errors();
do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);
do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);
do_test_txn_req_cnt();
do_test_txn_requires_abort_errors();
do_test_txn_slow_reinit(rd_false);
do_test_txn_slow_reinit(rd_true);
/* Just do a subset of tests in quick mode */
if (test_quick)
return 0;
do_test_txn_endtxn_errors();
do_test_txn_endtxn_infinite();
do_test_txn_endtxn_timeout();
do_test_txn_endtxn_timeout_inflight();
/* Bring down the coordinator */
do_test_txn_broker_down_in_txn(rd_true);
/* Bring down partition leader */
do_test_txn_broker_down_in_txn(rd_false);
do_test_txns_not_supported();
do_test_txns_send_offsets_concurrent_is_retried();
do_test_txns_send_offsets_non_eligible();
do_test_txn_coord_req_destroy();
do_test_txn_coord_req_multi_find();
do_test_txn_addparts_req_multi();
do_test_txns_no_timeout_crash();
do_test_txn_auth_failure(
RD_KAFKAP_InitProducerId,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
do_test_txn_auth_failure(
RD_KAFKAP_FindCoordinator,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
do_test_txn_flush_timeout();
do_test_unstable_offset_commit();
do_test_commit_after_msg_timeout();
do_test_txn_switch_coordinator();
do_test_txn_switch_coordinator_refresh();
do_test_out_of_order_seq();
do_test_topic_disappears_for_awhile();
do_test_disconnected_group_coord(rd_false);
do_test_disconnected_group_coord(rd_true);
do_test_txn_coordinator_null_not_fatal();
do_test_txn_resumable_calls_timeout(rd_true);
do_test_txn_resumable_calls_timeout(rd_false);
do_test_txn_resumable_calls_timeout_error(rd_true);
do_test_txn_resumable_calls_timeout_error(rd_false);
do_test_txn_resumable_init();
do_test_txn_concurrent_operations(rd_true /*commit*/);
do_test_txn_concurrent_operations(rd_false /*abort*/);
do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);
do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);
do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_true);
do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_false);
return 0;
}