代码拉取完成,页面将自动刷新
Flask框架更像是为我们开发者提供了一个Web开发引擎,所需的其它开发工具需要引入相应的插件,当然这也符合Flask轻量的特点——“按需插拔”。
所以开发了该脚手架的定义一套能够拿来即用的Flask工程,无需自己从零组装,可基于FlaskBoot项目模板按需定制化自己的项目。
业务逻辑写在service层
class PublicService(BaseService):
@staticmethod
def get_sys_time():
current_time = datetime.now()
formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
return Status.OK, {"data": formatted_time}
引入插件的操作实体类需在工厂函数create_app中实例化,防止循环引用
from flask_sqlalchemy import SQLAlchemy
# 防止循环依赖
db = SQLAlchemy()
model文件的每个实体类下建议封装一些执行原生SQL的方法,便于在service层调用,使SQL与逻辑分离便于维护
controller层的返回值可以使用如下格式:
class UploadController:
@staticmethod
@upload.route('/upload', methods=['POST'])
def upload_png():
status, message, info = UploadService.png_service()
return api_response(status=status, message=message, info=info)
路由注册
在app.py中
app.register_blueprint(user_bp)
redis:
HOSTNAME: "YOU HOSTNAME"
PORT: "YOUR PORT"
PASSWORD: "PASSWORD"
@siwa.doc(query=TaskVo)
def start_task():
# 获取请求数据
request_data = request.get_json()
# 交给数据校验模型
task_data = TaskVo(**request_data)
status, message, info = TaskService.start_task(task_data)
return api_response(status=status, message=message, info=info)
目前采用单token的方式,token过期,则会退出登录,用户需要重新登陆
class JwtUtils:
# Your key, used to sign and verify JWT, should be kept secret
__SECRET_KEY = SECRET_KEY
__time = TIME
__redis_switch = REDIS_SWITCH
@classmethod
def create_new_token(cls, username, password):
"""
Create a new JWT and generate jwt (token)
:param username:
:param password:
:return:
"""
# Set the validity period of JWT
expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=cls.__time)
payload = {
'username': username,
'password': password,
'exp': expiry_time
}
# Generate JWT
return jwt.encode(payload, cls.__SECRET_KEY, algorithm='HS256')
CREATE TABLE Users (
user_id INT AUTO_INCREMENT PRIMARY KEY,
user_name VARCHAR(255) NOT NULL UNIQUE,
pass_word VARCHAR(255) NOT NULL,
email VARCHAR(255),
role_id INT,
is_deleted TINYINT(1) DEFAULT 0
);
CREATE TABLE roles (
role_id INT AUTO_INCREMENT PRIMARY KEY,
role_name VARCHAR(255) NOT NULL,
access_route VARCHAR(1024),
action VARCHAR(255),
is_deleted TINYINT(1) DEFAULT 0
);
// 初始化数据
INSERT INTO users (user_name, pass_word, role_id, is_deleted)
VALUES ('admin', 'admin123', 1, 0);
INSERT INTO roles (role_id, role_name, is_deleted)
VALUES (1, 'admin', 0);
thread_pool:
max_workers: 3
wait_for_workers: 4
timeout: 60
thread_name_prefix: 'MyThreadPool'
通过线程池的submit方法,提交任务
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class BaseModel(db.Model):
__abstract__ = True
@classmethod
def insert(cls, data):
"""
插入数据库实例
:return:
"""
db.session.add(data)
try:
db.session.commit()
return True
except Exception:
db.session.rollback()
raise
class BaseModel(db.Model):
__abstract__ = True
@classmethod
def paginate(cls, page, per_page, **kwargs):
"""
分页查询
:param page:
:param per_page:
:param kwargs:
:return:
"""
query = cls.query.filter_by(**kwargs)
return query.paginate(page=page, per_page=per_page, error_out=False)
def wrapper(*args, **kwargs):
with db.session.begin(nested=True): # 开启事务,嵌套事务,with自动关闭事务,不需要手动调用db.session.close()
try:
res = func(*args, **kwargs)
db.session.commit() # 提交事务,统一在接口级别提交,不需要在每个service中单独提交,也不需要在model层提交
except exc.SQLAlchemyError as e:
logger.error(e)
db.session.rollback()
return Status.REPEAT_MESSAGE
except Exception as e:
logger.error(e)
db.session.rollback()
return Status.GENERIC_ERROR # 返回通用的错误信息
return res
+ 另一种是在service层手动开启三段式事物,或者在model层手动开启三段式事物
```python
class BaseModel(db.Model):
__abstract__ = True
@classmethod
def insert(cls, data):
"""
插入数据库实例
:return:
"""
db.session.add(data)
try:
db.session.commit()
return True
except Exception:
db.session.rollback()
raise
```
def middleware(app):
@app.before_request
def before_request():
logger.info('在每个请求被路由之前调用')
@app.before_request
def after_request():
logger.info('在每个请求被路由之后调用')
@app.teardown_request
def teardown_request(exception):
logger.info('在每次请求处理后调用')
@app.teardown_appcontext
def shutdown_session(exception=None):
logger.info('在每个请求上下文被销毁时调用')
@staticmethod
@env_config.route('/list', methods=['GET'])
@cache.cached(key_prefix='env_list', timeout=20)
def env_list():
"""
环境列表 分页查询
:return:
"""
status, info = EnvConfigService.env_list_service()
return api_response(status=status, info=info)
def env_list_service():
"""
环境列表 分页查询
:return:
"""
try:
page = request.args.get('page')
size = request.args.get('size')
logger.info(f'page:{type(page)},size:{type(size)}')
if size:
size = int(size)
else:
size = 10
data = Env.paging_query(page, size)
result = []
for env in data:
item = {
'id': env.id,
'env_name': env.env_name,
'url': env.url,
'create_time': env.format_create_time()
}
result.append(item)
return Status.OK, {"data": result, "total": data.total}
except Exception as e:
logger.error(e)
return Status.ERROR_SELECT, '查询失败', ''
class Status(Enum):
OK = {'code': 2000, 'message': 'OK'}
OK_MESSAGE = {'code': 2001}
BAD_REQUEST = {'code': 4000, 'message': 'Bad Request'}
NOT_FOUND = {'code': 4004, 'message': 'Not Found'}
BAD_MESSAGE = {'code': 4001}
REPEAT_MESSAGE = {'code': 1000, 'message': 'Repeat Message'}
ERROR_SELECT = {'code': 1000, 'message': 'Internal Server Error'}
GENERIC_ERROR = {'code': 1001, 'message': 'Service is busy'}
ERROR_INSERT = {'code': 1002, 'message': 'Insert Error'}
business_exception(app)
application_exception(app)
def application_exception(app):
"""
系统异常, 用于处理应用程序中的异常,使用flask的errorhandler装饰器,可以捕获指定的异常
:param app:
:return:
"""
@app.errorhandler(500)
def handle_500_error(error):
api_response(
status=Status.BAD_MESSAGE,
message='服务器内部错误',
info=error
)
def business_exception(app):
"""
业务异常, 用于处理业务逻辑中的异常,使用flask的errorhandler装饰器,可以捕获指定的异常
当出现 ValidationError 异常时,会自动调用 handle_base_error 函数,并将异常实例作为参数传递给该函数,
因此我们可以直接使用异常实例 error 调用其内部的 to_response 方法,返回封装好的 API 响应对象给客户端。
:param app:
:return:
"""
@app.errorhandler(ValidationError)
def handle_base_error(error):
return error.to_response()
@staticmethod
@BaseService.transactional
def export_test_case_service():
"""
导出测试用例
:return:
"""
content = [['id', 'case_name', 'path', 'model', 'data', 'create_time', 'is_smoke', 'assertion']]
case = TestCase.query.order_by(TestCase.created_time.desc())
if case:
for item in case:
content.append(
[item.id, item.case_name, item.path, item.model_name, item.data, item.created_time, item.is_smoke,
item.assertion])
res = excel.make_response_from_array(content, "xlsx", file_name="test_case", sheet_name="test_case")
logger.info(f'res:{res}')
with open('/Users/jeff/pyproject/api-test/data/case.xlsx', 'wb') as f:
f.write(res.data)
return Status.OK
class TaskVo(BaseModel):
"""
定时任务数据校验模型
"""
cron_time: str
method_name: str
module_name: str
@validator('cron_time')
def cron_time_length(cls, v):
if len(v) < 1 or len(v) > 20:
raise ValidationError('cron_time长度必须在1到20之间')
return v
@staticmethod
@tasks.route('/start', methods=['POST'])
def start_task():
# 获取请求数据
request_data = request.get_json()
# 交给数据校验模型
task_data = TaskVo(**request_data)
status, message, info = TaskService.start_task(task_data)
return api_response(status=status, message=message, info=info)
class ThreadPool(object, metaclass=Singleton):
def __init__(self, max_workers=2, wait_for_workers=None, thread_name_prefix='ThreadPoolExecutor', timeout=None):
self._executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix=thread_name_prefix)
self._wait_for_workers = wait_for_workers
self._timeout = timeout
def submit(self, task, *args, **kwargs):
task = partial(task, *args, **kwargs)
if self._wait_for_workers is not None:
future = self._executor.submit(task)
if self._wait_for_workers > 0 and self._executor._work_queue.qsize() >= self._wait_for_workers:
self._executor._work_queue.get(timeout=self._timeout)
return future
else:
return self._executor.submit(task)
def shutdown(self, wait=True):
self._executor.shutdown(wait=wait)
def __del__(self):
self._executor.shutdown(wait=False)
future1 = thread_pool.submit(worker, 'Task A', 3)
future2 = thread_pool.submit(worker, 'Task B', 4)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。