Ai
4 Star 0 Fork 0

纸豪/Yanyuan SwapHub

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
dao.py 23.69 KB
一键复制 编辑 原始数据 按行查看 历史
zhihao 提交于 2025-05-28 17:05 +08:00 . 修改dao,改为多线程
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
import sqlite3
import threading
import os
from typing import Dict, List, Any
from datetime import datetime
from contextlib import contextmanager
from functools import wraps
from func.sql_commands import *
class Database:
def __init__(self, db_path: str):
self.db_path = db_path
self.lock = threading.Lock() # 添加锁
self._init_db()
def _get_conn(self):
"""每个线程获得一个独立的连接"""
return sqlite3.connect(self.db_path, check_same_thread=False)
@contextmanager
def _get_cursor(self):
"""获取数据库游标的上下文管理器"""
with self.lock: # 使用锁保护数据库操作
conn = self._get_conn()
try:
cursor = conn.cursor()
yield cursor
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def _init_db(self):
"""初始化数据库表结构"""
# 检查数据库文件所在目录是否存在,如果不存在则创建
db_dir = os.path.dirname(self.db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir)
with self._get_cursor() as cursor:
# 创建产品表
cursor.execute(Product.create_command)
# 创建用户表
cursor.execute(User.create_command)
# 创建验证码信息表
cursor.execute(VerificationCode.create_command)
# 创建商品评论表
cursor.execute(ProductComment.create_command)
# 创建关注列表表
cursor.execute(FollowList.create_command)
# 创建商品图片信息记录表
cursor.execute(ProductImage.create_command)
def add_product(self, product: Dict[str, Any]) -> int:
"""添加新产品"""
with self._get_cursor() as cursor:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('''
INSERT INTO products (
name, image_folder, description, price,
seller_id, is_completed, buyer_id,
created_at, updated_at, completed_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
product['name'],
product['image_folder'],
product.get('description', ''),
product['price'],
product['seller_id'],
product.get('is_completed', False),
product.get('buyer_id', -1),
current_time,
current_time,
None if not product.get('is_completed', False) else current_time
))
return cursor.lastrowid
def get_product(self, product_id: int) -> Dict[str, Any]:
"""获取单个产品"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM products WHERE id = ?', (product_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'name': row[1],
'image_folder': row[2],
'description': row[3],
'price': row[4],
'seller_id': row[5],
'is_completed': bool(row[6]),
'buyer_id': row[7] if row[7] != -1 else None,
'created_at': row[8],
'updated_at': row[9],
'completed_at': row[10],
'seller_last_view_at': row[11]
}
return None
def get_all_products(self) -> List[Dict[str, Any]]:
"""获取所有产品"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM products')
return [{
'id': row[0],
'name': row[1],
'image_folder': row[2],
'description': row[3],
'price': row[4],
'seller_id': row[5],
'is_completed': bool(row[6]),
'buyer_id': row[7] if row[7] != -1 else None,
'created_at': row[8],
'updated_at': row[9],
'completed_at': row[10],
'seller_last_view_at': row[11]
} for row in cursor.fetchall()]
def update_product(self, product_id: int, product: Dict[str, Any]) -> bool:
"""更新产品信息"""
with self._get_cursor() as cursor:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 检查是否需要更新completed_at
fields = ['name', 'image_folder', 'description', 'price', 'seller_id', 'is_completed', 'buyer_id']
values = {
key: product[key] for key in fields if key in product
}
set_clause = ', '.join([f"{key} = ?" for key in values])
if product.get('is_completed', False):
cursor.execute(f'''
UPDATE products
SET {set_clause},
updated_at = ?, completed_at = ?
WHERE id = ?
''', (
*values.values(),
current_time,
current_time,
product_id
))
else:
cursor.execute(f'''
UPDATE products
SET {set_clause},
updated_at = ?
WHERE id = ?
''', (
*values.values(),
current_time,
product_id
))
return cursor.rowcount > 0
def delete_product(self, product_id: int) -> bool:
"""删除产品"""
with self._get_cursor() as cursor:
cursor.execute('DELETE FROM products WHERE id = ?', (product_id,))
return cursor.rowcount > 0
def add_user(self, user: Dict[str, Any]) -> int:
"""添加新用户"""
with self._get_cursor() as cursor:
cursor.execute('''
INSERT INTO users (nickname, description, avatar, email, password)
VALUES (?, ?, ?, ?, ?)
''', (
user['nickname'],
user.get('description', ''),
user.get('avatar', ''),
user['email'],
user['password']
))
return cursor.lastrowid
def get_user(self, user_id: int) -> Dict[str, Any]:
"""获取单个用户"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'nickname': row[1],
'description': row[2],
'avatar': row[3],
'email': row[4],
'password': row[5],
'address': row[6],
'created_at': row[7]
}
return None
def get_user_by_email(self, email: str) -> Dict[str, Any]:
"""通过邮箱获取用户"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM users WHERE email = ?', (email,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'nickname': row[1],
'description': row[2],
'avatar': row[3],
'email': row[4],
'password': row[5],
'address': row[6],
'created_at': row[7]
}
return None
def get_all_users(self) -> List[Dict[str, Any]]:
"""获取所有用户"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM users')
return [{
'id': row[0],
'nickname': row[1],
'description': row[2],
'avatar': row[3],
'email': row[4],
'password': row[5],
'address': row[6],
'created_at': row[7]
} for row in cursor.fetchall()]
def update_user(self, user_id: int, user: Dict[str, Any]) -> bool:
"""更新用户信息"""
with self._get_cursor() as cursor:
cursor.execute('''
UPDATE users
SET nickname = ?, description = ?, avatar = ?, email = ?, password = ?
WHERE id = ?
''', (
user['nickname'],
user.get('description', ''),
user.get('avatar', ''),
user['email'],
user['password'],
user_id
))
return cursor.rowcount > 0
def delete_user(self, user_id: int) -> bool:
"""删除用户"""
with self._get_cursor() as cursor:
cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
return cursor.rowcount > 0
def get_products_by_seller(self, seller_id: int) -> List[Dict[str, Any]]:
"""获取指定卖家的所有商品"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM products WHERE seller_id = ?', (seller_id,))
return [{
'id': row[0],
'name': row[1],
'image_folder': row[2],
'description': row[3],
'price': row[4],
'seller_id': row[5],
'is_completed': bool(row[6]),
'buyer_id': row[7] if row[7] != -1 else None,
'created_at': row[8],
'updated_at': row[9],
'completed_at': row[10],
'seller_last_view_at': row[11]
} for row in cursor.fetchall()]
def get_products_by_buyer(self, buyer_id: int) -> List[Dict[str, Any]]:
"""获取指定买家的所有商品"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM products WHERE buyer_id = ?', (buyer_id,))
return [{
'id': row[0],
'name': row[1],
'image_folder': row[2],
'description': row[3],
'price': row[4],
'seller_id': row[5],
'is_completed': bool(row[6]),
'buyer_id': row[7] if row[7] != -1 else None,
'created_at': row[8],
'updated_at': row[9],
'completed_at': row[10],
'seller_last_view_at': row[11]
} for row in cursor.fetchall()]
def get_available_products(self) -> List[Dict[str, Any]]:
"""获取所有可交易的商品(未完成的)"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM products WHERE is_completed = 0')
return [{
'id': row[0],
'name': row[1],
'image_folder': row[2],
'description': row[3],
'price': row[4],
'seller_id': row[5],
'is_completed': bool(row[6]),
'buyer_id': row[7] if row[7] != -1 else None,
'created_at': row[8],
'updated_at': row[9],
'completed_at': row[10],
'seller_last_view_at': row[11]
} for row in cursor.fetchall()]
def add_comment(self, comment: Dict[str, Any]) -> int:
"""添加商品评论"""
with self._get_cursor() as cursor:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 添加评论
cursor.execute('''
INSERT INTO product_comments (product_id, user_id, parent_comment_id, content, created_at)
VALUES (?, ?, ?, ?, ?)
''', (
comment['product_id'],
comment['user_id'],
comment.get('parent_comment_id', -1),
comment['content'],
current_time
))
# 更新商品的更新时间
cursor.execute('''
UPDATE products
SET updated_at = ?
WHERE id = ?
''', (current_time, comment['product_id']))
return cursor.lastrowid
def get_comment(self, comment_id: int) -> Dict[str, Any]:
"""获取单个评论"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM product_comments WHERE id = ?', (comment_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'product_id': row[1],
'user_id': row[2],
'parent_comment_id': row[3] if row[3] != -1 else None,
'content': row[4],
'created_at': row[5]
}
return None
def get_product_comments(self, product_id: int) -> List[Dict[str, Any]]:
"""获取商品的所有评论"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM product_comments WHERE product_id = ? ORDER BY created_at DESC', (product_id,))
return [{
'id': row[0],
'product_id': row[1],
'user_id': row[2],
'parent_comment_id': row[3] if row[3] != -1 else None,
'content': row[4],
'created_at': row[5]
} for row in cursor.fetchall()]
def get_comment_replies(self, comment_id: int) -> List[Dict[str, Any]]:
"""获取评论的所有回复"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM product_comments WHERE parent_comment_id = ? ORDER BY created_at ASC', (comment_id,))
return [{
'id': row[0],
'product_id': row[1],
'user_id': row[2],
'parent_comment_id': row[3] if row[3] != -1 else None,
'content': row[4],
'created_at': row[5]
} for row in cursor.fetchall()]
def delete_comment(self, comment_id: int) -> bool:
"""删除评论"""
with self._get_cursor() as cursor:
# 首先删除该评论的所有回复
cursor.execute('DELETE FROM product_comments WHERE parent_comment_id = ?', (comment_id,))
# 然后删除评论本身
cursor.execute('DELETE FROM product_comments WHERE id = ?', (comment_id,))
return cursor.rowcount > 0
def add_favorite(self, user_id: int, product_id: int) -> bool:
"""添加商品到关注列表"""
with self._get_cursor() as cursor:
try:
cursor.execute('''
INSERT INTO favorites (user_id, product_id)
VALUES (?, ?)
''', (user_id, product_id))
return True
except sqlite3.IntegrityError:
# 如果已经关注过,则返回False
return False
def remove_favorite(self, user_id: int, product_id: int) -> bool:
"""从关注列表中移除商品"""
with self._get_cursor() as cursor:
cursor.execute('DELETE FROM favorites WHERE user_id = ? AND product_id = ?', (user_id, product_id))
return cursor.rowcount > 0
def get_user_favorites(self, user_id: int) -> List[Dict[str, Any]]:
"""获取用户关注的所有商品"""
with self._get_cursor() as cursor:
cursor.execute('''
SELECT p.*, f.last_view_at FROM products p
INNER JOIN favorites f ON p.id = f.product_id
WHERE f.user_id = ?
ORDER BY f.created_at DESC
''', (user_id,))
return [{
'id': row[0],
'name': row[1],
'image_folder': row[2],
'description': row[3],
'price': row[4],
'seller_id': row[5],
'is_completed': bool(row[6]),
'buyer_id': row[7] if row[7] != -1 else None,
'created_at': row[8],
'updated_at': row[9],
'completed_at': row[10],
'seller_last_view_at': row[11],
'last_view_at': row[12]
} for row in cursor.fetchall()]
def is_favorite(self, user_id: int, product_id: int) -> bool:
"""检查用户是否已关注商品"""
with self._get_cursor() as cursor:
cursor.execute('SELECT 1 FROM favorites WHERE user_id = ? AND product_id = ?', (user_id, product_id))
return cursor.fetchone() is not None
def add_address(self, address: Dict[str, Any]) -> int:
"""添加交易地址"""
with self._get_cursor() as cursor:
cursor.execute('''
INSERT INTO addresses (user_id, address)
VALUES (?, ?)
''', (
address['user_id'],
address['address']
))
return cursor.lastrowid
def get_address(self, address_id: int) -> Dict[str, Any]:
"""获取单个交易地址"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM addresses WHERE id = ?', (address_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'user_id': row[1],
'address': row[2],
'created_at': row[3]
}
return None
def get_user_addresses(self, user_id: int) -> List[Dict[str, Any]]:
"""获取用户的所有交易地址"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM addresses WHERE user_id = ? ORDER BY created_at DESC', (user_id,))
return [{
'id': row[0],
'user_id': row[1],
'address': row[2],
'created_at': row[3]
} for row in cursor.fetchall()]
def update_address(self, address_id: int, address: Dict[str, Any]) -> bool:
"""更新交易地址"""
with self._get_cursor() as cursor:
cursor.execute('''
UPDATE addresses
SET address = ?
WHERE id = ?
''', (
address['address'],
address_id
))
return cursor.rowcount > 0
def delete_address(self, address_id: int) -> bool:
"""删除交易地址"""
with self._get_cursor() as cursor:
cursor.execute('DELETE FROM addresses WHERE id = ?', (address_id,))
return cursor.rowcount > 0
def add_product_image(self, image: Dict[str, Any]) -> int:
"""添加商品图片"""
with self._get_cursor() as cursor:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 添加图片
cursor.execute('''
INSERT INTO product_images (product_id, image_path)
VALUES (?, ?)
''', (
image['product_id'],
image['image_path']
))
# 更新商品的更新时间
cursor.execute('''
UPDATE products
SET updated_at = ?
WHERE id = ?
''', (current_time, image['product_id']))
return cursor.lastrowid
def get_product_images(self, product_id: int) -> List[Dict[str, Any]]:
"""获取商品的所有图片"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM product_images WHERE product_id = ? ORDER BY created_at ASC', (product_id,))
return [{
'id': row[0],
'product_id': row[1],
'image_path': row[2],
'created_at': row[3]
} for row in cursor.fetchall()]
def delete_product_image(self, image_id: int) -> bool:
"""删除商品图片"""
with self._get_cursor() as cursor:
cursor.execute('DELETE FROM product_images WHERE id = ?', (image_id,))
return cursor.rowcount > 0
def delete_product_images(self, product_id: int) -> bool:
"""删除商品的所有图片"""
with self._get_cursor() as cursor:
cursor.execute('DELETE FROM product_images WHERE product_id = ?', (product_id,))
return cursor.rowcount > 0
def add_verification_code(self, email: str, code: str, expired_at: str) -> int:
"""添加验证码信息"""
with self._get_cursor() as cursor:
# 删除该邮箱的旧验证码
cursor.execute('DELETE FROM verification_codes WHERE email = ?', (email,))
# 添加新验证码
cursor.execute('''
INSERT INTO verification_codes (email, code, expired_at)
VALUES (?, ?, ?)
''', (email, code, expired_at))
return cursor.lastrowid
def get_verification_code(self, email: str) -> Dict[str, Any]:
"""获取验证码信息"""
with self._get_cursor() as cursor:
cursor.execute('''
SELECT * FROM verification_codes
WHERE email = ?
ORDER BY created_at DESC
LIMIT 1
''', (email,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'email': row[1],
'code': row[2],
'created_at': row[3],
'expired_at': row[4]
}
return None
def delete_verification_code(self, email: str) -> bool:
"""删除验证码信息"""
with self._get_cursor() as cursor:
cursor.execute('DELETE FROM verification_codes WHERE email = ?', (email,))
return cursor.rowcount > 0
def clean_expired_codes(self) -> int:
"""清理过期的验证码"""
with self._get_cursor() as cursor:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('DELETE FROM verification_codes WHERE expired_at < ?', (current_time,))
return cursor.rowcount
def update_product_seller_view_time(self, product_id: int) -> bool:
"""更新商品卖家查看时间"""
with self._get_cursor() as cursor:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('''
UPDATE products
SET seller_last_view_at = ?
WHERE id = ?
''', (current_time, product_id))
return cursor.rowcount > 0
def update_favorite_view_time(self, user_id: int, product_id: int) -> bool:
"""更新用户收藏查看时间"""
with self._get_cursor() as cursor:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('''
UPDATE favorites
SET last_view_at = ?
WHERE user_id = ? AND product_id = ?
''', (current_time, user_id, product_id))
return cursor.rowcount > 0
def get_user_by_id(self, user_id: int) -> Dict[str, Any]:
"""通过用户ID获取用户信息"""
with self._get_cursor() as cursor:
cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'nickname': row[1],
'description': row[2],
'avatar': row[3],
'email': row[4],
'password': row[5],
'address': row[6],
'created_at': row[7]
}
return None
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhihao2023/yanyuan-swap-hub.git
git@gitee.com:zhihao2023/yanyuan-swap-hub.git
zhihao2023
yanyuan-swap-hub
Yanyuan SwapHub
master

搜索帮助