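# NOTE: stray web-page banner captured with this file ("Code pull complete; the page will refresh automatically."). It is not part of the module.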
import os
import sqlite3
import threading
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
from typing import Any, Dict, List, Optional

from func.sql_commands import *
class Database:
    """SQLite-backed data-access layer guarded by a single lock.

    The class exposes CRUD helpers for products, users, product comments,
    favorites, addresses, product images and e-mail verification codes.
    Every operation opens a fresh connection, runs inside ``self.lock`` and
    is committed (or rolled back on error) before the connection is closed.

    NOTE(review): rows are fetched with ``SELECT *`` and mapped to dict keys
    by position, so the key names assume the column order declared by the
    ``create_command`` statements in ``func.sql_commands`` - verify against
    that schema.
    NOTE(review): ``_init_db`` does not execute any create statement that is
    obviously for the ``addresses`` table used by the address methods -
    confirm that table is created elsewhere.
    """

    # Format used for every timestamp this class writes.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Whitelist of product columns that ``update_product`` may write. Column
    # names are interpolated into SQL, so they must never come from callers.
    _PRODUCT_UPDATABLE_FIELDS = (
        'name', 'image_folder', 'description', 'price',
        'seller_id', 'is_completed', 'buyer_id',
    )

    def __init__(self, db_path: str):
        """Remember ``db_path``, create the lock and initialise the tables."""
        self.db_path = db_path
        # Serialises all database access across threads.
        self.lock = threading.Lock()
        self._init_db()

    # ------------------------------------------------------------------
    # Infrastructure
    # ------------------------------------------------------------------

    def _get_conn(self):
        """Open and return a new connection to ``self.db_path``.

        A new connection is created on every call; ``check_same_thread`` is
        disabled so the connection may be used from any thread.
        """
        return sqlite3.connect(self.db_path, check_same_thread=False)

    @contextmanager
    def _get_cursor(self):
        """Yield a cursor inside a locked, single-transaction scope.

        The transaction is committed when the ``with`` body finishes
        normally and rolled back when it raises an ``Exception`` (which is
        then re-raised). The connection is always closed.
        """
        with self.lock:
            conn = self._get_conn()
            try:
                cursor = conn.cursor()
                yield cursor
                conn.commit()
            except Exception:
                conn.rollback()
                # Bare ``raise`` keeps the original traceback intact.
                raise
            finally:
                conn.close()

    def _init_db(self):
        """Create the database directory (if needed) and all tables."""
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            # ``exist_ok`` avoids a crash when another process or thread
            # creates the directory between a check and the creation.
            os.makedirs(db_dir, exist_ok=True)
        with self._get_cursor() as cursor:
            # Products
            cursor.execute(Product.create_command)
            # Users
            cursor.execute(User.create_command)
            # Verification codes
            cursor.execute(VerificationCode.create_command)
            # Product comments
            cursor.execute(ProductComment.create_command)
            # Follow / favorites list
            cursor.execute(FollowList.create_command)
            # Product image records
            cursor.execute(ProductImage.create_command)

    @classmethod
    def _now(cls) -> str:
        """Return the current local time formatted with ``_TIME_FORMAT``."""
        return datetime.now().strftime(cls._TIME_FORMAT)

    # ------------------------------------------------------------------
    # Row -> dict helpers (positional; see the class-level schema note)
    # ------------------------------------------------------------------

    @staticmethod
    def _row_to_product(row) -> Dict[str, Any]:
        """Map a ``products`` row to a dict; ``buyer_id`` -1 becomes None."""
        return {
            'id': row[0],
            'name': row[1],
            'image_folder': row[2],
            'description': row[3],
            'price': row[4],
            'seller_id': row[5],
            'is_completed': bool(row[6]),
            'buyer_id': row[7] if row[7] != -1 else None,
            'created_at': row[8],
            'updated_at': row[9],
            'completed_at': row[10],
            'seller_last_view_at': row[11],
        }

    @staticmethod
    def _row_to_user(row) -> Dict[str, Any]:
        """Map a ``users`` row to a dict."""
        return {
            'id': row[0],
            'nickname': row[1],
            'description': row[2],
            'avatar': row[3],
            'email': row[4],
            'password': row[5],
            'address': row[6],
            'created_at': row[7],
        }

    @staticmethod
    def _row_to_comment(row) -> Dict[str, Any]:
        """Map a ``product_comments`` row to a dict; parent -1 becomes None."""
        return {
            'id': row[0],
            'product_id': row[1],
            'user_id': row[2],
            'parent_comment_id': row[3] if row[3] != -1 else None,
            'content': row[4],
            'created_at': row[5],
        }

    @staticmethod
    def _row_to_address(row) -> Dict[str, Any]:
        """Map an ``addresses`` row to a dict."""
        return {
            'id': row[0],
            'user_id': row[1],
            'address': row[2],
            'created_at': row[3],
        }

    @staticmethod
    def _row_to_image(row) -> Dict[str, Any]:
        """Map a ``product_images`` row to a dict."""
        return {
            'id': row[0],
            'product_id': row[1],
            'image_path': row[2],
            'created_at': row[3],
        }

    # ------------------------------------------------------------------
    # Products
    # ------------------------------------------------------------------

    def add_product(self, product: Dict[str, Any]) -> int:
        """Insert a product and return its row id.

        Requires the keys ``name``, ``image_folder``, ``price`` and
        ``seller_id``. ``description`` defaults to ``''``, ``is_completed``
        to ``False`` and ``buyer_id`` to ``-1``. ``completed_at`` is set to
        the current time only when the product is inserted as completed.
        """
        current_time = self._now()
        is_completed = product.get('is_completed', False)
        with self._get_cursor() as cursor:
            cursor.execute('''
                INSERT INTO products (
                    name, image_folder, description, price,
                    seller_id, is_completed, buyer_id,
                    created_at, updated_at, completed_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                product['name'],
                product['image_folder'],
                product.get('description', ''),
                product['price'],
                product['seller_id'],
                is_completed,
                product.get('buyer_id', -1),
                current_time,
                current_time,
                current_time if is_completed else None,
            ))
            return cursor.lastrowid

    def get_product(self, product_id: int) -> Optional[Dict[str, Any]]:
        """Return the product with ``product_id`` or ``None`` if missing."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM products WHERE id = ?', (product_id,))
            row = cursor.fetchone()
            return self._row_to_product(row) if row else None

    def get_all_products(self) -> List[Dict[str, Any]]:
        """Return every product."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM products')
            return [self._row_to_product(row) for row in cursor.fetchall()]

    def update_product(self, product_id: int, product: Dict[str, Any]) -> bool:
        """Update the whitelisted fields present in ``product``.

        ``updated_at`` is always refreshed; ``completed_at`` is also set to
        the current time when ``product['is_completed']`` is truthy. Passing
        a dict with none of the updatable fields only touches ``updated_at``.

        Returns ``True`` when a row with ``product_id`` was updated.
        """
        present = [key for key in self._PRODUCT_UPDATABLE_FIELDS if key in product]
        assignments = [f"{key} = ?" for key in present]
        params = [product[key] for key in present]

        current_time = self._now()
        assignments.append("updated_at = ?")
        params.append(current_time)
        if product.get('is_completed', False):
            assignments.append("completed_at = ?")
            params.append(current_time)
        params.append(product_id)

        # Building the SET clause from one list guarantees valid SQL even
        # when no whitelisted field was supplied (no leading comma).
        set_clause = ', '.join(assignments)
        with self._get_cursor() as cursor:
            cursor.execute(
                f'UPDATE products SET {set_clause} WHERE id = ?',
                params,
            )
            return cursor.rowcount > 0

    def delete_product(self, product_id: int) -> bool:
        """Delete a product; return ``True`` when a row was removed."""
        with self._get_cursor() as cursor:
            cursor.execute('DELETE FROM products WHERE id = ?', (product_id,))
            return cursor.rowcount > 0

    def get_products_by_seller(self, seller_id: int) -> List[Dict[str, Any]]:
        """Return all products whose ``seller_id`` matches."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM products WHERE seller_id = ?', (seller_id,))
            return [self._row_to_product(row) for row in cursor.fetchall()]

    def get_products_by_buyer(self, buyer_id: int) -> List[Dict[str, Any]]:
        """Return all products whose ``buyer_id`` matches."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM products WHERE buyer_id = ?', (buyer_id,))
            return [self._row_to_product(row) for row in cursor.fetchall()]

    def get_available_products(self) -> List[Dict[str, Any]]:
        """Return all products that are not completed (``is_completed = 0``)."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM products WHERE is_completed = 0')
            return [self._row_to_product(row) for row in cursor.fetchall()]

    def update_product_seller_view_time(self, product_id: int) -> bool:
        """Set the product's ``seller_last_view_at`` to the current time."""
        with self._get_cursor() as cursor:
            cursor.execute('''
                UPDATE products
                SET seller_last_view_at = ?
                WHERE id = ?
            ''', (self._now(), product_id))
            return cursor.rowcount > 0

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def add_user(self, user: Dict[str, Any]) -> int:
        """Insert a user and return its row id.

        Requires ``nickname``, ``email`` and ``password``; ``description``
        and ``avatar`` default to ``''``.
        """
        with self._get_cursor() as cursor:
            cursor.execute('''
                INSERT INTO users (nickname, description, avatar, email, password)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                user['nickname'],
                user.get('description', ''),
                user.get('avatar', ''),
                user['email'],
                user['password'],
            ))
            return cursor.lastrowid

    def get_user(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Return the user with ``user_id`` or ``None`` if missing."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
            row = cursor.fetchone()
            return self._row_to_user(row) if row else None

    def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Alias of :meth:`get_user`, kept for existing callers."""
        return self.get_user(user_id)

    def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
        """Return the first user with ``email`` or ``None`` if missing."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM users WHERE email = ?', (email,))
            row = cursor.fetchone()
            return self._row_to_user(row) if row else None

    def get_all_users(self) -> List[Dict[str, Any]]:
        """Return every user."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM users')
            return [self._row_to_user(row) for row in cursor.fetchall()]

    def update_user(self, user_id: int, user: Dict[str, Any]) -> bool:
        """Overwrite a user's nickname, description, avatar, email, password.

        ``nickname``, ``email`` and ``password`` are required; a missing
        ``description`` or ``avatar`` is written as ``''``.
        Returns ``True`` when a row with ``user_id`` was updated.
        """
        with self._get_cursor() as cursor:
            cursor.execute('''
                UPDATE users
                SET nickname = ?, description = ?, avatar = ?, email = ?, password = ?
                WHERE id = ?
            ''', (
                user['nickname'],
                user.get('description', ''),
                user.get('avatar', ''),
                user['email'],
                user['password'],
                user_id,
            ))
            return cursor.rowcount > 0

    def delete_user(self, user_id: int) -> bool:
        """Delete a user; return ``True`` when a row was removed."""
        with self._get_cursor() as cursor:
            cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
            return cursor.rowcount > 0

    # ------------------------------------------------------------------
    # Product comments
    # ------------------------------------------------------------------

    def add_comment(self, comment: Dict[str, Any]) -> int:
        """Insert a comment, bump the product's ``updated_at``, return its id.

        Requires ``product_id``, ``user_id`` and ``content``;
        ``parent_comment_id`` defaults to ``-1`` (top-level comment).
        """
        current_time = self._now()
        with self._get_cursor() as cursor:
            cursor.execute('''
                INSERT INTO product_comments (product_id, user_id, parent_comment_id, content, created_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                comment['product_id'],
                comment['user_id'],
                comment.get('parent_comment_id', -1),
                comment['content'],
                current_time,
            ))
            # Capture the id right after the INSERT so the following UPDATE
            # can never influence the value that is returned.
            comment_id = cursor.lastrowid
            cursor.execute('''
                UPDATE products
                SET updated_at = ?
                WHERE id = ?
            ''', (current_time, comment['product_id']))
            return comment_id

    def get_comment(self, comment_id: int) -> Optional[Dict[str, Any]]:
        """Return the comment with ``comment_id`` or ``None`` if missing."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM product_comments WHERE id = ?', (comment_id,))
            row = cursor.fetchone()
            return self._row_to_comment(row) if row else None

    def get_product_comments(self, product_id: int) -> List[Dict[str, Any]]:
        """Return a product's comments, newest first."""
        with self._get_cursor() as cursor:
            cursor.execute(
                'SELECT * FROM product_comments WHERE product_id = ? ORDER BY created_at DESC',
                (product_id,),
            )
            return [self._row_to_comment(row) for row in cursor.fetchall()]

    def get_comment_replies(self, comment_id: int) -> List[Dict[str, Any]]:
        """Return the direct replies to a comment, oldest first."""
        with self._get_cursor() as cursor:
            cursor.execute(
                'SELECT * FROM product_comments WHERE parent_comment_id = ? ORDER BY created_at ASC',
                (comment_id,),
            )
            return [self._row_to_comment(row) for row in cursor.fetchall()]

    def delete_comment(self, comment_id: int) -> bool:
        """Delete a comment and its direct replies.

        Returns ``True`` when the comment itself existed and was removed.

        NOTE(review): only one level of replies is removed; replies to
        replies would keep a dangling ``parent_comment_id`` - confirm
        whether nested replies can occur.
        """
        with self._get_cursor() as cursor:
            # Direct replies first, then the comment itself, so the returned
            # row count reflects the comment and not its replies.
            cursor.execute('DELETE FROM product_comments WHERE parent_comment_id = ?', (comment_id,))
            cursor.execute('DELETE FROM product_comments WHERE id = ?', (comment_id,))
            return cursor.rowcount > 0

    # ------------------------------------------------------------------
    # Favorites
    # ------------------------------------------------------------------

    def add_favorite(self, user_id: int, product_id: int) -> bool:
        """Add a product to a user's favorites.

        Returns ``False`` when the insert violates an integrity constraint
        (presumably because the pair already exists - depends on the schema).
        """
        with self._get_cursor() as cursor:
            try:
                cursor.execute('''
                    INSERT INTO favorites (user_id, product_id)
                    VALUES (?, ?)
                ''', (user_id, product_id))
            except sqlite3.IntegrityError:
                return False
            return True

    def remove_favorite(self, user_id: int, product_id: int) -> bool:
        """Remove a product from a user's favorites."""
        with self._get_cursor() as cursor:
            cursor.execute(
                'DELETE FROM favorites WHERE user_id = ? AND product_id = ?',
                (user_id, product_id),
            )
            return cursor.rowcount > 0

    def get_user_favorites(self, user_id: int) -> List[Dict[str, Any]]:
        """Return a user's favorite products, most recently added first.

        Each dict has the product fields plus the favorite's ``last_view_at``.
        """
        with self._get_cursor() as cursor:
            cursor.execute('''
                SELECT p.*, f.last_view_at FROM products p
                INNER JOIN favorites f ON p.id = f.product_id
                WHERE f.user_id = ?
                ORDER BY f.created_at DESC
            ''', (user_id,))
            # Column 12 is ``f.last_view_at``, appended after the twelve
            # product columns selected by ``p.*``.
            return [
                {**self._row_to_product(row), 'last_view_at': row[12]}
                for row in cursor.fetchall()
            ]

    def is_favorite(self, user_id: int, product_id: int) -> bool:
        """Return whether the product is in the user's favorites."""
        with self._get_cursor() as cursor:
            cursor.execute(
                'SELECT 1 FROM favorites WHERE user_id = ? AND product_id = ?',
                (user_id, product_id),
            )
            return cursor.fetchone() is not None

    def update_favorite_view_time(self, user_id: int, product_id: int) -> bool:
        """Set the favorite's ``last_view_at`` to the current time."""
        with self._get_cursor() as cursor:
            cursor.execute('''
                UPDATE favorites
                SET last_view_at = ?
                WHERE user_id = ? AND product_id = ?
            ''', (self._now(), user_id, product_id))
            return cursor.rowcount > 0

    # ------------------------------------------------------------------
    # Addresses
    # ------------------------------------------------------------------

    def add_address(self, address: Dict[str, Any]) -> int:
        """Insert a trade address (``user_id``, ``address``); return its id."""
        with self._get_cursor() as cursor:
            cursor.execute('''
                INSERT INTO addresses (user_id, address)
                VALUES (?, ?)
            ''', (
                address['user_id'],
                address['address'],
            ))
            return cursor.lastrowid

    def get_address(self, address_id: int) -> Optional[Dict[str, Any]]:
        """Return the address with ``address_id`` or ``None`` if missing."""
        with self._get_cursor() as cursor:
            cursor.execute('SELECT * FROM addresses WHERE id = ?', (address_id,))
            row = cursor.fetchone()
            return self._row_to_address(row) if row else None

    def get_user_addresses(self, user_id: int) -> List[Dict[str, Any]]:
        """Return a user's addresses, newest first."""
        with self._get_cursor() as cursor:
            cursor.execute(
                'SELECT * FROM addresses WHERE user_id = ? ORDER BY created_at DESC',
                (user_id,),
            )
            return [self._row_to_address(row) for row in cursor.fetchall()]

    def update_address(self, address_id: int, address: Dict[str, Any]) -> bool:
        """Replace the text of an address; return ``True`` if a row changed."""
        with self._get_cursor() as cursor:
            cursor.execute('''
                UPDATE addresses
                SET address = ?
                WHERE id = ?
            ''', (
                address['address'],
                address_id,
            ))
            return cursor.rowcount > 0

    def delete_address(self, address_id: int) -> bool:
        """Delete an address; return ``True`` when a row was removed."""
        with self._get_cursor() as cursor:
            cursor.execute('DELETE FROM addresses WHERE id = ?', (address_id,))
            return cursor.rowcount > 0

    # ------------------------------------------------------------------
    # Product images
    # ------------------------------------------------------------------

    def add_product_image(self, image: Dict[str, Any]) -> int:
        """Insert an image record, bump the product's ``updated_at``.

        Requires ``product_id`` and ``image_path``; returns the image id.
        """
        current_time = self._now()
        with self._get_cursor() as cursor:
            cursor.execute('''
                INSERT INTO product_images (product_id, image_path)
                VALUES (?, ?)
            ''', (
                image['product_id'],
                image['image_path'],
            ))
            # Capture the id before the UPDATE below runs.
            image_id = cursor.lastrowid
            cursor.execute('''
                UPDATE products
                SET updated_at = ?
                WHERE id = ?
            ''', (current_time, image['product_id']))
            return image_id

    def get_product_images(self, product_id: int) -> List[Dict[str, Any]]:
        """Return a product's image records, oldest first."""
        with self._get_cursor() as cursor:
            cursor.execute(
                'SELECT * FROM product_images WHERE product_id = ? ORDER BY created_at ASC',
                (product_id,),
            )
            return [self._row_to_image(row) for row in cursor.fetchall()]

    def delete_product_image(self, image_id: int) -> bool:
        """Delete one image record; return ``True`` when a row was removed."""
        with self._get_cursor() as cursor:
            cursor.execute('DELETE FROM product_images WHERE id = ?', (image_id,))
            return cursor.rowcount > 0

    def delete_product_images(self, product_id: int) -> bool:
        """Delete all image records of a product; ``True`` if any existed."""
        with self._get_cursor() as cursor:
            cursor.execute('DELETE FROM product_images WHERE product_id = ?', (product_id,))
            return cursor.rowcount > 0

    # ------------------------------------------------------------------
    # Verification codes
    # ------------------------------------------------------------------

    def add_verification_code(self, email: str, code: str, expired_at: str) -> int:
        """Replace any existing code for ``email`` with a new one.

        Returns the row id of the newly inserted code.
        """
        with self._get_cursor() as cursor:
            # Drop older codes for this e-mail before inserting the new one.
            cursor.execute('DELETE FROM verification_codes WHERE email = ?', (email,))
            cursor.execute('''
                INSERT INTO verification_codes (email, code, expired_at)
                VALUES (?, ?, ?)
            ''', (email, code, expired_at))
            return cursor.lastrowid

    def get_verification_code(self, email: str) -> Optional[Dict[str, Any]]:
        """Return the most recent code for ``email`` or ``None`` if missing."""
        with self._get_cursor() as cursor:
            cursor.execute('''
                SELECT * FROM verification_codes
                WHERE email = ?
                ORDER BY created_at DESC
                LIMIT 1
            ''', (email,))
            row = cursor.fetchone()
            if not row:
                return None
            return {
                'id': row[0],
                'email': row[1],
                'code': row[2],
                'created_at': row[3],
                'expired_at': row[4],
            }

    def delete_verification_code(self, email: str) -> bool:
        """Delete all codes for ``email``; ``True`` if any were removed."""
        with self._get_cursor() as cursor:
            cursor.execute('DELETE FROM verification_codes WHERE email = ?', (email,))
            return cursor.rowcount > 0

    def clean_expired_codes(self) -> int:
        """Delete codes whose ``expired_at`` is before now; return the count.

        The comparison is a string comparison against a ``_TIME_FORMAT``
        timestamp, so ``expired_at`` is assumed to be stored in that same
        format - TODO confirm with the code that writes it.
        """
        with self._get_cursor() as cursor:
            cursor.execute(
                'DELETE FROM verification_codes WHERE expired_at < ?',
                (self._now(),),
            )
            return cursor.rowcount
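# NOTE: stray content-moderation notice from the hosting web page, captured with this file; it is not part of the module.
# It states that content here may be unsuitable for display and was hidden, and that the owner may review and edit it or submit an appeal.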