3 Star 0 Fork 1

Gitee 极速下载/kafka-python

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/dpkp/kafka-python
克隆/下载
example.py 1.94 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python
import threading, time
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
class Producer(threading.Thread):
    """Background thread that publishes two demo messages to 'my-topic'.

    The thread keeps sending roughly once per second until ``stop()`` is
    called, then closes its KafkaProducer and exits.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        # Set by stop(); polled by run() to know when to shut down.
        self.stop_event = threading.Event()

    def stop(self):
        """Ask the thread to finish; returns immediately without joining."""
        self.stop_event.set()

    def run(self):
        """Send the demo messages in a loop until a stop is requested."""
        producer = KafkaProducer(bootstrap_servers='localhost:9092')
        try:
            while not self.stop_event.is_set():
                producer.send('my-topic', b"test")
                producer.send('my-topic', b"\xc2Hola, mundo!")
                # Wait on the event rather than sleeping so that stop()
                # interrupts the pause instead of waiting out the second.
                self.stop_event.wait(1)
        finally:
            # Always release the producer's connections, even if send()
            # raised part-way through the loop.
            producer.close()
class Consumer(threading.Thread):
    """Background thread that prints every message received on 'my-topic'.

    The thread keeps consuming until ``stop()`` is called, then closes its
    KafkaConsumer and exits.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        # Set by stop(); polled by run() to know when to shut down.
        self.stop_event = threading.Event()

    def stop(self):
        """Ask the thread to finish; returns immediately without joining."""
        self.stop_event.set()

    def run(self):
        """Print incoming messages until a stop is requested."""
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        try:
            consumer.subscribe(['my-topic'])

            # Per the kafka-python docs, consumer_timeout_ms makes the
            # iterator end after that long without a message, so the outer
            # loop gets to re-check the stop flag even on an idle topic.
            while not self.stop_event.is_set():
                for message in consumer:
                    print(message)
                    if self.stop_event.is_set():
                        break
        finally:
            # Always release the consumer's connections, even if
            # subscribing or iterating raised.
            consumer.close()
def main():
    """Run the demo: create the topic, then produce and consume for 10 s.

    Topic creation is best effort; any failure is reported and the demo
    carries on. The worker threads are always asked to stop and are joined
    before returning, including when the wait is interrupted.
    """
    # Create 'my-topic' Kafka topic
    try:
        admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
        try:
            topic = NewTopic(name='my-topic',
                             num_partitions=1,
                             replication_factor=1)
            admin.create_topics([topic])
        finally:
            # The admin client is only needed for this one call.
            admin.close()
    except Exception as exc:
        # Deliberately non-fatal (e.g. the topic may already exist), but
        # say why instead of hiding the failure.
        print("Skipping creation of 'my-topic': %r" % (exc,))

    tasks = [
        Producer(),
        Consumer()
    ]

    # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
    for t in tasks:
        t.start()

    try:
        time.sleep(10)
    finally:
        # Stop threads. Done in finally so an interrupt during the sleep
        # does not leave the non-daemon workers running.
        for task in tasks:
            task.stop()

        for task in tasks:
            task.join()


if __name__ == "__main__":
    main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors/kafka-python.git
git@gitee.com:mirrors/kafka-python.git
mirrors
kafka-python
kafka-python
master

搜索帮助