1、在回调函数中加while无限循环,没有实现循环生产数据,也没有实现循环消费数据
2、在消费者客户端中不使用set_offset_store时,总是消费到所有数据,类似kafkad消费主题数据命令时加了--from-beginning
3、在消费中设置了config.set_offset_store(0);尝试了很多设置,也没有实现实时读取kafka中主题的当前数据
4、在消费者设置了消费组模式时,没有消费到任何数据,kafka命令是可以的
望大佬指点下,workflow_kafka的使用。
#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include "workflow/WFKafkaClient.h"
#include "workflow/KafkaMessage.h"
#include "workflow/KafkaResult.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
#include "workflow/WFGlobal.h"
using namespace protocol;
static WFFacilities::WaitGroup wait_group(1);
std::string url;
bool no_cgroup = false;
WFKafkaClient client;
void kafka_callback(WFKafkaTask *task)
{
int index = 0;
int api_type = task->get_api_type();
WFKafkaTask *next_task = NULL;
std::vector<std::vector<KafkaRecord *>> records;
std::vector<KafkaToppar *> toppars;
while (true)
{
printf("$$$...%d###%d\n", index, api_type);
int state = task->get_state();
int error = task->get_error();
if (state != WFT_STATE_SUCCESS)
{
fprintf(stderr, "error msg: %s\n", WFGlobal::get_error_string(state, error));
fprintf(stderr, "Failed. Press Ctrl-C to exit.\n");
client.deinit();
wait_group.done();
return;
}
protocol::KafkaResult new_result;
switch (api_type)
{
case Kafka_Produce:
task->get_result()->fetch_records(records);
for (const auto &v : records)
{
for (const auto &w : v)
{
//设置偏移
w->set_offset(w->get_offset() + 1);
//设置值
w->set_value("333", 3);
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
printf("produce\ttopic: %s, partition: %d, status: %d, offset: %lld, val_len: %zu\n",
w->get_topic(),
w->get_partition(),
w->get_status(),
w->get_offset(), value_len);
}
}
break;
case Kafka_Fetch:
new_result = std::move(*task->get_result());
new_result.fetch_records(records);
if (!records.empty())
{
if (!no_cgroup)
next_task = client.create_kafka_task("api=commit", 3, kafka_callback);
std::string out;
for (const auto &v : records)
{
if (v.empty())
continue;
// 设置偏移量
v.back()->set_offset(v.back()->get_offset() + 1);
char fn[1024];
snprintf(fn, 1024, "./kafka.%s.%d.%llu",
v.back()->get_topic(),
v.back()->get_partition(),
v.back()->get_offset());
FILE *fp = fopen(fn, "w+");
long long offset = 0;
int partition = 0;
std::string topic;
for (const auto &w : v)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
if (fp)
// fwrite(value, value_len, 1, fp);
printf("$: %s\n", (char *)value);
offset = w->get_offset();
partition = w->get_partition();
topic = w->get_topic();
if (!no_cgroup)
next_task->add_commit_record(*w);
}
if (!topic.empty())
{
out += "topic: " + topic;
out += ",partition: " + std::to_string(partition);
out += ",offset: " + std::to_string(offset) + ";";
}
if (fp)
fclose(fp);
}
printf("fetch\t%s\n", out.c_str());
if (!no_cgroup)
series_of(task)->push_back(next_task);
}
else
{
printf("fetch_records is null...\n");
}
break;
case Kafka_OffsetCommit:
task->get_result()->fetch_toppars(toppars);
if (!toppars.empty())
{
for (const auto &v : toppars)
{
printf("commit\ttopic: %s, partition: %d, offset: %llu, error: %d\n",
v->get_topic(), v->get_partition(),
v->get_offset(), v->get_error());
}
}
next_task = client.create_leavegroup_task(3, kafka_callback);
series_of(task)->push_back(next_task);
break;
case Kafka_LeaveGroup:
printf("leavegroup callback\n");
break;
default:
break;
}
if (!next_task)
{
client.deinit();
wait_group.done();
}
next_task = NULL;
records.capacity();
toppars.capacity();
index++;
sleep(1);
}
}
void sig_handler(int signo) {}
int main(int argc, char *argv[])
{
// if (argc < 3)
// {
// fprintf(stderr, "USAGE: %s url <p/c> [compress_type/d]\n", argv[0]);
// exit(1);
// }
signal(SIGINT, sig_handler);
// url = argv[1];
// if (strncmp(argv[1], "kafka://", 8) != 0)
// url = "kafka://" + url;
url = "192.168.31.121:9092";
// char buf[512 * 1024];
char buf[100] = "H-H-H";
WFKafkaTask *task;
int _select = 2;
if (_select == 1)
{
int compress_type = Kafka_NoCompress;
// if (argc > 3)
// compress_type = atoi(argv[3]);
if (compress_type > Kafka_Zstd)
exit(1);
client.init(url);
task = client.create_kafka_task("api=produce", 3, kafka_callback);
KafkaConfig config;
KafkaRecord record;
config.set_compress_type(compress_type);
config.set_client_id("workflow");
task->set_config(std::move(config));
// for (size_t i = 0; i < sizeof(buf); ++i)
// buf[i] = '1' + rand() % 128;
record.set_key("key1", strlen("key1"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk1", 3, "hv1", 3);
task->add_produce_record("workflow_test1", -1, std::move(record));
record.set_key("key2", strlen("key2"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk2", 3, "hv2", 3);
task->add_produce_record("workflow_test2", -1, std::move(record));
}
else if (_select == 2)
{
int cgroup = 1;
if (cgroup == 0)
{
client.init(url);
task = client.create_kafka_task("api=fetch", 3, kafka_callback);
KafkaToppar toppar;
toppar.set_topic_partition("workflow_test1", 0);
toppar.set_offset(0);
task->add_toppar(toppar);
toppar.set_topic_partition("workflow_test2", 0);
toppar.set_offset(1);
task->add_toppar(toppar);
no_cgroup = true;
}
else
{
client.init(url, "workflow_group");
task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch",
3, kafka_callback);
}
KafkaConfig config;
config.set_client_id("workflow");
// 0:优先使用上次提交; 1:使用指定的offset
config.set_offset_store(0);
task->set_config(std::move(config));
}
else
{
fprintf(stderr, "USAGE: %s url <p/c> [compress_type/d]\n", argv[0]);
exit(1);
}
task->start();
wait_group.wait();
while (true)
{
/* code */
}
return 0;
}
不好意思,才看到issue。我们看一下
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
你的需求是不是只想从最新的数据开始消费?那你应该用一下config里的set_offset_timestamp。这个的默认值是KAFKA_TIMESTAMP_LATEST,表示从最后一条数据开始消费。你也可改成KAFKA_TIMESTAMP_EARLIEST,表示从最早一条数据开始消费。你可以改成从当前时间开始消费:
KafkaConfig config;
auto now = std::chrono::system_clock::now().time_since_epoch();
config.set_offset_timestamp(duration_cast<milliseconds>(now).count());
task->set_config(std::move(config));
感谢大佬的指点,三个不同参数的设置,效果都是一样的,都是获取kafka里面所有的数据。
KafkaConfig config;
config.set_client_id("workflow");
auto now = std::chrono::system_clock::now().time_since_epoch();
config.set_offset_timestamp(std::chrono::duration_cast<std::chrono::milliseconds>(now).count());
// KAFKA_TIMESTAMP_LATEST
// config.set_offset_timestamp(KAFKA_TIMESTAMP_LATEST);
// KAFKA_TIMESTAMP_EARLIEST
// config.set_offset_timestamp(KAFKA_TIMESTAMP_EARLIEST);
task->set_config(std::move(config));
另外,你好像不太知道workflow的用法。
workflow是每个请求都是一个独立的task的,你循环的话,需要再次调用create_kafka_task,并执行push_back操作(series_of(task)->push_back(next))。并不是start一个任务之后就会源源不断的得到结果。
循环产生任务的方法你会了吗?大概是这个结构:
void callback(WFKafkaTask *task)
{
process_task(task);
if (end)
break;
next = client.create_kafka_task(..., callback);
series_of(task)->push_back(next);
}
不好意思,之前那个循环消费的例子写错了。不应该有while。已更正!
登录 后才可以发表评论