Ai
1 Star 0 Fork 0

陈培俊/sky_python

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
mqtt_svr.py 5.94 KB
一键复制 编辑 原始数据 按行查看 历史
ChenPeijun 提交于 2025-07-05 19:45 +08:00 . add const var py
import paho.mqtt.client as mqtt
import threading
import queue
import time
import json
from postgres import *
from retcode import *
from const_var import *
message_queue = queue.Queue()
topic_dev_tele_req = "v1/dev/tele/req/+"
topic_dev_attr_req = "v1/dev/attr/req/+"
topic_dev_attr_ack = "v1/dev/attr/ack/"
topic_dev_id_req = "v1/dev/id/req"
topic_dev_id_ack = "v1/dev/id/ack"
topic_svr_alarm = "v1/svr/alarm"
def on_connect(client, userdata, flags, rc, props):
print(f"Connected with result code {rc}")
if rc == 0:
print("Successfully connected to the MQTT broker.")
client.subscribe(topic_dev_tele_req)
client.subscribe(topic_dev_attr_req)
client.subscribe(topic_dev_id_req)
else:
print("Failed to connect to the MQTT broker with result code {rc}.")
def on_subscribe(client, userdata, mid, granted_qos, properties):
print(f"Subscribed {mid} to topics with QoS: {granted_qos}")
def on_message(client, userdata, msg):
print(f"Received message on topic '{msg.topic}'")
topic_dev_attr_req_prefix = topic_dev_attr_req[:-1]
topic_dev_tele_req_prefix = topic_dev_tele_req[:-1]
if msg.topic.startswith(topic_dev_tele_req_prefix):
dev_id = msg.topic.split('/')[-1]
print("dev_id:", dev_id)
message_queue.put((dev_id, msg.payload)) # 直接放入队列,不解析JSON
alarm_json = process_alarm(dev_id, msg.payload)
if alarm_json is not None:
print("alarm trigger, alarm_json:", alarm_json)
client.publish(topic_svr_alarm, payload=json.dumps(alarm_json))
elif msg.topic.startswith(topic_dev_attr_req_prefix):
dev_id = msg.topic.split('/')[-1]
print("dev_id:", dev_id)
ret_json = process_attr(dev_id, msg.payload)
print(ret_json)
client.publish(topic_dev_attr_ack + dev_id, payload=json.dumps(ret_json))
elif msg.topic == topic_dev_id_req:
ret_id = process_id(msg.payload)
if ret_id < 0:
json_message = {"id": ret_id, "errMsg": get_error_message(ret_id)}
else:
json_message = {"id": ret_id, "errMsg": get_error_message(0)}
client.publish(topic_dev_id_ack, payload=json.dumps(json_message))
else:
print("Unknown topic:", msg.topic)
def process_alarm(dev_id, payload):
json_message = json.loads(payload)
for key, value in json_message.items():
if key == 'file':
return {'alarm_type':'info', 'id':dev_id, 'file': value}
return None
def process_id(payload):
connection = connect_to_db()
if connection is None:
print("Failed to connect to the database.")
return DATABASE_ERROR
ret_id = INVALID_JSON_ERROR
try:# 解析JSON消息
json_message = json.loads(payload)
dev_id = json_message['dev_id']
ret_id = get_id_by_dev_id(connection, "devices", dev_id)
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
except KeyError as e:
print(f"Error accessing JSON keys: {e}")
connection.close()
return ret_id
def determine_value_type(value):
if isinstance(value, int):
return 0, value, ''
elif isinstance(value, float):
return 1, value, ''
else:
return 2, 0.0, value
def process_attr(dev_id, payload):
connection = connect_to_db()
if connection is None:
print("Failed to connect to the database.")
return {"errMsg": "Database error."}
ret = SUCCESS
try:# 解析JSON消息
json_message = json.loads(payload)
attr_table = "attr_" + dev_id
for key, value in json_message.items():
# Process the message and insert into the database
print("inserting record into attr:", key, value)
value_type, value_r, value_t = determine_value_type(value)
insert_attr_record(connection, attr_table, key, value_type, value_r, value_t)
records = fetch_server_attr_records(connection, attr_table)
if records is None:
print("No records found.")
ret = RECORD_NOT_EXIST
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
ret = INVALID_JSON_ERROR
connection.close()
if ret == SUCCESS:
return records
else:
return {"errMsg": get_error_message(ret)}
def process_tele(dev_id, payload):
connection = connect_to_db()
if connection is None:
print("Failed to connect to the database.")
return
try:
# 解析JSON消息
json_message = json.loads(payload)
tele_table = f"tele_{dev_id}"
ts = datetime.datetime.now().isoformat()
for key, value in json_message.items():
# Process the message and insert into the database
print("Inserting record into database:", key, value)
if key.startswith("#"):
continue
value_type, value_r, value_t = determine_value_type(value)
insert_telemetry_record(connection, tele_table, key, ts, value_type, value_r, value_t)
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
connection.close()
def process_messages():
while True:
try:
msg = message_queue.get(timeout=1)
dingding_msg = process_tele(msg[0], msg[1])
message_queue.task_done()
except queue.Empty:
continue
def main():
# 创建MQTT客户端
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set(mqtt_username, mqtt_password)
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
# 连接到MQTT代理
client.connect(mqtt_server_ip, mqtt_server_port, 60)
# 启动消息处理线程
message_thread = threading.Thread(target=process_messages, daemon=True)
message_thread.start()
# 主线程执行client.loop_forever()
client.loop_forever()
if __name__ == "__main__":
main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/pei-jun-chen/sky_python.git
git@gitee.com:pei-jun-chen/sky_python.git
pei-jun-chen
sky_python
sky_python
master

搜索帮助