6K Star 11.8K Fork 4K

GVPdotNET China / Furion

 / 详情

EventBus 增加适配 Kafka 功能支持和示例说明

已完成
创建于  
2022-09-01 12:09

1、函数能支持同步和异步两种。
2、模式希望考虑 pub/sub(发布订阅模式)
call/back(Rpc调用模式)

评论 (4)

背大包的猪 创建了任务

kafka EventConsumer没有接收事件,需要自己实现

/// <summary>
/// Kafka 消息扩展
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public class EventConsumer<TKey, TValue> : IDisposable
{
    private Task _consumerTask;
    private CancellationTokenSource _consumerCts;
    /// <summary>
    /// 消费者
    /// </summary>
    public IConsumer<TKey, TValue> Consumer { get; }
    /// <summary>
    /// ConsumerBuilder
    /// </summary>
    public ConsumerBuilder<TKey, TValue> Builder { get; set; }
    /// <summary>
    /// 消息回调
    /// </summary>
    public event EventHandler<ConsumeResult<TKey, TValue>> Received;
    /// <summary>
    /// 异常回调
    /// </summary>
    public event EventHandler<ConsumeException> OnConsumeException;

    /// <summary>
    /// 构造函数
    /// </summary>
    /// <param name="config"></param>
    public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)
    {
        Builder = new ConsumerBuilder<TKey, TValue>(config);
        Consumer = Builder.Build();
    }

    /// <summary>
    /// 启动
    /// </summary>
    /// <exception cref="InvalidOperationException"></exception>
    public void Start()
    {
        if (Consumer.Subscription?.Any() != true)
        {
            throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
        }
        if (_consumerTask != null)
        {
            return;
        }
        _consumerCts = new CancellationTokenSource();
        var ct = _consumerCts.Token;
        _consumerTask = Task.Factory.StartNew(() =>
        {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                    var cr = Consumer.Consume(TimeSpan.FromSeconds(1));
                    if (cr == null) continue;
                    Received?.Invoke(this, cr);
                }
                catch (ConsumeException e)
                {
                    OnConsumeException?.Invoke(this, e);
                }
            }
        }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    /// <summary>
    /// 停止
    /// </summary>
    /// <returns></returns>
    public async Task Stop()
    {
        if (_consumerCts == null || _consumerTask == null) return;
        _consumerCts.Cancel();
        try
        {
            await _consumerTask;
        }
        finally
        {
            _consumerTask = null;
            _consumerCts = null;
        }
    }

    /// <summary>
    /// 释放
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// 释放
    /// </summary>
    /// <param name="disposing"></param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_consumerTask != null)
            {
                Stop().Wait();
            }
            Consumer?.Dispose();
        }
    }
}

然后仿照作者的 RabbitMQEventSourceStorer,添加 KafkaEventSourceStore

/// <summary>
/// Kafka 存储源
/// </summary>
public class KafkaEventSourceStore : IEventSourceStorer, IDisposable
{
    /// <summary>
    /// 内存通道事件源存储器
    /// </summary>
    private readonly Channel<IEventSource> _channel;
    /// <summary>
    /// 主题
    /// </summary>
    private readonly string _topic;
    /// <summary>
    /// 消费者
    /// </summary>
    private readonly EventConsumer<Null, string> _eventConsumer;
    /// <summary>
    /// 生产者
    /// </summary>
    private readonly IProducer<Null, string> _producer;

    /// <summary>
    /// 构造函数
    /// </summary>
    /// <param name="consumerConf">消费者配置</param>
    /// <param name="producerConf">生产者配置</param>
    /// <param name="topic">主题</param>
    /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
    public KafkaEventSourceStore(ConsumerConfig consumerConf, ProducerConfig producerConf, string topic, int capacity)
    {
        // 配置通道,设置超出默认容量后进入等待
        var boundedChannelOptions = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        // 创建有限容量通道
        _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
        // 主题
        _topic = topic;
        // 创建消息订阅者
        _eventConsumer = new EventConsumer<Null, string>(consumerConf);
        _eventConsumer.Consumer.Subscribe(new[] { topic });
        // 订阅消息写入 Channel
        _eventConsumer.Received += (send, cr) =>
        {
            // 反序列化消息
            var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr.Message.Value);
            // 写入内存管道存储器
            _channel.Writer.TryWrite(eventSource);
        };

        // 启动消费者
        _eventConsumer.Start();
        // 创建生产者
        _producer = new ProducerBuilder<Null, string>(producerConf).Build();
    }

    /// <summary>
    /// 将事件源写入存储器
    /// </summary>
    /// <param name="eventSource">事件源对象</param>
    /// <param name="cancellationToken">取消任务 Token</param>
    /// <returns><see cref="ValueTask"/></returns>
    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
    {
        if (eventSource == default)
        {
            throw new ArgumentNullException(nameof(eventSource));
        }
        // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
        if (eventSource is ChannelEventSource source)
        {
            // 序列化
            var data = JsonConvert.SerializeObject(source);
            // 异步发布
            await _producer.ProduceAsync(_topic, new Message<Null, string>
            {
                Value = data
            }, cancellationToken);
        }
        else
        {
            // 这里处理动态订阅问题
            await _channel.Writer.WriteAsync(eventSource, cancellationToken);
        }
    }

    /// <summary>
    /// 从存储器中读取一条事件源
    /// </summary>
    /// <param name="cancellationToken">取消任务 Token</param>
    /// <returns>事件源对象</returns>
    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
    {
        // 读取一条事件源
        var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
        return eventSource;
    }

    /// <summary>
    /// 释放非托管资源
    /// </summary>
    public async void Dispose()
    {
        await _eventConsumer.Stop();
        GC.SuppressFinalize(this);
    }
}

替换默认的事件总线

var consumerConf = new ConsumerConfig
{
    BootstrapServers = "xxx.xxx.xxx.xxx:9092",
    GroupId = "Consumer",
    AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起
};

var producerConf = new ProducerConfig
{
    BootstrapServers = "xxx.xxx.xxx.xxx:9092",
    BatchSize = 16384, // 修改批次大小为16K
    LingerMs = 20 // 修改等待时间为20ms
};

// 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是 eventbus
var kafkaEventSourceStorer = new KafkaEventSourceStore(consumerConf, producerConf, "testTopic", 30000);
// 替换默认事件总线存储器
builder.ReplaceStorer(serviceProvider =>
{
    return kafkaEventSourceStorer;
});

kafka版本v1.28.2,测试通过,如果防止消息丢失,可以在 consumerConfig 禁止 AutoCommit,自己移动 Offset

赞啊!!!很牛。

百小僧 任务状态待办的 修改为已完成
百小僧 修改了描述
百小僧 修改了标题
百小僧 修改了标题

登录 后才可以发表评论

状态
负责人
里程碑
Pull Requests
关联的 Pull Requests 被合并后可能会关闭此 issue
分支
开始日期   -   截止日期
-
置顶选项
优先级
参与者(4)
1916182 visker 1659857267 974299 monksoul 1578937227 61753 zuohuaijun 1686997111 441323 ymd 1578923960
C#
1
https://gitee.com/dotnetchina/Furion.git
git@gitee.com:dotnetchina/Furion.git
dotnetchina
Furion
Furion

搜索帮助