代码拉取完成,页面将自动刷新
import datetime
import psycopg2
from psycopg2 import sql
from retcode import *
def connect_to_db():
    """Open a connection to the PostgreSQL database.

    Returns:
        A new psycopg2 connection on success, or None when connecting fails
        (the error is printed instead of raised).
    """
    # NOTE(review): the connection settings, including the password, are
    # hard-coded in this function.
    settings = {
        "dbname": "skyiot",
        "user": "postgres",  # replace with your user name
        "password": "Chpj2019",  # replace with your password
        "host": "localhost",  # replace with your database host address
        "port": "5432",  # replace with your database port
    }
    try:
        return psycopg2.connect(**settings)
    except Exception as e:
        print(f"Error connecting to the database: {e}")
        return None
def create_tele_table(connection, table_name):
    """Create the telemetry table ``table_name`` unless it already exists.

    The table has the columns ``id``, ``name``, ``value_type``, ``value_r``,
    ``value_t`` and ``ts``. No rows are inserted.

    Args:
        connection: Open database connection used to run and commit the DDL.
        table_name: Name of the table; it is quoted as an SQL identifier.

    Returns:
        None. The outcome is only reported through printed messages; on any
        error the transaction is rolled back.
    """
    try:
        with connection.cursor() as cur:
            ddl = sql.SQL(
                "\n"
                "CREATE TABLE IF NOT EXISTS {} (\n"
                "id SERIAL PRIMARY KEY,\n"
                "name character varying(255) NOT NULL,\n"
                "value_type integer NOT NULL,\n"
                "value_r real DEFAULT 0.0,\n"
                "value_t character varying(255) DEFAULT ''::character varying,\n"
                "ts timestamp(0) with time zone\n"
                ");\n"
            ).format(sql.Identifier(table_name))
            cur.execute(ddl)
            connection.commit()
            # Also printed when the table already existed: IF NOT EXISTS turns
            # that case into a successful no-op.
            print(f"Table '{table_name}' created successfully.")
    except Exception as e:
        print(f"Error checking or creating the table: {e}")
        connection.rollback()
def create_attr_table(connection, table_name):
    """Create the attribute table ``table_name`` unless it already exists.

    The table has the columns ``id``, ``name``, ``value_type``, ``value_r``,
    ``value_t`` and ``update`` (which defaults to the insertion time). No rows
    are inserted.

    Args:
        connection: Open database connection used to run and commit the DDL.
        table_name: Name of the table; it is quoted as an SQL identifier.

    Returns:
        None. The outcome is only reported through printed messages; on any
        error the transaction is rolled back.
    """
    try:
        with connection.cursor() as cur:
            ddl = sql.SQL(
                "\n"
                "CREATE TABLE IF NOT EXISTS {} (\n"
                "id SERIAL PRIMARY KEY,\n"
                "name character varying(255) NOT NULL,\n"
                "value_type integer NOT NULL DEFAULT 0,\n"
                "value_r real DEFAULT 0.0,\n"
                "value_t character varying(255) DEFAULT ''::character varying,\n"
                "update timestamp(0) with time zone DEFAULT CURRENT_TIMESTAMP\n"
                ");\n"
            ).format(sql.Identifier(table_name))
            cur.execute(ddl)
            connection.commit()
            # Also printed when the table already existed: IF NOT EXISTS turns
            # that case into a successful no-op.
            print(f"Table '{table_name}' created successfully.")
    except Exception as e:
        print(f"Error checking or creating the table: {e}")
        connection.rollback()
def delete_table(connection, table_name):
    """Drop the table ``table_name`` if it exists and commit the change.

    Args:
        connection: Open database connection.
        table_name: Name of the table; it is quoted as an SQL identifier.

    Returns:
        None. The outcome is only reported through printed messages; on any
        error the transaction is rolled back.
    """
    try:
        with connection.cursor() as cur:
            drop_stmt = sql.SQL(
                "\n"
                "DROP TABLE IF EXISTS {};\n"
            ).format(sql.Identifier(table_name))
            cur.execute(drop_stmt)
            connection.commit()
            print(f"Table '{table_name}' deleted successfully.")
    except Exception as e:
        print(f"Error deleting the table: {e}")
        connection.rollback()
def get_id_by_dev_id(connection, table_name, dev_id):
    """Look up the row id of the device whose ``dev_id`` column matches.

    Args:
        connection: Open database connection.
        table_name: Name of the device table; quoted as an SQL identifier.
        dev_id: Value compared against the ``dev_id`` column.

    Returns:
        The ``id`` of the first matching row, ``DEV_NOT_EXIST`` when no row
        matches, or ``DATABASE_ERROR`` when the query fails (the error is
        printed).
    """
    print("dev_id:", dev_id)
    try:
        with connection.cursor() as cursor:
            query = sql.SQL(
                "\n"
                "SELECT id\n"
                "FROM {}\n"
                "WHERE dev_id = %s;\n"
            ).format(sql.Identifier(table_name))
            cursor.execute(query, [dev_id])
            row = cursor.fetchone()
            return row[0] if row else DEV_NOT_EXIST
    except Exception as e:
        print(f"Error querying the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return DATABASE_ERROR
def insert_telemetry_record(connection, table_name, name, ts, value_type, value_r=0.0, value_t=''):
    """Insert one telemetry row into ``table_name`` and commit it.

    Args:
        connection: Open database connection.
        table_name: Name of the telemetry table; quoted as an SQL identifier.
        name: Value for the ``name`` column.
        ts: Value for the ``ts`` column.
        value_type: Value for the ``value_type`` column.
        value_r: Value for the ``value_r`` column.
        value_t: Value for the ``value_t`` column.

    Returns:
        None. On any error the error is printed and the transaction is rolled
        back.
    """
    row_values = (name, ts, value_type, value_r, value_t)
    try:
        with connection.cursor() as cur:
            insert_stmt = sql.SQL(
                "\n"
                "INSERT INTO {} (name, ts, value_type, value_r, value_t)\n"
                "VALUES (%s, %s, %s, %s, %s);\n"
            ).format(sql.Identifier(table_name))
            cur.execute(insert_stmt, row_values)
            connection.commit()
    except Exception as e:
        print(f"Error inserting record into the table: {e}")
        connection.rollback()
def get_latest_tele_timestamp(connection, table_name):
    """Return the ``ts`` value of the row with the highest ``id`` in the table.

    Args:
        connection: Open database connection.
        table_name: Name of the telemetry table; quoted as an SQL identifier.

    Returns:
        The ``ts`` column of the most recently numbered row, or None when the
        table is empty or the query fails (the error is printed).
    """
    try:
        with connection.cursor() as cursor:
            # LIMIT 1: only the first row is ever read, so do not make the
            # server send (and the client buffer) the whole table.
            query = sql.SQL(
                "\n"
                "SELECT ts\n"
                "FROM {}\n"
                "ORDER BY id DESC\n"
                "LIMIT 1;\n"
            ).format(sql.Identifier(table_name))
            cursor.execute(query)
            row = cursor.fetchone()
            return row[0] if row else None
    except Exception as e:
        print(f"Error getting latest timestamp from the table: {e}")
        # A failed statement (e.g. the table does not exist) leaves the
        # PostgreSQL transaction aborted, and every later statement on this
        # connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return None
def get_tele_values_by_timestamp(connection, table_name, names, ts):
    """Return the ``value_r`` stored for each name at exactly timestamp ``ts``.

    One query is issued per name.

    Args:
        connection: Open database connection.
        table_name: Name of the telemetry table; quoted as an SQL identifier.
        names: Iterable of values matched against the ``name`` column.
        ts: Timestamp matched for equality against the ``ts`` column.

    Returns:
        A dict mapping every name to the ``value_r`` of the first matching row,
        or to None when no row matches. ``{}`` is returned when the lookup
        fails (the error is printed).
    """
    try:
        with connection.cursor() as cursor:
            # The statement is the same for every name, so build it once.
            query = sql.SQL(
                "\n"
                "SELECT value_r\n"
                "FROM {}\n"
                "WHERE name = %s AND ts = %s;\n"
            ).format(sql.Identifier(table_name))
            results = {}
            for name in names:
                cursor.execute(query, (name, ts))
                row = cursor.fetchone()
                results[name] = row[0] if row else None
            return results
    except Exception as e:
        print(f"Error getting value by timestamp from the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return {}
def get_tele_values_in_hours(connection, table_name, names, hours):
    """Return time series for several telemetry names over the last ``hours`` hours.

    For ``hours < 24`` every stored sample in the window is returned, ordered
    by time; otherwise the samples are averaged per clock hour. Values are
    truncated to ``int`` and timestamps are converted to integer Unix seconds.

    Args:
        connection: Open database connection.
        table_name: Name of the telemetry table; quoted as an SQL identifier.
        names: Iterable of values matched against the ``name`` column.
        hours: Size of the look-back window in hours.

    Returns:
        A dict with the key ``'ts'`` (timestamps of the first name's series)
        plus one key per name holding that name's values. ``{}`` is returned
        when ``names`` is empty, when the series of the different names do not
        all have the same length, or when an error occurs (a message is
        printed in the latter two cases).
    """
    try:
        names = list(names)
        if not names:
            return {}
        with connection.cursor() as cursor:
            # Use an aware UTC cutoff so the comparison with the timestamptz
            # column does not depend on the database session's time zone.
            cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=hours)
            cutoff_iso = cutoff.isoformat()
            if hours < 24:
                # Raw samples. ORDER BY makes the series chronological and
                # keeps the different names aligned index by index; without it
                # the row order is unspecified.
                template = sql.SQL(
                    "\n"
                    "SELECT value_r, ts\n"
                    "FROM {}\n"
                    "WHERE name = %s AND ts >= %s\n"
                    "ORDER BY ts, id;\n"
                )
            else:
                # One averaged value per clock hour.
                template = sql.SQL(
                    "\n"
                    "SELECT AVG(value_r) AS avg_value_r, DATE_TRUNC('hour', ts) AS hour\n"
                    "FROM {}\n"
                    "WHERE name = %s AND ts >= %s\n"
                    "GROUP BY DATE_TRUNC('hour', ts)\n"
                    "ORDER BY hour;\n"
                )
            query = template.format(sql.Identifier(table_name))
            value = []
            ts = []
            for name in names:
                cursor.execute(query, (name, cutoff_iso))
                rows = cursor.fetchall()
                value.append([int(row[0]) for row in rows])
                ts.append([int(row[1].timestamp()) for row in rows])
            # All series share the first name's timestamps, so they must all
            # have the same number of points.
            ts_len = len(ts[0])
            if any(len(t) != ts_len for t in ts):
                print("ts length not equal")
                return {}
            results = {'ts': ts[0]}
            results.update(zip(names, value))
            return results
    except Exception as e:
        print(f"Error getting value from the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return {}
def get_vbat_in_days(connection, table_name, days):
    """Return the per-day average of the ``'Vbat'`` telemetry for the last ``days`` days.

    Args:
        connection: Open database connection.
        table_name: Name of the telemetry table; quoted as an SQL identifier.
        days: Size of the look-back window in days.

    Returns:
        ``{'ts': [...], 'vbat': [...]}`` with one entry per day, ordered by
        day. Timestamps are integer Unix seconds of the truncated day and the
        averages are truncated to ``int``. ``{}`` is returned when no rows
        match or when an error occurs (the error is printed).
    """
    try:
        with connection.cursor() as cursor:
            # Use an aware UTC cutoff so the comparison with the timestamptz
            # column does not depend on the database session's time zone.
            cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
            query = sql.SQL(
                "\n"
                "SELECT AVG(value_r) AS avg_value_r, DATE_TRUNC('day', ts) AS day\n"
                "FROM {}\n"
                "WHERE name = %s AND ts >= %s\n"
                "GROUP BY DATE_TRUNC('day', ts)\n"
                "ORDER BY day;\n"
            ).format(sql.Identifier(table_name))
            cursor.execute(query, ('Vbat', cutoff.isoformat()))
            rows = cursor.fetchall()
            if not rows:
                return {}
            print("resutl len:", len(rows))
            v_arr = [int(row[0]) for row in rows]
            t_arr = [int(row[1].timestamp()) for row in rows]
            return {'ts': t_arr, 'vbat': v_arr}
    except Exception as e:
        print(f"Error getting value from the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return {}
def get_stat_value_in_hours(connection, table_name, name, hours):
    """Return average, maximum and minimum ``value_r`` of ``name`` over the last ``hours`` hours.

    Args:
        connection: Open database connection.
        table_name: Name of the telemetry table; quoted as an SQL identifier.
        name: Value matched against the ``name`` column.
        hours: Size of the look-back window in hours.

    Returns:
        A tuple ``(avg, max, min)``. An aggregate that comes back as NULL
        (for example because no rows match) is reported as ``0.0``.
        ``(None, None, None)`` is returned when the query fails (the error is
        printed).
    """
    try:
        with connection.cursor() as cursor:
            # Use an aware UTC cutoff so the comparison with the timestamptz
            # column does not depend on the database session's time zone.
            cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=hours)
            query = sql.SQL(
                "\n"
                "SELECT AVG(value_r) AS avg_value, MAX(value_r) AS max_value, MIN(value_r) AS min_value\n"
                "FROM {}\n"
                "WHERE name = %s AND ts >= %s;\n"
            ).format(sql.Identifier(table_name))
            cursor.execute(query, (name, cutoff.isoformat()))
            row = cursor.fetchone()
            if not row:
                return 0.0, 0.0, 0.0
            return tuple(0.0 if column is None else column for column in row)
    except Exception as e:
        print(f"Error getting average, max, and min value from the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return None, None, None
def insert_attr_record(connection, table_name, name, value_type, value_r=0.0, value_t=''):
    """Update the attribute row called ``name``, or insert it when none exists.

    An UPDATE is tried first; only when it touches no row is a new row
    inserted. The change is then committed.

    Args:
        connection: Open database connection.
        table_name: Name of the attribute table; quoted as an SQL identifier.
        name: Value matched against (or stored in) the ``name`` column.
        value_type: Value for the ``value_type`` column.
        value_r: Value for the ``value_r`` column.
        value_t: Value for the ``value_t`` column.

    Returns:
        ``SUCCESS`` after the commit, or ``DATABASE_ERROR`` when anything
        fails (the error is printed and the transaction rolled back).
    """
    try:
        with connection.cursor() as cur:
            target = sql.Identifier(table_name)
            update_stmt = sql.SQL(
                "\n"
                "UPDATE {}\n"
                "SET value_type = %s, value_r = %s, value_t = %s\n"
                "WHERE name = %s;\n"
            ).format(target)
            cur.execute(update_stmt, (value_type, value_r, value_t, name))
            if cur.rowcount == 0:
                # Nothing was updated, so the attribute does not exist yet.
                insert_stmt = sql.SQL(
                    "\n"
                    "INSERT INTO {} (name, value_type, value_r, value_t)\n"
                    "VALUES (%s, %s, %s, %s);\n"
                ).format(target)
                cur.execute(insert_stmt, (name, value_type, value_r, value_t))
            connection.commit()
            return SUCCESS
    except Exception as e:
        print(f"Error inserting or updating record in the table: {e}")
        connection.rollback()
        return DATABASE_ERROR
def delete_attr_record(connection, table_name, name):
    """Delete the attribute rows whose ``name`` column equals ``name``.

    Args:
        connection: Open database connection.
        table_name: Name of the attribute table; quoted as an SQL identifier.
        name: Value matched against the ``name`` column.

    Returns:
        ``SUCCESS`` when at least one row was deleted (the change is
        committed), ``RECORD_NOT_EXIST`` when no row matched, or
        ``DATABASE_ERROR`` when the statement fails (the error is printed and
        the transaction rolled back).
    """
    try:
        with connection.cursor() as cur:
            delete_stmt = sql.SQL(
                "\n"
                "DELETE FROM {}\n"
                "WHERE name = %s;\n"
            ).format(sql.Identifier(table_name))
            cur.execute(delete_stmt, [name])
            if cur.rowcount <= 0:
                connection.rollback()
                return RECORD_NOT_EXIST
            connection.commit()
            return SUCCESS
    except Exception as e:
        print(f"Error deleting record from the table: {e}")
        connection.rollback()
        return DATABASE_ERROR
def fetch_server_attr_records(connection, table_name):
    """Read every row of the attribute table into one ``name -> value`` dict.

    The stored ``value_type`` selects how a row's value is produced:
    0 or 3 gives ``int(value_r)``, 1 or 4 gives ``float(value_r)`` and
    2 or 5 gives ``value_t``. Rows with any other ``value_type`` are skipped.

    Args:
        connection: Open database connection.
        table_name: Name of the attribute table; quoted as an SQL identifier.

    Returns:
        A dict mapping attribute names to their converted values, or None
        when reading fails (the error is printed).
    """
    try:
        with connection.cursor() as cursor:
            query = sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name))
            cursor.execute(query)
            records = cursor.fetchall()
            column_names = [desc[0] for desc in cursor.description]
            records_dict = {}
            for record in records:
                record_pair = dict(zip(column_names, record))
                value_type = record_pair['value_type']
                if value_type in (0, 3):
                    records_dict[record_pair['name']] = int(record_pair['value_r'])
                elif value_type in (1, 4):
                    records_dict[record_pair['name']] = float(record_pair['value_r'])
                elif value_type in (2, 5):
                    records_dict[record_pair['name']] = record_pair['value_t']
            return records_dict
    except Exception as e:
        print(f"Error fetching records from the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        # Success returns a single dict, so failure is signalled with a single
        # None; the previous (None, None) tuple was truthy and could not be
        # told apart from a result by callers checking for None/emptiness.
        return None
def fetch_all_attr_records(connection, table_name):
"""Fetch all records from the specified table and return them as a JSON list."""
try:
with connection.cursor() as cursor:
# Construct the SQL query
query = sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name))
# Execute the query
cursor.execute(query)
# Fetch all records
records = cursor.fetchall()
# Get column names
column_names = [desc[0] for desc in cursor.description]
# Convert records to a list of dictionaries
server_records_dict = {}
device_records_dict = {}
for record in records:
record_pair = dict(zip(column_names, record))
match record_pair['value_type']:
case 0:
device_records_dict[record_pair['name']] = int(record_pair['value_r'])
case 1:
device_records_dict[record_pair['name']] = float(record_pair['value_r'])
case 2:
device_records_dict[record_pair['name']] = record_pair['value_t']
case 3:
server_records_dict[record_pair['name']] = int(record_pair['value_r'])
case 4:
server_records_dict[record_pair['name']] = float(record_pair['value_r'])
case 5:
server_records_dict[record_pair['name']] = record_pair['value_t']
case _:
print(f"Unknown value type: {record_pair['value_type']}")
return device_records_dict, server_records_dict
except Exception as e:
print(f"Error fetching records from the table: {e}")
return None, None
def del_attr_records(connection, table_name):
    """Read every row of the attribute table, split by ``value_type`` into two dicts.

    Despite its name this function deletes nothing: its body was a verbatim
    copy of ``fetch_all_attr_records``, so it now simply delegates to that
    function to keep a single implementation.

    NOTE(review): the name suggests a delete operation was intended here;
    confirm with the callers before relying on it.

    Args:
        connection: Open database connection.
        table_name: Name of the attribute table.

    Returns:
        Whatever ``fetch_all_attr_records`` returns: a tuple
        ``(device_records_dict, server_records_dict)``, or ``(None, None)``
        when reading fails.
    """
    return fetch_all_attr_records(connection, table_name)
def check_and_update_user_login(connection, table_name, name, passwd):
    """Verify a user's password and, when it matches, record the login time.

    Args:
        connection: Open database connection.
        table_name: Name of the user table; quoted as an SQL identifier.
        name: Value matched against the ``name`` column.
        passwd: Password compared for equality with the stored ``passwd``.

    Returns:
        ``SUCCESS`` when the password matches (``login_time`` is set to the
        current local time and committed), ``PASSWORD_MISMATCH`` when it does
        not, ``RECORD_NOT_EXIST`` when no row has that name, or
        ``DATABASE_ERROR`` when a statement fails (the error is printed).
    """
    try:
        with connection.cursor() as cursor:
            table = sql.Identifier(table_name)
            cursor.execute(
                sql.SQL(
                    "\n"
                    "SELECT passwd\n"
                    "FROM {}\n"
                    "WHERE name = %s;\n"
                ).format(table),
                [name],
            )
            row = cursor.fetchone()
            if not row:
                return RECORD_NOT_EXIST
            # NOTE(review): the stored value is compared directly with the
            # supplied password, i.e. passwords appear to be kept unhashed.
            if row[0] != passwd:
                return PASSWORD_MISMATCH
            current_time = datetime.datetime.now().isoformat()
            cursor.execute(
                sql.SQL(
                    "\n"
                    "UPDATE {}\n"
                    "SET login_time = %s\n"
                    "WHERE name = %s;\n"
                ).format(table),
                (current_time, name),
            )
            connection.commit()
            return SUCCESS
    except Exception as e:
        print(f"Error checking or updating user login: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return DATABASE_ERROR
def insert_user_record(connection, table_name, name, passwd, email='', phone=''):
    """Insert a new user row and commit it.

    Both ``login_time`` and ``create_time`` are set to the current local time.

    Args:
        connection: Open database connection.
        table_name: Name of the user table; quoted as an SQL identifier.
        name: Value for the ``name`` column.
        passwd: Value for the ``passwd`` column (stored as given).
        email: Value for the ``email`` column.
        phone: Value for the ``phone`` column.

    Returns:
        ``SUCCESS`` after the commit, or ``DATABASE_ERROR`` when the insert
        fails (the error is printed and the transaction rolled back).
    """
    try:
        with connection.cursor() as cur:
            now_iso = datetime.datetime.now().isoformat()
            insert_stmt = sql.SQL(
                "\n"
                "INSERT INTO {} (name, passwd, email, phone, login_time, create_time)\n"
                "VALUES (%s, %s, %s, %s, %s, %s);\n"
            ).format(sql.Identifier(table_name))
            cur.execute(insert_stmt, (name, passwd, email, phone, now_iso, now_iso))
            connection.commit()
            print(f"User record inserted into table '{table_name}' successfully.")
            return SUCCESS
    except Exception as e:
        print(f"Error inserting user record into the table: {e}")
        connection.rollback()
        return DATABASE_ERROR
def user_change_password(connection, table_name, name, passwd, new_passwd):
    """Replace a user's password, provided the current password matches.

    Args:
        connection: Open database connection.
        table_name: Name of the user table; quoted as an SQL identifier.
        name: Value matched against the ``name`` column.
        passwd: Current password; must equal the stored ``passwd``.
        new_passwd: Password to store instead.

    Returns:
        ``SUCCESS`` when a row was updated (the change is committed),
        ``RECORD_NOT_EXIST`` when no row has that name and password, or
        ``DATABASE_ERROR`` when the statement fails (the error is printed and
        the transaction rolled back).
    """
    try:
        with connection.cursor() as cur:
            update_stmt = sql.SQL(
                "\n"
                "UPDATE {}\n"
                "SET passwd = %s\n"
                "WHERE name = %s AND passwd = %s;\n"
            ).format(sql.Identifier(table_name))
            cur.execute(update_stmt, (new_passwd, name, passwd))
            if cur.rowcount <= 0:
                # Unknown user and wrong current password look the same here.
                connection.rollback()
                return RECORD_NOT_EXIST
            connection.commit()
            print(f"Password for user '{name}' updated successfully.")
            return SUCCESS
    except Exception as e:
        print(f"Error changing user password: {e}")
        connection.rollback()
        return DATABASE_ERROR
def fetch_devices_by_user_with_data(connection, table_name, user_name, data_names):
    """List a user's devices together with each device's latest telemetry values.

    For every device row the telemetry table ``tele_`` + device id is queried
    via ``get_latest_tele_timestamp`` and ``get_tele_values_by_timestamp``.

    Args:
        connection: Open database connection.
        table_name: Name of the device table; quoted as an SQL identifier.
        user_name: Value matched against the ``user_name`` column.
        data_names: Telemetry names whose latest values are added per device.

    Returns:
        A list of dicts ordered by device id. Each dict holds ``id``,
        ``dev_id``, ``name``, ``location`` and ``update`` (whole minutes since
        the latest telemetry row, or 100000 when there is none), plus one
        entry per telemetry name when telemetry exists. None is returned when
        the device query fails (the error is printed).
    """
    try:
        with connection.cursor() as cursor:
            query = sql.SQL(
                "SELECT id, dev_id, name, location FROM {} WHERE user_name = %s ORDER BY id ASC;"
            ).format(sql.Identifier(table_name))
            cursor.execute(query, [user_name])
            records = cursor.fetchall()
            column_names = [desc[0] for desc in cursor.description]
            records_list = []
            for record in records:
                record_dict = dict(zip(column_names, record))
                tele_table_name = 'tele_' + str(record_dict['id'])
                latest_time = get_latest_tele_timestamp(connection, tele_table_name)
                print(latest_time)
                if latest_time:
                    age = datetime.datetime.now(datetime.timezone.utc) - latest_time
                    record_dict['update'] = int(age.total_seconds()) // 60
                    values = get_tele_values_by_timestamp(connection, tele_table_name, data_names, latest_time)
                    print(f"values = {values}")
                    record_dict.update(values)
                else:
                    # Sentinel age used when the device has no telemetry.
                    record_dict['update'] = 100000
                print(record_dict)
                records_list.append(record_dict)
            return records_list
    except Exception as e:
        print(f"Error fetching records by user from the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return None
def fetch_devices_by_user(connection, table_name, user_name):
    """List the devices whose ``user_name`` column equals ``user_name``.

    Args:
        connection: Open database connection.
        table_name: Name of the device table; quoted as an SQL identifier.
        user_name: Value matched against the ``user_name`` column.

    Returns:
        A list of dicts with the keys ``id``, ``dev_id``, ``name`` and
        ``location``, ordered by id (each dict is also printed). None is
        returned when the query fails (the error is printed).
    """
    try:
        with connection.cursor() as cursor:
            query = sql.SQL(
                "SELECT id, dev_id, name, location FROM {} WHERE user_name = %s ORDER BY id ASC;"
            ).format(sql.Identifier(table_name))
            cursor.execute(query, [user_name])
            records = cursor.fetchall()
            column_names = [desc[0] for desc in cursor.description]
            records_list = []
            for record in records:
                record_dict = dict(zip(column_names, record))
                print(record_dict)
                records_list.append(record_dict)
            return records_list
    except Exception as e:
        print(f"Error fetching records by user from the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return None
def fetch_devices_by_id(connection, table_name, id):
    """Fetch ``dev_id``, ``name`` and ``location`` of the device row with the given id.

    Args:
        connection: Open database connection.
        table_name: Name of the device table; quoted as an SQL identifier.
        id: Value matched against the ``id`` column. (The parameter shadows
            the ``id`` builtin; the name is kept for keyword callers.)

    Returns:
        A tuple ``(dev_id, name, location)``, or ``(None, None, None)`` when
        no row matches or the query fails (the error is printed).
    """
    try:
        with connection.cursor() as cursor:
            query = sql.SQL(
                "SELECT dev_id, name, location FROM {} WHERE id = %s;"
            ).format(sql.Identifier(table_name))
            cursor.execute(query, [id])
            record = cursor.fetchone()
            if not record:
                return None, None, None
            return record[0], record[1], record[2]
    except Exception as e:
        print(f"Error fetching records by id from the table: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted, and every
        # later statement on this connection is rejected until it is rolled back.
        if isinstance(e, psycopg2.Error) and not connection.closed:
            connection.rollback()
        return None, None, None
def delete_device_by_id(connection, table_name, id):
    """Delete the device row with the given id and drop its per-device tables.

    After the row deletion is committed, the tables ``tele_`` + id and
    ``attr_`` + id are dropped through ``delete_table``.

    Args:
        connection: Open database connection.
        table_name: Name of the device table; quoted as an SQL identifier.
        id: Value matched against the ``id`` column.

    Returns:
        ``SUCCESS`` when a row was deleted, ``RECORD_NOT_EXIST`` when no row
        matched, or ``DATABASE_ERROR`` when the statement fails (the error is
        printed and the transaction rolled back).
    """
    try:
        with connection.cursor() as cur:
            delete_stmt = sql.SQL(
                "\n"
                "DELETE FROM {}\n"
                "WHERE id = %s;\n"
            ).format(sql.Identifier(table_name))
            cur.execute(delete_stmt, [id])
            if cur.rowcount <= 0:
                connection.rollback()
                print(f"No device found with id '{id}'.")
                return RECORD_NOT_EXIST
            connection.commit()
            # The device row is gone; remove its telemetry and attribute tables.
            delete_table(connection, f"tele_{id}")
            delete_table(connection, f"attr_{id}")
            print(f"Device with id '{id}' deleted successfully.")
            return SUCCESS
    except Exception as e:
        print(f"Error deleting device: {e}")
        connection.rollback()
        return DATABASE_ERROR
def update_device_by_id(connection, table_name, id, name, location, dev_id):
    """Overwrite ``name``, ``location`` and ``dev_id`` of the device row with the given id.

    Args:
        connection: Open database connection.
        table_name: Name of the device table; quoted as an SQL identifier.
        id: Value matched against the ``id`` column.
        name: New value for the ``name`` column.
        location: New value for the ``location`` column.
        dev_id: New value for the ``dev_id`` column.

    Returns:
        ``SUCCESS`` when a row was updated (the change is committed),
        ``RECORD_NOT_EXIST`` when no row matched, or ``DATABASE_ERROR`` when
        the statement fails (the error is printed and the transaction rolled
        back).
    """
    new_values = (name, location, dev_id, id)
    try:
        with connection.cursor() as cur:
            update_stmt = sql.SQL(
                "\n"
                "UPDATE {}\n"
                "SET name = %s, location = %s, dev_id = %s\n"
                "WHERE id = %s;\n"
            ).format(sql.Identifier(table_name))
            cur.execute(update_stmt, new_values)
            if cur.rowcount <= 0:
                connection.rollback()
                print(f"No device found with id '{id}'.")
                return RECORD_NOT_EXIST
            connection.commit()
            print(f"Device with id '{id}' updated successfully.")
            return SUCCESS
    except Exception as e:
        print(f"Error updating device: {e}")
        connection.rollback()
        return DATABASE_ERROR
def add_device(connection, table_name, name, location, dev_id, user_name):
    """Insert a new device row and create its per-device tables.

    After the insert is committed, the tables ``tele_`` + new id and
    ``attr_`` + new id are created through ``create_tele_table`` and
    ``create_attr_table``.

    Args:
        connection: Open database connection.
        table_name: Name of the device table; quoted as an SQL identifier.
        name: Value for the ``name`` column.
        location: Value for the ``location`` column.
        dev_id: Value for the ``dev_id`` column.
        user_name: Value for the ``user_name`` column.

    Returns:
        ``SUCCESS`` once the row is committed and the table helpers have been
        called (the new id is only printed, not returned), or
        ``DATABASE_ERROR`` when the insert fails (the error is printed and the
        transaction rolled back).
    """
    new_row = (name, location, dev_id, user_name)
    try:
        with connection.cursor() as cur:
            insert_stmt = sql.SQL(
                "\n"
                "INSERT INTO {} (name, location, dev_id, user_name)\n"
                "VALUES (%s, %s, %s, %s)\n"
                "RETURNING id;\n"
            ).format(sql.Identifier(table_name))
            cur.execute(insert_stmt, new_row)
            # RETURNING id yields the generated primary key of the new row.
            inserted_id = cur.fetchone()[0]
            connection.commit()
            create_tele_table(connection, f"tele_{inserted_id}")
            create_attr_table(connection, f"attr_{inserted_id}")
            print(f"Device record inserted into table '{table_name}' successfully with ID: {inserted_id}")
            return SUCCESS
    except Exception as e:
        print(f"Error inserting device record into the table: {e}")
        connection.rollback()
        return DATABASE_ERROR
# if __name__ == "__main__":
# connection = connect_to_db()
# if connection:
# check_and_create_table(connection, "attr")
# dev_id = "abcd"
# record_id = get_id_by_dev_id(connection, "devices", dev_id)
# print(f"The ID for dev_id '{dev_id}' is: {record_id}")
# connection.close()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。