13 Star 22 Fork 12

anolis/soma

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
eval-runner.py 7.51 KB
一键复制 编辑 原始数据 按行查看 历史
luyao 提交于 2024-06-19 16:43 +08:00 . update evaluation
import time
import os
import subprocess
import sys
import traceback
from common import const
from deploy import const as deploy_const
from config import conf
from eval_lib.common.logger import get_logger
from eval_lib.common.logger import LoggerManager
from common.utils import redis_runner
from common.utils import zip_dir
from eval_lib.databases.redis import const as redis_const
from eval_lib.source.dictonary import Dictionary
from common.client import ResultClient, LogClient
from deploy.deploy import DeployWorker
from common.module import DeployMeta
# Module-level logger used by the Runner class defined in this file.
log = get_logger()
class Runner():
    """Run one evaluation case: prepare dirs, deploy, run pytest, push results.

    The runner reads its parameters from ``conf.case_params``, writes all of
    its artifacts below ``<conf.runner_data_dir>/runner-<uuid>`` and reports
    state changes through ``redis_runner``.
    """

    def __init__(self):
        """Derive the working paths from ``conf`` and start log forwarding."""
        self.uuid = conf.case_params.uuid
        self.case_params = conf.case_params
        self.start_time = int(time.time())
        # Set by exec_pytest(); stays None until pytest has been launched.
        self.pytest_process: subprocess.Popen = None
        self.runner_dir = const.LOCAL_PATH
        self.runner_data_path = f"{conf.runner_data_dir}/runner-{self.uuid}"
        self.runner_report_path = f"{self.runner_data_path}/report"
        self.runner_log_path = f"{self.runner_data_path}/log"
        self.runner_allure_path = f"{self.runner_data_path}/allure-result"
        self.log_path = f"{self.runner_log_path}/runner.log"
        LoggerManager(log_file=self.log_path)
        self.lc = self.start_forward_log(log_path=self.log_path)
        # Set by deploy_deepflow_server() only when a deployment is needed.
        self.deploy_worker = None

    def run(self):
        """Execute the whole case life cycle and always run the cleanup.

        Any exception raised by the main steps is logged (with traceback)
        and swallowed; cleanup then runs, followed by a 300 second sleep.
        """
        try:
            self.init_env()
            self.deploy_deepflow_server()
            self.exec_pytest()
            self.wait()
        except Exception as e:
            log.error(f"Runner {self.uuid} run error: {e}")
            log.error(traceback.format_exc())
        finally:
            self._cleanup()
            time.sleep(300)

    def _cleanup(self):
        """Release resources and report completion, one guarded step each.

        Every step is isolated so that a failure in one of them (for example
        the result upload) cannot prevent the runner from being marked as
        complete.
        """
        process = self.pytest_process
        if process is not None and process.poll() is None:
            # The main flow aborted while pytest was still running; stop it
            # so it does not keep writing while the results are archived.
            self._run_cleanup_step("stop pytest", self.interrupt)
        if self.deploy_worker:
            self._run_cleanup_step(
                "release deploy worker", self.deploy_worker.release
            )
        self._run_cleanup_step("stop log forwarding", self.lc.stop)
        self._run_cleanup_step("push results", self.push_results)
        self._run_cleanup_step(
            "set runner complete",
            lambda: redis_runner.set_runner_complete(uuid=self.uuid),
        )

    def _run_cleanup_step(self, description, step):
        """Call ``step()`` and log, instead of propagating, any exception."""
        try:
            step()
        except Exception as e:
            log.error(
                f"Runner {self.uuid} cleanup step '{description}' failed: {e}"
            )
            log.error(traceback.format_exc())

    def init_env(self):
        """Initialize the environment directories."""
        # Create the data directories.
        folder_paths = [
            self.runner_report_path,
            self.runner_log_path,
            self.runner_allure_path,
        ]
        for folder_path in folder_paths:
            os.makedirs(folder_path, exist_ok=True)
        log.info(f"Runner {self.uuid} init env success.")
        log.info(f"data_dir is : {self.runner_data_path}")

    @staticmethod
    def _extend_pythonpath(inherited, extra_dir):
        """Return ``inherited`` with ``extra_dir`` appended after a colon.

        With an empty ``inherited`` the result is ``":<extra_dir>"``, which
        is exactly the value the runner used before inherited entries were
        preserved.
        """
        return f"{inherited}:{extra_dir}"

    def exec_pytest(self):
        """Launch pytest for the configured case and mark the case running.

        stdout and stderr of the pytest process are appended to the runner
        log file.

        Raises:
            OSError: if the process could not be started; the error is
                logged and re-raised so the case is not marked as running.
        """
        # Run the test case.
        envs = os.environ.copy()
        # Keep whatever PYTHONPATH this process inherited and add the runner
        # directory, instead of replacing the inherited value.
        envs["PYTHONPATH"] = self._extend_pythonpath(
            envs.get("PYTHONPATH", ""), self.runner_dir
        )
        relative_case_path = self.get_relative_case_path()
        # NOTE(review): the command is interpolated into a shell string; the
        # values come from configuration here - confirm they are trusted.
        command = f"pytest -vs {relative_case_path} --alluredir {self.runner_allure_path} --workers {self.case_params.process_num}"
        # Run the pytest command.
        log.info(f"exec pytest command: {command}")
        try:
            with open(self.log_path, "a") as log_file:
                self.pytest_process = subprocess.Popen(
                    command,
                    shell=True,
                    cwd=self.runner_dir,
                    stdout=log_file,
                    stderr=log_file,
                    env=envs,
                )
        except OSError as e:
            # Popen reports launch failures as OSError, never as
            # CalledProcessError.
            log.error(f"exec pytest error: {e}")
            raise
        redis_runner.set_case_running(uuid=self.uuid)

    def wait(self):
        """Poll every 5 seconds until pytest exits or a force-end is requested."""
        while True:
            # Check the process state.
            time.sleep(5)
            # The pytest process has ended.
            if self.pytest_process.poll() is not None:
                if self.pytest_process.returncode == 0:
                    log.info("pytest process has finished.")
                else:
                    # exec_pytest() redirects stderr to the log file, so no
                    # pipe is attached and communicate() yields None for it.
                    _, pytest_stderr = self.pytest_process.communicate()
                    log.error("pytest process occurred error")
                    if pytest_stderr is not None:
                        log.error(f"error_log: {pytest_stderr.decode()}")
                redis_runner.set_case_complete(uuid=self.uuid)
                break
            runner_info_dict = redis_runner.get_runner_info(uuid=self.uuid)
            try:
                control_status = runner_info_dict["case-control-status"]
            except (KeyError, TypeError):
                # Missing entry or missing runner info: treat it as "no
                # force-end requested" and keep polling.
                control_status = None
            if control_status == redis_const.CASE_STATUS_FORCE_END:
                # The case execution was cancelled on request.
                redis_runner.set_case_end(uuid=self.uuid)
                log.error("case force end")
                self.interrupt()
                break

    def interrupt(self):
        """Kill the pytest process, if one is still running, and reap it."""
        process = self.pytest_process
        if process is not None and process.poll() is None:
            process.kill()
            # Reap the killed child so poll() reports it as finished.
            process.wait()
        log.error(f"Runner {self.uuid} interrupt.")

    def deploy_deepflow_server(self):
        """Deploy a deepflow server when the case and platform call for it.

        Nothing is deployed unless the agent type is ``'deepflowce'`` and the
        platform type is ``'aliyun'``.

        Raises:
            Exception: if the deploy worker reports a failed deployment.
        """
        agent_type = conf.case_params.agent_type
        if agent_type != 'deepflowce':
            return
        platform_type = conf.platform_tools.get("type", "")
        if platform_type != 'aliyun':
            return
        meta = DeployMeta()
        meta.init(
            instance_name=deploy_const.ENV_INSTANCE_NAME_DEFAULT,
            type=deploy_const.ENV_TYPE_DEEPFLOW_CE,
            uuid=self.case_params.uuid
        )
        self.deploy_worker = DeployWorker(meta=meta)
        if not self.deploy_worker.deploy():
            log.error("deploy deepflow server failed")
            raise Exception("deploy deepflow server failed")

    def get_relative_case_path(self):
        """Return the case path, below ``./case/``, for the configured case.

        Raises:
            Exception: if the case name has no entry in the case dictionary.
        """
        case_path = Dictionary().CASE_DICTIONARY.get(
            self.case_params.case_name
        )
        if not case_path:
            log.error(f"case_name: {self.case_params.case_name} not support")
            raise Exception(
                f"case_name: {self.case_params.case_name} not support"
            )
        # The first element of the dictionary entry is the path.
        relative_case_path = os.path.join("./case/", case_path[0])
        return relative_case_path

    def start_forward_log(self, log_path):
        """Make sure ``log_path`` exists and start a client forwarding it.

        Returns:
            The started ``LogClient``.
        """
        # Create the file if it is missing. O_CREAT without O_EXCL leaves an
        # existing file untouched, and unlike os.mknod() it is available on
        # every platform; 0o600 matches os.mknod()'s default mode.
        os.close(os.open(log_path, os.O_WRONLY | os.O_CREAT, 0o600))
        log.info("start log forwarding")
        server_url = f"http://{const.CONTROLLER_HOST}:{conf.listen_port}{const.API_PREFIX_RESULT_LOG}"
        lc = LogClient(
            uuid=self.uuid, log_file=log_path, server_url=server_url
        )
        # Assumes LogClient is a threading.Thread subclass (it was driven
        # through setDaemon()/start()); `daemon` replaces the deprecated
        # setDaemon() - TODO confirm.
        lc.daemon = True
        lc.start()
        return lc

    def push_results(self):
        """Zip the runner data directory and send the archive to the controller."""
        log.info("start push result to controller")
        runner_data_zip = f"runner-{self.uuid}.zip"
        zip_dir(folder_path=self.runner_data_path, output_path=runner_data_zip)
        server_url = f"http://{const.CONTROLLER_HOST}:{conf.listen_port}{const.API_PREFIX_RESULT_ZIP}"
        rc = ResultClient(server_url=server_url)
        rc.send_result_zip(zip_file_path=runner_data_zip)
        log.info(f"Runner {self.uuid} push results.")

    def get_results(self):
        """Generate the allure report and zip it into the allure result dir.

        Returns:
            True when the report was generated and archived, False when the
            ``allure generate`` command could not be run or exited non-zero.
        """
        # TODO: luyao collect the test results
        log.info("start gengerate allure result")
        allure_tmp_dir = "allure-report"
        command = f"allure generate -c {self.runner_allure_path}/ -o {allure_tmp_dir}"
        try:
            process = subprocess.run(
                command,
                shell=True,
                cwd=self.runner_dir,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except (subprocess.SubprocessError, OSError):
            # run() without check=True never raises CalledProcessError;
            # launch problems surface as OSError.
            log.error("allure generate error, cmd error")
            return False
        if process.returncode != 0:
            log.error(
                f"allure-report file gengerate failed: error: {process.stderr.decode()}"
            )
            return False
        log.info("allure-report file gengerate successful")
        # The command ran with cwd=self.runner_dir, so the report directory
        # lives there and not necessarily in this process's own cwd.
        zip_dir(
            folder_path=os.path.join(self.runner_dir or "", allure_tmp_dir),
            output_path=f"{self.runner_allure_path}/allure-report.zip"
        )
        return True
if __name__ == '__main__':
    # Script entry point: only start the runner with a valid configuration;
    # otherwise report the problem and exit with status 1.
    if conf.is_valid():
        Runner().run()
    else:
        print('Invalid conf value, error exit.')
        sys.exit(1)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/anolis/soma.git
git@gitee.com:anolis/soma.git
anolis
soma
soma
ee7678678d4d6003e83b429a31a6fb0f2e8874e1

搜索帮助