135 Star 1.1K Fork 350

GVP搜狗开源 / workflow

 / 详情

使用tutorial-13-kafka_cli.cc里面的代码学着操作kafka,有四点疑问,内容如下

待办的
创建于  
2023-01-14 21:28

1、在回调函数中加while无限循环,没有实现循环生产数据,也没有实现循环消费数据
2、在消费者客户端中不使用set_offset_store时,总是消费到所有数据,类似kafkad消费主题数据命令时加了--from-beginning
3、在消费中设置了config.set_offset_store(0);尝试了很多设置,也没有实现实时读取kafka中主题的当前数据
4、在消费者设置了消费组模式时,没有消费到任何数据,kafka命令是可以的

望大佬指点下,workflow_kafka的使用。

#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include "workflow/WFKafkaClient.h"
#include "workflow/KafkaMessage.h"
#include "workflow/KafkaResult.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
#include "workflow/WFGlobal.h"

using namespace protocol;

static WFFacilities::WaitGroup wait_group(1);

std::string url;
bool no_cgroup = false;
WFKafkaClient client;

void kafka_callback(WFKafkaTask *task)
{
    int index = 0;

    int api_type = task->get_api_type();

    WFKafkaTask *next_task = NULL;
    std::vector<std::vector<KafkaRecord *>> records;
    std::vector<KafkaToppar *> toppars;

    while (true)
    {
        printf("$$$...%d###%d\n", index, api_type);

        int state = task->get_state();
        int error = task->get_error();

        if (state != WFT_STATE_SUCCESS)
        {
            fprintf(stderr, "error msg: %s\n", WFGlobal::get_error_string(state, error));
            fprintf(stderr, "Failed. Press Ctrl-C to exit.\n");
            client.deinit();
            wait_group.done();
            return;
        }

        protocol::KafkaResult new_result;

        switch (api_type)
        {
        case Kafka_Produce:
            task->get_result()->fetch_records(records);

            for (const auto &v : records)
            {
                for (const auto &w : v)
                {
                    //设置偏移
                    w->set_offset(w->get_offset() + 1);
                    //设置值
                    w->set_value("333", 3);

                    const void *value;
                    size_t value_len;
                    w->get_value(&value, &value_len);

                    printf("produce\ttopic: %s, partition: %d, status: %d, offset: %lld, val_len: %zu\n",
                           w->get_topic(),
                           w->get_partition(),
                           w->get_status(),
                           w->get_offset(), value_len);
                }
            }

            break;

        case Kafka_Fetch:
            new_result = std::move(*task->get_result());
            new_result.fetch_records(records);

            if (!records.empty())
            {
                if (!no_cgroup)
                    next_task = client.create_kafka_task("api=commit", 3, kafka_callback);

                std::string out;

                for (const auto &v : records)
                {
                    if (v.empty())
                        continue;

                    // 设置偏移量
                    v.back()->set_offset(v.back()->get_offset() + 1);

                    char fn[1024];
                    snprintf(fn, 1024, "./kafka.%s.%d.%llu",
                             v.back()->get_topic(), 
                             v.back()->get_partition(),
                             v.back()->get_offset());

                    FILE *fp = fopen(fn, "w+");
                    long long offset = 0;
                    int partition = 0;
                    std::string topic;

                    for (const auto &w : v)
                    {
                        const void *value;
                        size_t value_len;
                        w->get_value(&value, &value_len);
                        if (fp)
                            // fwrite(value, value_len, 1, fp);

                            printf("$: %s\n", (char *)value);

                        offset = w->get_offset();
                        partition = w->get_partition();
                        topic = w->get_topic();

                        if (!no_cgroup)
                            next_task->add_commit_record(*w);
                    }

                    if (!topic.empty())
                    {
                        out += "topic: " + topic;
                        out += ",partition: " + std::to_string(partition);
                        out += ",offset: " + std::to_string(offset) + ";";
                    }

                    if (fp)
                        fclose(fp);
                }

                printf("fetch\t%s\n", out.c_str());

                if (!no_cgroup)
                    series_of(task)->push_back(next_task);
            }
            else
            {
                printf("fetch_records is null...\n");
            }

            break;

        case Kafka_OffsetCommit:
            task->get_result()->fetch_toppars(toppars);

            if (!toppars.empty())
            {
                for (const auto &v : toppars)
                {
                    printf("commit\ttopic: %s, partition: %d, offset: %llu, error: %d\n",
                           v->get_topic(), v->get_partition(),
                           v->get_offset(), v->get_error());
                }
            }

            next_task = client.create_leavegroup_task(3, kafka_callback);

            series_of(task)->push_back(next_task);

            break;

        case Kafka_LeaveGroup:
            printf("leavegroup callback\n");
            break;

        default:
            break;
        }

        if (!next_task)
        {
            client.deinit();
            wait_group.done();
        }

        next_task = NULL;
        records.capacity();
        toppars.capacity();

        index++;

        sleep(1);
    }
}

void sig_handler(int signo) {}

int main(int argc, char *argv[])
{
    // if (argc < 3)
    // {
    //     fprintf(stderr, "USAGE: %s url <p/c> [compress_type/d]\n", argv[0]);
    //     exit(1);
    // }

    signal(SIGINT, sig_handler);

    // url = argv[1];
    // if (strncmp(argv[1], "kafka://", 8) != 0)
    //     url = "kafka://" + url;

    url = "192.168.31.121:9092";

    // char buf[512 * 1024];
    char buf[100] = "H-H-H";
    WFKafkaTask *task;

    int _select = 2;
    if (_select == 1)
    {
        int compress_type = Kafka_NoCompress;

        // if (argc > 3)
        //     compress_type = atoi(argv[3]);

        if (compress_type > Kafka_Zstd)
            exit(1);

        client.init(url);
        task = client.create_kafka_task("api=produce", 3, kafka_callback);
        KafkaConfig config;
        KafkaRecord record;

        config.set_compress_type(compress_type);
        config.set_client_id("workflow");
        task->set_config(std::move(config));

        // for (size_t i = 0; i < sizeof(buf); ++i)
        //     buf[i] = '1' + rand() % 128;

        record.set_key("key1", strlen("key1"));
        record.set_value(buf, sizeof(buf));
        record.add_header_pair("hk1", 3, "hv1", 3);
        task->add_produce_record("workflow_test1", -1, std::move(record));

        record.set_key("key2", strlen("key2"));
        record.set_value(buf, sizeof(buf));
        record.add_header_pair("hk2", 3, "hv2", 3);
        task->add_produce_record("workflow_test2", -1, std::move(record));
    }
    else if (_select == 2)
    {
        int cgroup = 1;
        if (cgroup == 0)
        {
            client.init(url);
            task = client.create_kafka_task("api=fetch", 3, kafka_callback);

            KafkaToppar toppar;
            toppar.set_topic_partition("workflow_test1", 0);
            toppar.set_offset(0);
            task->add_toppar(toppar);

            toppar.set_topic_partition("workflow_test2", 0);
            toppar.set_offset(1);
            task->add_toppar(toppar);

            no_cgroup = true;
        }
        else
        {
            client.init(url, "workflow_group");
            task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch",
                                            3, kafka_callback);
        }

        KafkaConfig config;
        config.set_client_id("workflow");

        // 0:优先使用上次提交; 1:使用指定的offset
        config.set_offset_store(0);

        task->set_config(std::move(config));
    }
    else
    {
        fprintf(stderr, "USAGE: %s url <p/c> [compress_type/d]\n", argv[0]);
        exit(1);
    }

    task->start();

    wait_group.wait();

    while (true)
    {
        /* code */
    }

    return 0;
}

评论 (10)

XiaoQinEr 创建了任务
XiaoQinEr 修改了描述
展开全部操作日志

不好意思,才看到issue。我们看一下

你的需求是不是只想从最新的数据开始消费?那你应该用一下config里的set_offset_timestamp。这个的默认值是KAFKA_TIMESTAMP_LATEST,表示从最后一条数据开始消费。你也可改成KAFKA_TIMESTAMP_EARLIEST,表示从最早一条数据开始消费。你可以改成从当前时间开始消费:

    KafkaConfig config;
    auto now = std::chrono::system_clock::now().time_since_epoch();
    config.set_offset_timestamp(duration_cast<milliseconds>(now).count());
    task->set_config(std::move(config));

感谢大佬的指点,三个不同参数的设置,效果都是一样的,都是获取kafka里面所有的数据。

        KafkaConfig config;
        config.set_client_id("workflow");

        auto now = std::chrono::system_clock::now().time_since_epoch();
        config.set_offset_timestamp(std::chrono::duration_cast<std::chrono::milliseconds>(now).count());

        // KAFKA_TIMESTAMP_LATEST
        // config.set_offset_timestamp(KAFKA_TIMESTAMP_LATEST);
        // KAFKA_TIMESTAMP_EARLIEST
        // config.set_offset_timestamp(KAFKA_TIMESTAMP_EARLIEST);

        task->set_config(std::move(config));

另外,你好像不太知道workflow的用法。
workflow是每个请求都是一个独立的task的,你循环的话,需要再次调用create_kafka_task,并执行push_back操作(series_of(task)->push_back(next))。并不是start一个任务之后就会源源不断的得到结果。

我确实不会用这个框架,麻烦你在有时间的时候写写这个无限循环生产数据和无限循环消费数据的案例,对这个异步的调用没有弄明白,还有就是workflow关于kafka相关的接口文档太少了。

输入图片说明

workflow消费数据时,很不稳定。

消费的超时默认是500ms,是不是生产速度比较慢?kafka client应该不会有特别明显的问题的哈。
gitee有时候回消息比较慢,如果条件允许可以发github。
timestamp的问题我再看一下。

循环产生任务的方法你会了吗?大概是这个结构:

void callback(WFKafkaTask *task)
{
   process_task(task);
   if (end)
        break;
        
    next = client.create_kafka_task(..., callback);
    series_of(task)->push_back(next);
}

不好意思,之前那个循环消费的例子写错了。不应该有while。已更正!

登录 后才可以发表评论

状态
负责人
里程碑
Pull Requests
关联的 Pull Requests 被合并后可能会关闭此 issue
分支
开始日期   -   截止日期
-
置顶选项
优先级
参与者(2)
C++
1
https://gitee.com/sogou/workflow.git
git@gitee.com:sogou/workflow.git
sogou
workflow
workflow

搜索帮助