代码拉取完成,页面将自动刷新
1、函数能支持同步和异步两种。
2、模式希望考虑 pub/sub(发布订阅模式)
call/back(Rpc调用模式)
kafka EventConsumer没有接收事件,需要自己实现
/// <summary>
/// Kafka 消息扩展
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public class EventConsumer<TKey, TValue> : IDisposable
{
private Task _consumerTask;
private CancellationTokenSource _consumerCts;
/// <summary>
/// 消费者
/// </summary>
public IConsumer<TKey, TValue> Consumer { get; }
/// <summary>
/// ConsumerBuilder
/// </summary>
public ConsumerBuilder<TKey, TValue> Builder { get; set; }
/// <summary>
/// 消息回调
/// </summary>
public event EventHandler<ConsumeResult<TKey, TValue>> Received;
/// <summary>
/// 异常回调
/// </summary>
public event EventHandler<ConsumeException> OnConsumeException;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="config"></param>
public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)
{
Builder = new ConsumerBuilder<TKey, TValue>(config);
Consumer = Builder.Build();
}
/// <summary>
/// 启动
/// </summary>
/// <exception cref="InvalidOperationException"></exception>
public void Start()
{
if (Consumer.Subscription?.Any() != true)
{
throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
}
if (_consumerTask != null)
{
return;
}
_consumerCts = new CancellationTokenSource();
var ct = _consumerCts.Token;
_consumerTask = Task.Factory.StartNew(() =>
{
while (!ct.IsCancellationRequested)
{
try
{
var cr = Consumer.Consume(TimeSpan.FromSeconds(1));
if (cr == null) continue;
Received?.Invoke(this, cr);
}
catch (ConsumeException e)
{
OnConsumeException?.Invoke(this, e);
}
}
}, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
/// <summary>
/// 停止
/// </summary>
/// <returns></returns>
public async Task Stop()
{
if (_consumerCts == null || _consumerTask == null) return;
_consumerCts.Cancel();
try
{
await _consumerTask;
}
finally
{
_consumerTask = null;
_consumerCts = null;
}
}
/// <summary>
/// 释放
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 释放
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (_consumerTask != null)
{
Stop().Wait();
}
Consumer?.Dispose();
}
}
}
然后仿照作者的 RabbitMQEventSourceStorer,添加 KafkaEventSourceStore
/// <summary>
/// Kafka 存储源
/// </summary>
public class KafkaEventSourceStore : IEventSourceStorer, IDisposable
{
/// <summary>
/// 内存通道事件源存储器
/// </summary>
private readonly Channel<IEventSource> _channel;
/// <summary>
/// 主题
/// </summary>
private readonly string _topic;
/// <summary>
/// 消费者
/// </summary>
private readonly EventConsumer<Null, string> _eventConsumer;
/// <summary>
/// 生产者
/// </summary>
private readonly IProducer<Null, string> _producer;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="consumerConf">消费者配置</param>
/// <param name="producerConf">生产者配置</param>
/// <param name="topic">主题</param>
/// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
public KafkaEventSourceStore(ConsumerConfig consumerConf, ProducerConfig producerConf, string topic, int capacity)
{
// 配置通道,设置超出默认容量后进入等待
var boundedChannelOptions = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
// 创建有限容量通道
_channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
// 主题
_topic = topic;
// 创建消息订阅者
_eventConsumer = new EventConsumer<Null, string>(consumerConf);
_eventConsumer.Consumer.Subscribe(new[] { topic });
// 订阅消息写入 Channel
_eventConsumer.Received += (send, cr) =>
{
// 反序列化消息
var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr.Message.Value);
// 写入内存管道存储器
_channel.Writer.TryWrite(eventSource);
};
// 启动消费者
_eventConsumer.Start();
// 创建生产者
_producer = new ProducerBuilder<Null, string>(producerConf).Build();
}
/// <summary>
/// 将事件源写入存储器
/// </summary>
/// <param name="eventSource">事件源对象</param>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns><see cref="ValueTask"/></returns>
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
{
if (eventSource == default)
{
throw new ArgumentNullException(nameof(eventSource));
}
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
if (eventSource is ChannelEventSource source)
{
// 序列化
var data = JsonConvert.SerializeObject(source);
// 异步发布
await _producer.ProduceAsync(_topic, new Message<Null, string>
{
Value = data
}, cancellationToken);
}
else
{
// 这里处理动态订阅问题
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
}
}
/// <summary>
/// 从存储器中读取一条事件源
/// </summary>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns>事件源对象</returns>
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
// 读取一条事件源
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
return eventSource;
}
/// <summary>
/// 释放非托管资源
/// </summary>
public async void Dispose()
{
await _eventConsumer.Stop();
GC.SuppressFinalize(this);
}
}
替换默认的事件总线
var consumerConf = new ConsumerConfig
{
BootstrapServers = "xxx.xxx.xxx.xxx:9092",
GroupId = "Consumer",
AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起
};
var producerConf = new ProducerConfig
{
BootstrapServers = "xxx.xxx.xxx.xxx:9092",
BatchSize = 16384, // 修改批次大小为16K
LingerMs = 20 // 修改等待时间为20ms
};
// 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是 eventbus
var kafkaEventSourceStorer = new KafkaEventSourceStore(consumerConf, producerConf, "testTopic", 30000);
// 替换默认事件总线存储器
builder.ReplaceStorer(serviceProvider =>
{
return kafkaEventSourceStorer;
});
kafka版本v1.28.2,测试通过,如果防止消息丢失,可以在 consumerConfig 禁止 AutoCommit,自己移动 Offset
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
文档已更新:https://dotnetchina.gitee.io/furion/docs/event-bus/#2242:admin.net中如何引用ISqlRepository-kafka-%E8%87%AA%E5%AE%9A%E4%B9%89%E6%8C%87%E5%8D%97
很赞
登录 后才可以发表评论