# KafkaDrive
**Repository Path**: yswenli/KafkaDrive
## Basic Information
- **Project Name**: KafkaDrive
- **Description**: A simple wrapper around Kafka queues; you only need to configure the Kafka server and the relevant topics.
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-03-30
- **Last Updated**: 2021-03-30
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# KafkaDrive
A simple wrapper around Kafka queues; you only need to configure the Kafka server and the relevant topics.
# Test example:
/*****************************************************************************************************
* Copyright of this code belongs to @wenli, All Rights Reserved (C) 2015-2017
*****************************************************************************************************
* CLR version: 4.0.30319.42000
* Unique ID: 37b509d9-2fe4-4e5e-b1aa-080d67179289
* Machine name: WENLI-PC
* Contact email: wenguoli_520@qq.com
*****************************************************************************************************
* Namespace: Wenli.Drive.Kafka.Util
* Class name: Program
* Created: 2017/2/10 17:00:46
* Created by: wenli
* Description:
*****************************************************************************************************/
```CSharp
/****************************************************************************
*Project name: Wenli.Data.Kafka
*CLR version: 4.0.30319.42000
*Machine name: WALLE-PC
*Namespace: Wenli.Data.Kafka
*Class name: Program
*Version: V1.0.0.0
*Created by: yswenli
*Email: yswenli@outlook.com
*Created: 2019/12/10 11:01:34
*Description:
*=====================================================================
*Modified: 2019/12/10 11:01:34
*Modified by: yswenli
*Version: V1.0.0.0
*Description:
*****************************************************************************/
using System;
using System.Threading;
using System.Threading.Tasks;
using Wenli.Data.Kafka.Common;

namespace Wenli.Data.Kafka
{
    class Program
    {
        // Kafka broker address, topic, and consumer group used by this test program.
        static readonly string server = "10.205.243.25:9092";
        static readonly string topic = "Wenli.Data.Kafka.Topic.Test";
        static readonly string group = "Wenli.Data.Kafka.Group.Test";

        public static void Main(params string[] args)
        {
            Console.Title = "Wenli.Data.Kafka";
            Console.WriteLine("Wenli.Data.Kafka");
            do
            {
                Console.WriteLine("Enter p to start the producer, c1 to start consumer1, c2 to start consumer2, or a (or anything else) to start them all!");
                var input = Console.ReadLine();
                switch (input)
                {
                    case "p":
                        // The returned task is discarded; the send loop runs in the background.
                        _ = InitProducer();
                        Console.WriteLine("Wenli.Data.Kafka is running producer");
                        break;
                    case "c1":
                        InitConsumer1();
                        Console.WriteLine("Wenli.Data.Kafka is running consumer");
                        break;
                    case "c2":
                        InitConsumer2();
                        Console.WriteLine("Wenli.Data.Kafka is running consumer");
                        break;
                    default:
                        // Any other input starts the producer and both consumers.
                        _ = InitProducer();
                        Console.WriteLine("Wenli.Data.Kafka is running producer");
                        InitConsumer1();
                        InitConsumer2();
                        Console.WriteLine("Wenli.Data.Kafka is running consumer");
                        break;
                }
                // Wait for Enter before showing the menu again.
                Console.ReadLine();
            }
            while (true);
        }

        static async Task InitProducer()
        {
            var count = 0;
            Console.WriteLine("Enter the number of messages to send (default: 3)");
            var input = Console.ReadLine();
            // Fall back to 3 messages when the input is not a valid integer.
            if (!int.TryParse(input, out count))
            {
                count = 3;
            }
            var kafkaProducer = new Producer(server);
            await Task.Run(() =>
            {
                for (int i = 0; i < count; i++)
                {
                    try
                    {
                        var msg = SerializeUtil.Serialize(new TestData() { ID = GuidUtil.GuidString, Message = "Wenli.Data.Kafka.Test", Created = DateTimeUtil.CurrentDateTimeString });
                        // Even-indexed messages use the overload with two extra arguments (-1 and "aaa");
                        // odd-indexed messages use the short overload.
                        if (i % 2 == 0)
                            kafkaProducer.SendMessage(topic, msg, -1, "aaa");
                        else
                            kafkaProducer.SendMessage(topic, msg);
                        Console.WriteLine($"KafkaProducer.SendMessage:{msg}");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"KafkaProducer.SendMessage Error:{ex.Message}");
                    }
                    // Pause 100 ms between sends.
                    Thread.Sleep(100);
                }
            }).ConfigureAwait(true);
        }

        static void InitConsumer1()
        {
            Consumer consumer = new Consumer(server, new string[] { topic }, group);
            consumer.OnReceived += Consumer_OnReceived;
            consumer.OnError += Consumer_OnError;
            consumer.Start();
        }

        static void InitConsumer2()
        {
            // Same server, topic, and group as consumer1, plus two extra constructor arguments (true, false).
            Consumer consumer = new Consumer(server, new string[] { topic }, group, true, false);
            consumer.OnReceived += Consumer_OnReceived;
            consumer.OnError += Consumer_OnError;
            consumer.Start();
        }

        // Prints every received message in serialized form.
        private static void Consumer_OnReceived(Consumer arg1, KafkaMessage arg2)
        {
            Console.WriteLine($"KafkaConsumer.Receive:{SerializeUtil.Serialize(arg2)}");
        }

        // Prints any error reported by a consumer.
        private static void Consumer_OnError(Consumer arg1, Exception arg2)
        {
            Console.WriteLine($"KafkaConsumer.Error:{arg2}");
        }
    }

    #region TestData
    // Payload type sent by the producer in this test.
    public class TestData
    {
        public string ID { get; set; }
        public string Message { get; set; }
        public string Created { get; set; }
    }
    #endregion
}
```
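
The example above passes each `TestData` object through `SerializeUtil.Serialize` before sending it, but the README does not show how `SerializeUtil` works. The following is only a sketch of what the payload round trip might look like, assuming the messages are JSON text. `PayloadSketch`, its `Serialize` and `Deserialize` helpers, and the ID and timestamp formats are hypothetical stand-ins, not part of KafkaDrive. It uses `System.Text.Json`, which ships with .NET Core 3.0 and later; on .NET Framework it would have to be added as a NuGet package.

```CSharp
using System;
using System.Text.Json;

// Same shape as the TestData class in the example above.
public class TestData
{
    public string ID { get; set; }
    public string Message { get; set; }
    public string Created { get; set; }
}

public static class PayloadSketch
{
    // Hypothetical stand-in for SerializeUtil.Serialize; assumes a JSON payload.
    public static string Serialize<T>(T value) => JsonSerializer.Serialize(value);

    // Hypothetical counterpart a consumer could use to rebuild the object.
    public static T Deserialize<T>(string json) => JsonSerializer.Deserialize<T>(json);

    public static void Main()
    {
        var original = new TestData
        {
            // Assumed formats; GuidUtil and DateTimeUtil are not shown in the README.
            ID = Guid.NewGuid().ToString("N"),
            Message = "Wenli.Data.Kafka.Test",
            Created = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
        };

        // Producer side: object to string.
        string json = Serialize(original);
        Console.WriteLine(json);

        // Consumer side: string back to object.
        TestData copy = Deserialize<TestData>(json);
        Console.WriteLine(copy.ID == original.ID); // prints True
    }
}
```

If the real `SerializeUtil` uses a different format, only the two helper methods would need to change; the producer and consumer code stays the same because both sides only handle strings.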