Ai
1 Star 0 Fork 0

陈培俊/sky_python

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
dtalk_robot.py 5.78 KB
一键复制 编辑 原始数据 按行查看 历史
ChenPeijun 提交于 2025-07-05 19:45 +08:00 . add const var py
import datetime
import json
import threading
import time
import hmac
import hashlib
import base64
import urllib.parse
import requests
import paho.mqtt.client as mqtt
from const_var import *
from postgres import *
topic_svr_alarm = "v1/svr/alarm"
def on_connect(client, userdata, flags, rc, props):
print(f"Connected with result code {rc}")
if rc == 0:
client.subscribe(topic_svr_alarm)
else:
print("Failed to connect to the MQTT broker.")
def on_subscribe(client, userdata, mid, granted_qos, properties):
print(f"Subscribed {mid} to topics with QoS: {granted_qos}")
def on_message(client, userdata, msg):
global robot
print(f"Received message on topic '{msg.topic}'")
if msg.topic == topic_svr_alarm:
json_msg = json.loads(msg.payload)
table_id = json_msg['id']
filename = http_server_url + 'cam/' + json_msg['file']
connection = connect_to_db()
if connection is None:
print("Failed to connect to the database.")
return
dev_id, dev_name, dev_location = fetch_devices_by_id(connection, "devices", table_id)
connection.close()
title = user_name + "的设备状态报告"
text = "![]\n\n"
text = "### " + dev_name + "( "+ dev_location + " )\n"
text += "报告时间: " + now.strftime('%Y-%m-%d %H:%M') + "\n\n"
text += "![](" + filename + ")\n\n"
print(text)
response = robot.send(title, text)
print(response)
def mqtt_main():
# 创建MQTT客户端
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(mqtt_username, mqtt_password)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message
# 连接到MQTT代理
mqtt_client.connect(mqtt_server_ip, mqtt_server_port, 60)
mqtt_client.loop_start()
class DingTalkRobot:
def __init__(self, webhook_url, secret):
self.webhook_url = webhook_url
self.secret = secret
def get_sign(self, timestamp):
string_to_sign = f'{timestamp}\n{self.secret}'
hmac_code = hmac.new(self.secret.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def send(self, title, text):
timestamp = str(round(time.time() * 1000))
sign = self.get_sign(timestamp)
url = f'{self.webhook_url}&timestamp={timestamp}&sign={sign}'
headers = {'Content-Type': 'application/json'}
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": text
}
}
response = requests.post(url, headers=headers, json=data)
return response.json()
if __name__ == "__main__":
# 启动mqtt处理线程
message_thread = threading.Thread(target=mqtt_main, daemon=True)
message_thread.start()
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=4a36ee437c2a07aa5d57d0599b08ebb2a6972360a1710bb673450d4cc203e03c'
secret = 'SEC3e3ff4137f684212725a689870a989859aefec7c67d83815b8d79125ba34378a'
robot = DingTalkRobot(webhook_url, secret)
send_schedule = [(8,0), (14,0), (20,0)]
msg_seg = [('light', '亮度', 'Lux'), ('temp', '温度', '℃'), ('humi', '湿度', '%'), ('Vbat', '电池电压', 'mV')]
user_name = "chenpj"
while True:
now = datetime.datetime.now()
current_time = (now.hour, now.minute)
# print(current_time)
if current_time in send_schedule:
connection = connect_to_db()
if connection is None:
print("Failed to connect to the database.")
continue
devices = fetch_devices_by_user(connection, "devices", user_name)
table_ids = []
for device in devices:
table_ids.append(device['id'])
print(table_ids)
title = user_name + "的设备状态报告"
text = "![](' + http_server_url + 'sky.jpg)\n\n"
text += "### 报告时间: " + now.strftime('%Y-%m-%d %H:%M') + "\n"
for table_id in table_ids:
dev_id, dev_name, dev_location = fetch_devices_by_id(connection, "devices", table_id)
table_name = "tele_" + str(table_id)
text += "#### [" + dev_name + "( "+ dev_location + " )](' + http_server_url + 'details.html?id=" + str(table_id) + "&hours=24)\n"
latest_time = get_latest_tele_timestamp(connection, table_name)
if latest_time is None:
text += f"{dev_name} 没有数据.\n"
continue
print(f"latest_time = {latest_time}")
seconds_ago = datetime.datetime.now(datetime.timezone.utc) - latest_time
if seconds_ago.total_seconds() < 60 * 60: # 如果距离上一次记录的时间小于60分钟,则发送消息
# 获取统计数据
for seg in msg_seg:
avg, max, min = get_stat_value_in_hours(connection, table_name, seg[0], 24)
# print(f"avg,max,min = {avg},{max},{min}")
text += f"- {seg[1]}: {min:.0f} ~ {max:.0f}{seg[2]} (平均值:{avg:.1f}{seg[2]}).\n"
min = seconds_ago.total_seconds() / 60
text += f"数据更新于 {min:.0f} 分钟前.\n"
else:
hour = seconds_ago.total_seconds() / 3600
text += f"{dev_name} 可能离线 {hour:.1f} 小时了.\n"
print(text)
response = robot.send(title, text)
print(response)
connection.close()
# 等待一分钟后再检查
time.sleep(60)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/pei-jun-chen/sky_python.git
git@gitee.com:pei-jun-chen/sky_python.git
pei-jun-chen
sky_python
sky_python
master

搜索帮助