# iotPlatform **Repository Path**: garytwelve/iot-platform ## Basic Information - **Project Name**: iotPlatform - **Description**: 基于 ASP.NET Core 6.0 + Blazor Server 的物联网数据采集与管理平台,内置 MQTT Broker,支持多协议设备接入、JSON 数据解析、动态建表与数据持久化。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2026-04-28 - **Last Updated**: 2026-05-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # IoT Platform (IotWebPlatform) 基于 ASP.NET Core 6.0 + Blazor Server 的物联网数据采集与管理平台,内置 MQTT Broker,支持多协议设备接入、JSON 数据解析、动态建表与数据持久化。 ![](./assets/screenshot-20260502-193104.png) ![](./assets/screenshot-20260502-193132.png) ![](./assets/screenshot-20260502-193156.png) ![](./assets/screenshot-20260502-193241.png) ## 功能特性 - **MQTT Broker** — 内置 MQTTnet 服务端,支持 TCP / WebSocket / TLS 四种传输通道,客户端认证与权限管理 - **JSON 数据解析** — 灵活的 JSON Path 映射规则,支持数组遍历 `[]`、对象属性遍历 `{}`、动态键 `@key`/`@value` - **数据预处理** — 值变换(乘除加减/映射/替换)、范围过滤(丢弃/钳位/标记)、聚合统计(分组求均值/最值/计数等) - **动态建表** — 根据解析规则自动创建数据库表,支持 SQLite / MySQL / SQL Server - **多数据库** — 系统库与业务库分离,支持配置多个目标数据库 - **数据采集** — 支持 OPC UA / Modbus TCP / Modbus RTU / 模拟驱动,定时/Cron 调度采集 - **实时更新** — Blazor Server 服务端直接事件订阅,仪表盘数据实时刷新 - **系统管理** — 管理员账户管理、数据库配置、解析规则管理 ## 技术栈 | 类别 | 技术 | | ---- | ------------------------------ | | 后端框架 | ASP.NET Core 6.0 | | 前端框架 | Blazor Server | | ORM | Entity Framework Core 6.0 | | MQTT | MQTTnet 4.3.7 | | 实时通信 | Blazor Server(服务端事件订阅) | | 日志 | Serilog | | 认证 | Cookie Authentication + BCrypt | | 数据库 | SQLite(默认)/ MySQL / SQL Server | ## 项目结构 ``` IotWebPlatform/ ├── Data/ # 数据层 │ ├── AppDbContext.cs # EF Core 上下文 │ ├── DataSeeder.cs # 种子数据 │ └── Repositories/ # 泛型仓储 ├── Infrastructure/ # 基础设施 │ ├── AppStartService.cs # 应用运行时长 │ ├── CustomAuthStateProvider.cs # Blazor 认证状态 │ ├── EncryptionHelper.cs # AES-256 加密工具 │ ├── IngestionProcessingQueue.cs # 统一消息摄入处理队列 │ ├── SqliteWriteSynchronizer.cs # SQLite 写锁同步器 │ ├── SidebarStateService.cs # 侧边栏状态 │ └── ToastService.cs # Toast 通知 ├── Modules/ │ ├── Mqtt/ # MQTT 模块 │ │ ├── Handlers/ # 认证/消息/托管服务 │ │ ├── Models/ # 实体/视图模型 │ │ └── Services/ # 客户端/消息/凭证/集成服务 │ ├── Daq/ # 数据采集模块 │ │ ├── Drivers/ # 驱动抽象层(模拟/OPC UA/Modbus) │ │ ├── Models/ # 实体/视图模型 │ │ └── Services/ # 网关/设备/点位/采集服务 │ ├── Alert/ # 告警模块 │ │ ├── Channels/ # 通知渠道(钉钉/邮件/企业微信/Webhook) │ │ ├── Engine/ # 告警分发引擎 │ │ ├── Models/ # 实体/视图模型 │ │ ├── Preprocessing/ # 告警预处理器 │ │ ├── Serilog/ # Serilog AlertSink │ │ └── Services/ # 告警规则/记录/通知渠道服务 │ ├── Report/ # 报表查询模块 │ │ ├── Models/ # 视图模型 │ │ └── Services/ # 数据查询服务 │ ├── WebApi/ # Web API 模块 │ │ ├── Models/ # 实体/视图模型 │ │ └── Services/ # API 记录服务 │ └── SysCfg/ # 系统配置模块 │ ├── Models/ # 实体/视图模型 │ └── Services/ # 认证/解析规则/数据库配置服务 ├── Pages/ # Blazor 页面 │ ├── Mqtt/ # MQTT 仪表盘/消息/网关/设置 │ ├── Daq/ # 采集仪表盘/设备/点位/任务/数据 │ ├── Alert/ # 告警规则/记录/通知通道 │ ├── Report/ # 数据查询 │ ├── WebApi/ # API 消息/测试 │ └── SysCfg/ # 数据库配置/解析规则/用户管理/审计日志 ├── Shared/ │ ├── Parsing/ # 解析引擎/消息解析器/数据预处理/动态建表 │ └── UI/ # 共享 UI 组件 ├── wwwroot/ # 静态资源 ├── Program.cs # 应用入口 └── appsettings.json # 配置文件 ``` ## 快速开始 ### 环境要求 - .NET 6.0 SDK - SQLite(默认,无需额外安装) ### 运行 ```bash git clone https://github.com/xxx/iot-platform.git cd iot-platform dotnet run # 普通启动 dotnet run seed_data # 首次启动(带种子数据初始化) ``` 访问 `http://localhost:5000`,使用默认管理员账户登录: | 用户名 | 密码 | | ----- | --------- | | admin | admin@123 | ### MQTT 连接 | 参数 | 值 | | ------------ | ------ | | Broker 地址 | 服务器 IP | | TCP 端口 | 1883 | | WebSocket 端口 | 8083 | 默认 MQTT 凭证: | 用户名 | 密码 | | -------- | ------ | | test | 123 | ## 配置说明 `appsettings.json` 主要配置项: ```json { "Kestrel": { "Endpoints": { "Http": { "Url": "http://0.0.0.0:5000" // HTTP 监听地址 }, "Https": { "Enabled": false, // 启用 HTTPS "Url": "https://0.0.0.0:5001", // HTTPS 监听地址 "Certificate": { "Path": "", // PFX 证书文件路径 "Password": "", // 证书密码 "AllowInvalid": false // 允许无效证书 } } } }, "Mqtt": { "Host": "0.0.0.0", // MQTT 监听地址 "EnableTcp": true, // 启用 TCP 通道 "TcpPort": 1883, // TCP 端口 "EnableWebSocket": true, // 启用 WebSocket 通道 "WebSocketPort": 8083, // WebSocket 端口 "EnableTlsTcp": false, // 启用 TCP TLS 通道 "TlsTcpPort": 8883, // TCP TLS 端口 "EnableTlsWebSocket": false, // 启用 WebSocket TLS 通道 "TlsWebSocketPort": 8084, // WebSocket TLS 端口 "TlsCertificate": { "Path": "", // TLS 专用证书路径(为空则复用 HTTPS 证书) "Password": "" // 证书密码 }, "MaxConnections": 1000, // 最大连接数 "KeepAliveInterval": 60, // 心跳间隔(秒) "MaxPendingMessagesPerClient": 100 }, "SystemDatabase": { "Provider": "Sqlite", // 数据库类型: Sqlite / MySql / SqlServer "DatabaseUrl": "Data Source=IotWebPlatform.db", // 数据库连接URL(不含凭证) "EncryptedCredentials": "" // 加密后的凭证(MySQL/SQL Server需要) }, "Encryption": { "Key": "your-base64-encoded-aes-key" // AES-256加密密钥(Base64编码,32字节) } } ``` ### MQTT 传输通道 系统支持四种 MQTT 传输通道,可同时启用: | 通道 | 协议标识 | 默认端口 | 配置开关 | 适用场景 | |------|---------|---------|---------|---------| | TCP | `mqtt://host:port` | 1883 | `Mqtt:EnableTcp` | 内网设备、开发调试 | | WebSocket | `ws://host:port/mqtt` | 8083 | `Mqtt:EnableWebSocket` | 浏览器/Web应用 | | TCP TLS | `mqtts://host:port` | 8883 | `Mqtt:EnableTlsTcp` | 公网设备、安全要求高 | | WebSocket TLS | `wss://host:port/mqtt` | 8084 | `Mqtt:EnableTlsWebSocket` | 浏览器安全连接 | 所有通道共享同一个 MQTT Broker 实例,启动/停止/重启操作对所有已启用通道同时生效。 ### HTTPS / TLS 部署 #### 方式一:Kestrel 直连 TLS 修改 `appsettings.json` 启用 HTTPS 和/或 MQTT TLS: ```json { "Kestrel": { "Endpoints": { "Http": { "Url": "http://0.0.0.0:5000" }, "Https": { "Enabled": true, "Url": "https://0.0.0.0:5001", "Certificate": { "Path": "/path/to/your-cert.pfx", "Password": "your-password" } } } }, "Mqtt": { "EnableTlsTcp": true, "TlsTcpPort": 8883, "EnableTlsWebSocket": true, "TlsWebSocketPort": 8084, "TlsCertificate": { "Path": "", "Password": "" } } } ``` - `Mqtt:TlsCertificate:Path` 为空时自动复用 `Kestrel:Endpoints:Https:Certificate`,无需重复配置 - 启用 HTTPS 后,HTTP 请求会被自动重定向到 HTTPS(`UseHttpsRedirection`) - HTTP 和 HTTPS 可同时监听,兼容旧客户端 #### 方式二:反向代理 TLS(推荐生产环境) 使用 Nginx/Caddy 处理 TLS,Kestrel 仅监听 HTTP,**无需修改任何配置**: ``` 客户端 ──HTTPS──→ Nginx:443 ──HTTP──→ Kestrel:5000 客户端 ──MQTTS──→ Nginx:8883 ──TCP──→ Kestrel:1883 客户端 ──WSS────→ Nginx:443 ──HTTP──→ Kestrel:8083 ``` **Nginx 完整配置示例**(泛域名证书 `*.abc12.cn`): ```nginx # ============================================== # 全局配置 # ============================================== worker_processes auto; events { worker_connections 10240; } # ============================================== # 1. MQTT TCP 加密代理 (mqtts://) # ============================================== stream { upstream mqtt_tcp { server 127.0.0.1:1883; } server { listen 8883 ssl; proxy_pass mqtt_tcp; ssl_certificate /etc/nginx/ssl/abc12.cn.pem; ssl_certificate_key /etc/nginx/ssl/abc12.cn.key; ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384; ssl_prefer_server_ciphers on; ssl_session_cache shared:MQTT_SSL:10m; ssl_session_timeout 1d; } } # ============================================== # 2. HTTP / HTTPS 代理 # ============================================== http { include /etc/nginx/mime.types; default_type application/octet-stream; log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; access_log /var/log/nginx/access.log main; error_log /var/log/nginx/error.log warn; gzip on; gzip_vary on; gzip_min_length 1000; gzip_proxied any; gzip_types text/plain text/css application/json application/javascript text/xml application/xml; map $http_upgrade $connection_upgrade { default upgrade; '' close; } upstream mqtt_ws { server 127.0.0.1:8083; } upstream iot_web { server 127.0.0.1:5000; } # ---------------------- # MQTT WSS (wss://mqtt.abc12.cn/mqtt) # ---------------------- server { listen 443 ssl http2; server_name mqtt.abc12.cn; ssl_certificate /etc/nginx/ssl/abc12.cn.pem; ssl_certificate_key /etc/nginx/ssl/abc12.cn.key; ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384; ssl_prefer_server_ciphers on; ssl_session_cache shared:MQTT_WSS:10m; ssl_session_timeout 1d; location /mqtt { proxy_pass http://mqtt_ws; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_read_timeout 3600s; proxy_send_timeout 3600s; } } # ---------------------- # IoT 管理后台 (https://iot.abc12.cn) # ---------------------- server { listen 443 ssl http2; server_name iot.abc12.cn; ssl_certificate /etc/nginx/ssl/abc12.cn.pem; ssl_certificate_key /etc/nginx/ssl/abc12.cn.key; ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384; ssl_prefer_server_ciphers on; ssl_session_cache shared:IOT_SSL:10m; ssl_session_timeout 1d; add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always; add_header X-Frame-Options DENY always; add_header X-Content-Type-Options nosniff always; location / { proxy_pass http://iot_web; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } } # ---------------------- # 80 端口强制跳转 HTTPS # ---------------------- server { listen 80; server_name iot.abc12.cn mqtt.abc12.cn; return 301 https://$host$request_uri; } } ``` **部署前检查**: | 检查项 | 命令 | |-------|------| | stream模块支持 | `nginx -V 2>&1 \| grep with-stream` | | SSL证书存在 | `ls -la /etc/nginx/ssl/abc12.cn.{pem,key}` | | 证书域名匹配 | `openssl x509 -in /etc/nginx/ssl/abc12.cn.pem -noout -text \| grep DNS` | | 配置语法检查 | `nginx -t` | **访问地址**: | 服务 | 地址 | |------|------| | IoT 管理后台 | `https://iot.abc12.cn` | | MQTT TCP TLS | `mqtts://mqtt.abc12.cn:8883` | | MQTT WebSocket TLS | `wss://mqtt.abc12.cn/mqtt` | **Windows 平台配置**: Windows 版 Nginx 需要调整以下配置: | 配置项 | Linux | Windows | |-------|-------|---------| | worker_processes | `auto` | `1`(仅支持单进程) | | 路径分隔符 | `/etc/nginx/...` | `C:/nginx/...`(使用正斜杠) | | stream 模块 | 通常已包含 | 需检查或下载完整版 | Windows 配置示例(路径调整): ```nginx worker_processes 1; # Windows 只支持单进程 stream { server { listen 8883 ssl; proxy_pass 127.0.0.1:1883; ssl_certificate C:/nginx/ssl/abc12.cn.pem; # Windows 路径 ssl_certificate_key C:/nginx/ssl/abc12.cn.key; # ... 其他 SSL 配置同 Linux } } http { # 路径使用正斜杠 access_log C:/nginx/logs/access.log main; error_log C:/nginx/logs/error.log warn; # ... 其他配置同 Linux } ``` Windows 部署检查: ```powershell # 检查 stream 模块 nginx -V 2>&1 | Select-String "stream" # 测试配置 nginx -t # 启动 nginx.exe # 注册为 Windows 服务(使用 NSSM) nssm install nginx C:\nginx\nginx.exe nssm start nginx ``` **替代方案**:如果 Windows 版 Nginx 不支持 stream 模块,建议: - 使用 Kestrel 直连 TLS(修改 appsettings.json 启用 TLS) - 使用 Caddy(自动 HTTPS,配置更简单) - 部署到 WSL2 + Docker(推荐生产环境) #### 两种方式对比 | | Kestrel 直连 TLS | 反向代理 TLS | |---|---|---| | 证书管理 | .NET 进程加载 PFX | Nginx/Caddy 管理,支持自动续期 | | 适用场景 | 小型部署、内网 | 生产环境、公网 | | 配置变更 | 修改 appsettings.json + 重启 | 仅改 Nginx 配置 + reload | | 私钥安全 | 私钥对应用可见 | 私钥仅代理层可见 | ### 数据库凭证加密 数据库凭证(用户名/密码)使用 AES-256 加密存储,加密密钥配置在 `Encryption:Key`。 **生成加密密钥**(32字节 Base64): ```csharp using System.Security.Cryptography; using(var aes = Aes.Create()) { aes.KeySize = 256; aes.GenerateKey(); Console.WriteLine(Convert.ToBase64String(aes.Key)); } ``` **加密凭证**(MySQL/SQL Server): ```csharp var credentials = "User=root;Password=your_password"; // MySQL格式 // var credentials = "User Id=sa;Password=your_password"; // SQL Server格式 var encrypted = EncryptionHelper.Encrypt(credentials, encryptionKey); ``` 将加密后的 `encrypted` 值填入 `SystemDatabase.EncryptedCredentials`。 ### 切换 MySQL ```json { "SystemDatabase": { "Provider": "MySql", "DatabaseUrl": "Server=localhost;Database=iot_platform", "EncryptedCredentials": "<加密后的凭证>" } } ``` ### 切换 SQL Server ```json { "SystemDatabase": { "Provider": "SqlServer", "DatabaseUrl": "Server=localhost;Database=iot_platform", "EncryptedCredentials": "<加密后的凭证>" } } ``` ## JSON 解析规则 ### 路径语法 | 语法 | 说明 | 示例 | | -------- | ------ | --------------- | | `field` | 固定字段 | `deviceid` | | `[]` | 遍历数组 | `[].deviceid` | | `{}` | 遍历对象属性 | `data{}.@key` | | `@key` | 提取属性名 | `data[].@key` | | `@value` | 提取属性值 | `data[].@value` | ### 默认值表达式 | 表达式 | 生成值 | 时区 | 生成时机 | | --------------------------- | --------- | ---- | ------------- | | `{guid}` | UUID | — | 解析时生成一次,所有行相同 | | `{datetime}` | 当前时间 | 本地时区 | 解析时生成一次 | | `{datetime_utc}` | 当前时间 | UTC | 解析时生成一次 | | `{datetime:yyyy-MM-dd}` | 自定义格式时间 | 本地时区 | 解析时生成一次 | | `{datetime_utc:yyyy-MM-dd}` | 自定义格式时间 | UTC | 解析时生成一次 | | `{timestamp}` | 时间戳(毫秒) | 本地时区 | 解析时生成一次 | | `{timestamp_utc}` | 时间戳(毫秒) | UTC | 解析时生成一次 | | `{date}` | 当前日期 | 本地时区 | 解析时生成一次 | | `{date_utc}` | 当前日期 | UTC | 解析时生成一次 | | `{row_guid}` | UUID | — | 每行生成,各不相同 | | `{row_datetime}` | 当前时间 | 本地时区 | 每行生成 | | `{row_datetime_utc}` | 当前时间 | UTC | 每行生成 | | `{row_timestamp}` | 时间戳(毫秒) | 本地时区 | 每行生成 | | `{row_timestamp_utc}` | 时间戳(毫秒) | UTC | 每行生成 | | `{row_date}` | 当前日期 | 本地时区 | 每行生成 | | `{row_date_utc}` | 当前日期 | UTC | 每行生成 | | `{row_snowflake}` | 雪花ID(19位)| — | 每行生成,分布式唯一 | 支持组合使用,如 `DEV_{row_guid}`、`{date}_{guid}`。 ## 数据预处理 数据预处理在**字段映射之后、入库之前**执行,对映射后的数据库列进行二次处理。处理顺序固定为:**值变换 → 范围过滤 → 聚合统计**,不可调换。 ### 处理流程 ``` 原始JSON → 字段映射 → 值变换 → 范围过滤 → 聚合统计 → 入库 ``` ### 批次概念 每条 MQTT 消息被解析后产生的所有数据行构成**一个批次**。数据预处理以批次为单位执行: - **单行数据**:一条 MQTT 消息只产生一行数据时,值变换和范围过滤正常逐行处理,聚合统计因只有一行而无实际意义 - **多行数据**:一条 MQTT 消息包含数组(如 `[{...}, {...}]`)产生多行时,聚合统计可按分组列汇总计算 ### 安全机制 | 机制 | 说明 | | ---- | -------------------------------------- | | 数据隔离 | 预处理前对原始数据做深拷贝,预处理过程不会修改原始解析结果 | | 错误容错 | 单个预处理规则执行失败时,仅跳过该规则并记录日志,不影响其他规则继续执行 | | 短路终止 | 当范围过滤触发"丢弃批次"动作时,后续所有预处理规则不再执行,整批数据不入库 | ### 值变换 对指定列的值进行数学运算或文本替换,**逐行处理**,直接修改列的值。 **数值类型运算**: | 运算 | 说明 | 操作数用途 | 示例 | | ---- | ------------------ | ----------- | ---------------------- | | 乘法 | 值 × 操作数 | 参与运算的数值 | `2220 × 0.1 = 222.0` | | 除法 | 值 ÷ 操作数(除数为0时保留原值) | 参与运算的数值 | `100 ÷ 4 = 25` | | 加法 | 值 + 操作数 | 参与运算的数值 | `10 + 5 = 15` | | 减法 | 值 - 操作数 | 参与运算的数值 | `10 - 3 = 7` | | 四舍五入 | 保留N位小数(0~15) | 保留的小数位数 | `3.1415 → 3.14`(操作数=2) | | 绝对值 | 取绝对值(操作数无效) | 不使用 | `-5 → 5` | | 值映射 | 将值映射为其他值 | 不使用,映射表单独配置 | `0→离线, 1→在线, 2→告警` | | 替换 | 替换为固定值 | 替换后的目标文本 | 任何值 → `正常` | **时间类型运算**(仅支持 `DateTime` 类型): | 运算 | 说明 | 操作数含义 | 示例 | | ---- | -------- | ------- | ------------------------------- | | 加法 | 时间 + 小时数 | 小时数 | `2026-05-01 18:00` + 8 = `2026-05-02 02:00` | | 减法 | 时间 - 小时数 | 小时数 | `2026-05-01 18:00` - 8 = `2026-05-01 10:00` | **值映射**:将列的原始值按照映射表替换为新的值,未在映射表中找到的值保留原值不变。映射键为字符串匹配,区分大小写。 **边界情况**: | 情况 | 处理方式 | | -------------------- | --------------- | | 列值为 null 或不存在 | 跳过该规则,不修改该行 | | 数值运算时列值为非数字(如 "abc") | 跳过该规则,保留原值 | | 时间运算时列值为非时间类型 | 尝试解析为数值,失败则跳过 | | 除法操作数为 0 | 保留原值,不做除法 | | 字符串形式的数字(如 "2220") | 自动解析为数值后运算 | | 运算结果为整数(如 30.0) | 自动存储为整数类型(long) | **链式处理**:同一列可配置多条规则,按顺序依次执行,前一条的输出作为后一条的输入。例如温度单位转换(华氏→摄氏)需两条规则:先减32,再除以1.8。 ### 范围过滤 对指定列的值进行范围判断,根据结果执行不同动作,**逐行处理**,仅对数值类型的列有效。 **配置项说明**: | 配置项 | 是否必须 | 说明 | |--------|----------|------| | 条件列 | 否 | 与条件值配合使用,**两者都配置才生效** | | 条件值 | 否 | 与条件列配合使用,**两者都配置才生效** | | 数据列 | 否 | 不配置则直接执行动作(纯条件过滤) | | 最小值 | 否 | 至少配置一个(最小值/最大值/过滤空值) | | 最大值 | 否 | 至少配置一个(最小值/最大值/过滤空值) | | 过滤空值 | 否 | 勾选后空值也会触发动作 | **三种使用模式**: | 模式 | 配置 | 行为 | |------|------|------| | 纯条件过滤 | 只配置条件列+条件值 | 条件匹配时直接执行动作,不检查数据值 | | 纯范围过滤 | 只配置数据列+范围 | 对所有行进行范围检查 | | 组合过滤 | 条件列+条件值+数据列+范围 | 条件匹配且范围超限时执行动作 | **范围判断逻辑**:**最小值 ≤ 列值 ≤ 最大值**,边界值本身属于合法范围。最小值和最大值可只设置其中一个。 | 动作 | 说明 | 超出范围时的行为 | | ---- | ------------ | ------------------------------------------ | | 丢弃行 | 超出范围的行不入库 | 该行被删除,其他行不受影响 | | 丢弃批次 | 整批数据都不入库 | 所有行全部丢弃,后续预处理不再执行 | | 钳位 | 超出范围的值钳位到边界值 | 小于最小值 → 设为最小值;大于最大值 → 设为最大值 | | 标记 | 保留原始值,添加标记 | 原始值不变,额外添加 `{列名}_flag` 列,值为 `out_of_range` 或 `null_or_empty` | **过滤空值**:勾选后,以下情况也会触发过滤动作: - 列值为 `null` 或不存在 - 列值为空字符串 `""` - 列值为纯空格 `" "` | 动作 | 空值行为 | | ---- | ------------------------------ | | 丢弃行 | 空值行被删除 | | 丢弃批次 | 任一行为空则整批丢弃 | | 钳位 | 空值无法钳位,跳过不处理 | | 标记 | 添加 `{列名}_flag` 列,值为 `null_or_empty` | **标记动作**:超出范围或空值的行不会被删除或修改,而是新增一个标记列(如 `temperature_flag`),值为 `out_of_range` 或 `null_or_empty`。需确保数据库表中有对应的标记列,否则写入会失败。在范围内且非空的行不会添加标记列。 **边界情况**: | 情况 | 处理方式 | | --------------- | ----------------- | | 条件列或条件值任一为空 | 视为无条件,对所有行生效 | | 数据列不配置 | 直接执行动作(纯条件过滤) | | 数据列配置但无最小值/最大值/过滤空值 | 过滤规则无效 | | 列值为 null 或不存在 | 默认跳过;勾选"过滤空值"后触发动作 | | 列值为非数字(如 "abc") | 跳过该行,不触发任何动作 | | 丢弃批次触发后 | 立即终止所有预处理,整批数据不入库 | | 同一行被多条规则匹配 | 每条规则独立判断,依次执行对应动作 | ### 聚合统计 对同一批次的数据按指定列分组后进行统计运算,**对所有行统一处理**。统计结果保持与原始数据相同的列结构,通过"标记列+标记后缀"来区分统计行。 | 函数 | 说明 | 适用数据类型 | 结果类型 | | --- | -------------- | ------ | -------- | | 平均值 | 求组内平均值(保留4位小数) | 数值 | 浮点数 | | 最大值 | 求组内最大值 | 数值 | 数值 | | 最小值 | 求组内最小值 | 数值 | 数值 | | 求和 | 求组内总和 | 数值 | 数值 | | 计数 | 统计组内非空记录数 | 任意类型 | 整数(long) | | 首个 | 取组内第一条的值 | 数值 | 数值 | | 末个 | 取组内最后一条的值 | 数值 | 数值 | **分组机制**:分组列用于将数据分成若干组,每组独立计算统计值。支持多列联合分组(用逗号分隔),如 `device_id,sensor_type` 表示按设备+类型联合分组。 **标记列与标记后缀**:统计行通过标记列和标记后缀与原始数据行区分。标记后缀追加到标记列原始值的末尾,如标记列值为"温度"、标记后缀为"(平均值)",则统计行中标记列值为"温度(平均值)"。标记列应选择文本类型的列。 **计数函数特殊性**:计数函数适用于任意数据类型(包括文本),统计源列中非 null 的记录数,结果为整数。其他函数仅适用于数值类型。 **输出模式**: | 模式 | 说明 | 适用场景 | | --- | -------------- | -------------------- | | 追加 | 保留原始行,统计行追加在后面 | 需要同时保留原始数据和统计结果(最常用) | | 仅统计 | 丢弃原始行,只保留统计行 | 只需要汇总结果,不需要明细数据 | **多聚合列**:同一聚合规则中可配置多个聚合列。引用同一源列时,后配置的会覆盖先配置的结果值;引用不同源列时不会互相覆盖。 **边界情况**: | 情况 | 处理方式 | | --------------- | -------------------- | | 分组列值全部相同 | 所有行归为一组,生成1条统计行 | | 每行分组列值都不同 | 每组只有1条,统计值等于原值 | | 源列值全为 null 或非数值 | 统计结果为 null(计数函数返回 0) | | 只有1行数据 | 统计值等于原值(平均值=原值,计数=1) | | 标记列值为 null | 标记列结果仅为标记后缀本身 | | 未设置标记列或标记后缀 | 统计行与原始行无法区分 | ## 数据流 ``` ┌─ MQTT 客户端 → MQTT Broker → MqttMessageHandler(持久化) ─┐ │ │ ├─ 采集任务 → CollectionEngine → 驱动读取 → 批量缓冲(持久化) ─┤ │ ├──→ IngestionProcessingQueue └─ HTTP POST /api/data/push → WebApiRecordService(持久化) ──┘ ↓ ParsingEngine 匹配规则 ↓ MessageParser 解析 JSON ↓ 字段映射(JSON路径 → 数据库列) ↓ 数据预处理(值变换 → 告警检测 → 范围过滤 → 聚合统计) ↓ DynamicTableCreator 建表 ↓ 数据写入目标数据库 告警流:AlertPreprocessor → AlertChannel → AlertDispatcher → 系统规则匹配(冷却/速率限制) → 写入AlertRecord → 分发通知渠道 ``` ## 性能参考 以下为 MQTT + 数采同时启用时的最大设备支持能力估算,实际性能受硬件配置、网络环境、数据点位密度、采集间隔等因素影响。 ### SQLite 默认配置(4核 / 8GB 内存) | 维度 | 上限 | 瓶颈原因 | |------|------|----------| | DAQ 轮询设备 | 50-100 台 | 每设备每秒 4-6 次 DB 操作,SQLite 写入极限约 500 次/s | | DAQ 订阅设备 | 100-200 台 | DB 压力较小,但订阅回调全局锁串行化 | | MQTT 连接数 | 200-500 | 连接验证需 DB 查询 + 写入 | | MQTT 消息吞吐 | 100-300 msg/s | 每条消息 2 次 DB 写入 | | 总数据点位 | 500-2000 | Modbus 逐点读取,网络 RTT 限制 | | MQTT + DAQ 同时 | 30-50 台 DAQ + 100 MQTT 连接 | SQLite 单线程写入竞争 | ### MySQL / PostgreSQL(8核 / 16GB 内存) | 维度 | 上限 | 瓶颈原因 | |------|------|----------| | DAQ 轮询设备 | 200-500 台 | DB 不再是瓶颈,驱动逐点读取成为瓶颈 | | DAQ 订阅设备 | 500-1000 台 | 依赖 OPC UA Server 订阅能力 | | MQTT 连接数 | 1000-3000 | DB 连接池 + 索引优化 | | MQTT 消息吞吐 | 1000-3000 msg/s | 批量写入 + 连接池 | | MQTT + DAQ 同时 | 200-300 台 DAQ + 500 MQTT 连接 | Modbus 逐点读取是瓶颈 | ### 核心瓶颈 | 瓶颈 | 说明 | |------|------| | SQLite 单线程写入 | 所有模块(MQTT 消息入库、DAQ 状态更新、解析数据持久化)争抢同一把写锁 | | Repository 逐条 SaveChanges | 每条记录一次 DB 往返,N 条记录 = N 次往返 | | DAQ 每次采集 4-6 次 DB 操作 | 解析 + 更新任务状态 + 查询规则 + 保存采集记录 | | Modbus 逐点串行读取 | 10 个点位 = 10 次 TCP 请求,未合并连续地址 | | 外部数据库无连接池 | 解析引擎每次持久化新建 DbConnection | ### 优化方向 | 优先级 | 优化项 | 预期提升 | |--------|--------|----------| | P0 | DAQ 状态更新降频(每分钟或每 N 次采集) | DB 操作减少 40-50% | | P0 | 批量 DB 写入(攒批后一次 SaveChanges) | DB 往返减少 5-10 倍 | | P0 | 外部数据库连接池 | 持久化延迟降低 80% | | P0 | 生产环境切换 MySQL / PostgreSQL | 写入并发提升 10 倍+ | | P1 | Modbus 合并连续地址批量读取 | 单设备采集耗时降低 5-10 倍 | | P1 | MQTT 客户端连接验证缓存 | 连接吞吐提升 5-10 倍 | | P1 | 订阅回调分片锁 | 订阅设备并行化 | | P2 | 解析规则增量刷新 | 减少全表加载开销 | | P2 | Repository 只读查询加 AsNoTracking | 内存占用降低 30-50% | ## 日志体系 ### 日志级别与呈现规则 | 日志级别 | 控制台 | 文件 | 预警触发条件 | 预警类型 | severity | |----------|:------:|:----:|-------------|----------|----------| | **Trace** | - | - | 不触发 | - | - | | **Debug** | - | ✓ | `PushToNotice` → 平台通知 / `PushToAlert` → 平台预警 | 取决于包装方式 | notice / critical | | **Information** | ✓ | ✓ | `PushToNotice` → 平台通知 / `PushToAlert` → 平台预警 | 取决于包装方式 | notice / critical | | **Error** | ✓ | ✓ | **自动触发**(无需 PushToNotice/PushToAlert) | **平台预警** | critical | | **Fatal** | ✓ | ✓ | **自动触发** | **平台预警** | critical | **预警判定逻辑**: - `Level >= Error` → 自动触发平台预警(severity=critical) - `PushToNotice` 包裹 → 触发平台通知(severity=notice) - `PushToAlert` 包裹 → 触发平台预警(severity=critical) - severity 由包装方式决定,不再根据 Exception 推断 - 项目中无手动调用 `LogError`/`LogFatal`,Error 级别仅来自 ASP.NET Core 框架自动捕获的未处理异常 **预警分发路径**:Serilog 日志 → AlertSink → AlertChannel → AlertDispatcher → 系统规则匹配 → 冷却检查 → 写入告警记录 → 分发通知渠道(钉钉/邮件/Webhook/企业微信) ### 日志详细清单 日志配置:控制台最低级别 **Information**,文件最低级别 **Debug**,平台通知/预警需配置通知通道才发送。 | # | 文件 | 内容 | 级别 | AlertPush | 控制台 | 文件 | 平台通知 | 平台预警 | |---|------|------|------|-----------|:------:|:----:|:--------:|:--------:| | 1 | ParsingEngine | 消息无ClientId,跳过解析 | Trace | ❌ | ❌ | ❌ | ❌ | ❌ | | 2 | ParsingEngine | ClientId未绑定网关规则,跳过解析 | Trace | ❌ | ❌ | ❌ | ❌ | ❌ | | 3 | ParsingEngine | 解析失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 4 | ParsingEngine | 数据持久化跳过 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 5 | ParsingEngine | 持久化行数 | Trace | ❌ | ❌ | ❌ | ❌ | ❌ | | 6 | MessageParser | 字段未找到使用默认值 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 7 | MessageParser | JSON解析错误 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 8 | MessageParser | 消息解析失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 9 | ValueTransformPreprocessor | 列转换失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 10 | MqttMessageHandler | MQTT消息接收详情 | Trace | ❌ | ❌ | ❌ | ❌ | ❌ | | 11 | MqttIntegrationService | 解析完成 | Trace | ❌ | ❌ | ❌ | ❌ | ❌ | | 12 | MqttIntegrationService | 消息解析失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 13 | DataQueryService | 查询SQL/结果详情 | Trace | ❌ | ❌ | ❌ | ❌ | ❌ | | 14 | RangeFilterPreprocessor | 批量跳过(范围过滤) | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 15 | PreprocessPipeline | 预处理器执行失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 16 | PreprocessPipeline | 预处理管道被停止 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 17 | AlertDispatcher | 未找到系统规则/已禁用/冷却中/已处理 | Trace | ❌ | ❌ | ❌ | ❌ | ❌ | | 18 | MqttAuthHandler | MQTT认证失败(缺凭证/用户不存在/已禁用/密码错误) | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 19 | MqttAuthHandler | MQTT客户端认证成功 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 20 | MqttMessageHandler | Payload超长截断 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 21 | MqttService | 网关上线 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 22 | MqttService | 网关离线 | Debug | PushToAlert | ❌ | ✅ | ❌ | ✅ | | 23 | MqttService | 客户端订阅/取消订阅 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 24 | MqttIntegrationService | 未注册网关连接 | Debug | PushToAlert | ❌ | ✅ | ❌ | ✅ | | 25 | AlertPreprocessor | 告警触发 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 26 | OpcUaDriver | OPC UA连接/断开/订阅/读取/写入/释放 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 27 | ModbusTcpDriver | Modbus TCP连接/断开/读取/写入 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 28 | ModbusRtuDriver | Modbus RTU连接/断开/读取/写入 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 29 | MqttService | MQTT Broker启动(TCP/WS/TLS) | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 30 | MqttService | MQTT服务已关闭 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 31 | MqttService | MQTT Broker重启 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 32 | MqttService | 强制断开客户端 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 33 | GatewayService | 网关已创建/已更新/已删除 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 34 | AuthService | 密码修改/管理员CRUD/启禁 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 35 | DynamicTableCreator | 动态表创建成功 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 36 | CollectionEngine | 引擎启动/采集任务启动/停止 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 37 | IngestionProcessingQueue | 消息处理队列已启动 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 38 | AlertChannel | AlertChannel已启动/已停止 | Info | ❌ | ✅ | ✅ | ❌ | ❌ | | 39 | MqttService | Broker启动/停止失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 40 | MqttService | 断开客户端/更新状态/事件处理失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 41 | MqttService | 所有MQTT传输通道均已关闭 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 42 | MqttService | TLS已启用但未配置证书 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 43 | MqttIntegrationService | 事件处理器/推送通知/入队/标记状态失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 44 | MqttIntegrationService | 消息处理队列已满 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 45 | IngestionProcessingQueue | 队列已满/处理失败/更新状态失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 46 | ParsingEngine | 刷新规则/加载规则/确保表存在/数据库配置失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 47 | MessageParser | 冲突路径/循环类型 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 48 | AlertPreprocessor | 预警规则执行失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 49 | AlertDispatcher | 处理预警/速率限制/发送/写入/更新计数失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 50 | AlertChannel | 告警通道已满 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 51 | AuthService | 用户验证/修改密码失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 52 | DatabaseConfigService | 连接测试/解密凭证失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 53 | DynamicTableCreator | 创建/检查表失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 54 | CollectionEngine | 采集任务/重连/批量刷新/持久化/状态更新失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 55 | DeviceService | 恢复字段/反序列化配置失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 56 | DataExpressionEvaluator | 表达式超长/计算失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 57 | AuditLogService | 审计日志写入失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 58 | MqttServerHostedService | MQTT Server启动/停止失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 59 | MqttMessageHandler | MQTT消息处理失败 | Debug | ❌ | ❌ | ✅ | ❌ | ❌ | | 60 | ASP.NET Core框架 | 未处理异常(如UriFormatException) | Error | ❌ | ✅ | ✅ | ❌ | ✅ | **说明**: - 第22、24行:`PushToAlert` 使 Debug 级别日志触发平台预警(severity=critical) - `PushToNotice` 使 Debug/Info 级别日志触发平台通知(severity=notice) - 项目中无手动调用 `LogError`/`LogFatal`,Error 级别日志仅来自 ASP.NET Core 框架自动捕获的未处理异常 - 所有捕获异常日志均为 Debug 级别(规则:链路失败+捕获异常→Debug) - 消息处理链路成功日志为 Trace 级别(规则:链路成功→Trace) - 平台通知/平台预警的"发送通知"取决于系统规则是否配置了通知通道 ## 默认账户 | 类型 | 用户名 | 密码 | | ---- | -------- | --------- | | 管理员 | admin | admin@123 | | MQTT | test | 123 | ## License MIT