1 Star 2 Fork 3

ThingsGateway/Docs

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
KafkaProducer.mdx 8.72 KB
一键复制 编辑 原始数据 按行查看 历史
Diego 提交于 2026-02-03 22:01 +08:00 . AI补充文档
---
id: 304
title: KafkaProducer
---
import useBaseUrl from "@docusaurus/useBaseUrl";
import Tag from "@site/src/components/Tag.js";
import Highlight from '@site/src/components/Highlight.js';
:::tip `提示`
通过自定义脚本,可快速适配业务模型
:::
:::tip `须知`
插件使用**librdkafka**,注意按需安装c库
On Mac OSX, install librdkafka with homebrew:
```
$ brew install librdkafka
```
On Debian and Ubuntu, install librdkafka from the Confluent APT repositories, see instructions here and then install librdkafka:
```
$ apt install librdkafka-dev
```
On RedHat, CentOS, Fedora, install librdkafka from the Confluent YUM repositories, instructions here and then install librdkafka:
```
$ yum install librdkafka-devel
```
For other platforms, follow the source building instructions below.
:::
## 概述
KafkaProducer 是 ThingsGateway 中用于适配 Kafka 消息队列的插件,可以定时或基于变化发布设备、变量和报警数据到 Kafka 服务端。Kafka 是一款分布式流处理平台,具有高吞吐量、高可靠性和可扩展性,适合处理海量数据流。
### 核心功能
- **多实体发布**:支持发布设备、变量和报警三种实体类型的数据
- **灵活的上传模式**:支持定时上传、变化上传或两者同时生效
- **分组上传**:支持按变量分组属性进行批量上传
- **列表上传控制**:可控制设备、变量和报警是否以列表形式上传
- **动态Topic配置**:支持使用变量属性动态生成Topic名称
- **自定义脚本**:支持通过脚本自定义数据处理和转换逻辑
- **数据缓存**:支持内存队列和文件缓存,确保数据可靠性
### 应用场景
- **物联网数据采集**:将设备数据实时发布到 Kafka,供下游系统处理
- **实时监控系统**:将设备状态和报警信息发布到 Kafka,实现实时监控
- **数据集成**:作为数据集成的中间件,将设备数据转发到其他系统
- **大数据分析**:将设备数据发送到 Kafka,供大数据平台进行分析
## 插件属性配置
### 配置界面
<img src={require("@site/static/img/docs/KafkaProducer1.png").default} />
### 详细配置项
| 配置项 | 说明 | 默认值 | 建议值 |
|--------|------|--------|--------|
| **服务地址** | Kafka 服务地址 | 127.0.0.1:9092 | 多个地址用逗号分隔,如 "127.0.0.1:9092,127.0.0.1:9093" |
| **发布超时时间** | 发布消息的超时时间(毫秒) | 5000 | 根据网络状况调整,建议 3000-10000 |
| **用户名** | Kafka 认证用户名 | - | 启用认证时设置 |
| **密码** | Kafka 认证密码 | - | 启用认证时设置 |
| **SecurityProtocol** | 安全协议配置 | - | 如 "SASL_SSL"、"PLAINTEXT" 等 |
| **SaslMechanism** | SASL 认证机制 | - | 如 "PLAIN"、"SCRAM-SHA-256" 等 |
| **分组上传** | 启用后,无论是定时还是变化模式,始终会上传**变量分组属性**为key分组的全部变量 | False | 批量数据场景建议启用 |
| **选择全部变量** | 是否选择全部变量,true时不需要单个变量添加业务属性 | False | 变量较多时建议启用 |
| **设备状态列表上传** | 设备是否列表上传,false时每个设备实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| **变量列表上传** | 变量是否列表上传,false时每个变量实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| **报警列表上传** | 报警是否列表上传,false时每个报警实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| **设备Topic** | 设备实体的发布主题,使用``${key}``作为匹配项,key必须是上传实体中的属性 | - | 如 ``devices/${DeviceName}`` |
| **变量Topic** | 变量实体的发布主题,使用``${key}``作为匹配项,key必须是上传实体中的属性 | - | 如 ``variables/${DeviceName}/${Name}`` |
| **报警Topic** | 报警实体的发布主题,使用``${key}``作为匹配项,key必须是上传实体中的属性 | - | 如 ``alarms/${DeviceName}`` |
| **设备实体脚本** | 脚本返回新的实体列表,动态类中需继承**DynamicModelBase**,传入列表为**DeviceData** | - | 编辑页面中,可通过检查按钮验证脚本 |
| **变量实体脚本** | 脚本返回新的实体列表,动态类中需继承**DynamicModelBase**,传入列表为**VariableBasicData** | - | 编辑页面中,可通过检查按钮验证脚本 |
| **报警实体脚本** | 脚本返回新的实体列表,动态类中需继承**DynamicModelBase**,传入列表为**AlarmVariable** | - | 编辑页面中,可通过检查按钮验证脚本 |
| **上传模式** | 数据上传模式:间隔/变化/变化和间隔同时生效 | 间隔 | 根据数据采集需求选择 |
| **定时上传间隔** | 间隔执行时间(秒) | 10 | 根据数据更新频率设置 |
| **严格入队模式** | 启用后,每次定时上传时,保证一组数据在同一时间点可见 | - |
| **启用缓存** | 是否启用缓存 | False | 网络不稳定或数据量大时建议启用 |
| **缓存文件最大长度(mb)** | 缓存文件最大长度 | 100 | 根据磁盘空间和数据量设置 |
| **上传每页条数** | 每一次上传的列表最大数量 | 1000 | 根据Kafka性能和网络带宽调整 |
| **内存队列最大数量** | 内存队列的最大数量,超出或失败时转入文件缓存 | 10000 | 根据系统内存和数据量调整 |
## 脚本与实体
使用与 MqttClient 相同的脚本接口,通过实现 `DynamicModelBase` 接口来自定义数据处理逻辑。详细格式说明请参考 [MqttClient文档](./MqttClient.mdx#1-设备变量报警事件数据脚本开发指南)。
## 最佳实践
### Kafka 集群配置
1. **服务地址配置**
- 配置多个 Kafka broker 地址,提高可靠性
- 格式:`host1:port1,host2:port2,host3:port3`
2. **Topic 设计**
- 按实体类型和业务领域划分 Topic
- 使用动态 Topic 配置,如 `devices/${DeviceName}`
- 为不同重要性的数据设置不同的 Topic
3. **分区策略**
- 根据数据量和并发需求设置适当的分区数
- 分区数建议为 broker 数量的 2-4 倍
### 性能优化
1. **批量处理**
- 启用列表上传,减少网络往返次数
- 调整上传每页条数,平衡内存使用和吞吐量
2. **缓存配置**
- 启用缓存功能,确保网络不稳定时数据不丢失
- 合理设置内存队列大小和文件缓存大小
3. **数据压缩**
- 启用 Kafka 的数据压缩功能,减少网络传输开销
- 建议使用 lz4 或 snappy 压缩算法
4. **连接管理**
- 复用 Kafka 连接,减少连接建立开销
- 配置合理的连接超时和重试机制
### 可靠性保障
1. **数据一致性**
- 配置适当的 acks 参数,确保消息可靠送达
- 建议生产环境使用 `acks=all`
2. **错误处理**
- 在脚本中实现完善的错误处理逻辑
- 使用 try-catch 捕获异常,确保脚本稳定运行
3. **监控与告警**
- 监控 Kafka 集群的运行状态
- 监控消息生产和消费的延迟
- 设置告警机制,及时发现和解决问题
4. **安全配置**
- 启用 SASL 认证,保护 Kafka 集群
- 使用 SSL/TLS 加密,确保数据传输安全
## 故障排查
### 常见问题及解决方案
| 问题 | 可能原因 | 解决方案 |
|------|----------|----------|
| **消息发布失败** | 1. Kafka 服务不可用<br/>2. 网络连接问题<br/>3. 认证失败 | 1. 检查 Kafka 服务状态<br/>2. 检查网络连接和防火墙设置<br/>3. 验证认证信息是否正确 |
| **消息发布速度慢** | 1. Kafka 集群负载高<br/>2. 网络带宽不足<br/>3. 批量大小设置不合理 | 1. 监控 Kafka 集群性能<br/>2. 检查网络带宽使用情况<br/>3. 调整上传每页条数和批量大小 |
| **缓存文件过大** | 1. Kafka 服务持续不可用<br/>2. 缓存大小设置不合理 | 1. 解决 Kafka 服务问题<br/>2. 调整缓存文件最大长度 |
| **脚本执行失败** | 1. 脚本语法错误<br/>2. 脚本逻辑错误 | 1. 检查脚本语法<br/>2. 查看日志中的错误信息 |
| **Topic 不存在** | 1. Topic 未创建<br/>2. 权限不足 | 1. 确保 Topic 已创建<br/>2. 验证用户权限 |
### 日志分析
当遇到问题时,建议查看 ThingsGateway 的日志文件,特别是与 KafkaProducer 相关的日志。日志中通常会包含详细的错误信息,帮助定位问题。
## 总结
KafkaProducer 是 ThingsGateway 中功能强大的 Kafka 消息发布插件,通过合理配置和优化,可以为物联网系统提供高性能、可靠的数据发布方案。在实际应用中,应根据具体场景和需求,选择合适的配置参数和优化策略,以达到最佳的性能和可靠性。
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ThingsGateway/Docs.git
git@gitee.com:ThingsGateway/Docs.git
ThingsGateway
Docs
Docs
master

搜索帮助