diff --git a/test/BUILD.gn b/test/BUILD.gn
index bfcd83779d235094d238050559338a59d91135cb..6588f5dfa7771c375901f70ed4cd55dcd0c7af99 100644
--- a/test/BUILD.gn
+++ b/test/BUILD.gn
@@ -400,6 +400,7 @@ ohos_unittest("hdc_base_unittest") {
"${hdc_path}/src/common/hdc_huks.cpp",
"unittest/common/base_ut.cpp",
"unittest/common/file_ut.cpp",
+ "unittest/common/forward_ut.cpp",
"unittest/common/heartbeat_ut.cpp",
"unittest/common/hdc_huks_ut.cpp",
"unittest/common/password_ut.cpp",
diff --git a/test/scripts/testModule/test_hdc_stability.py b/test/scripts/testModule/test_hdc_stability.py
index ed0eee786c5ef131b9d09810348c656152dccc49..2e7a8709edccf69e098044a29543578cb11d01cb 100644
--- a/test/scripts/testModule/test_hdc_stability.py
+++ b/test/scripts/testModule/test_hdc_stability.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
-from stability_utils import run_stability_test
+from test_hdc_stability_utils import run_stability_test
class TestHdcStability:
diff --git a/test/scripts/testModule/stability_utils.py b/test/scripts/testModule/test_hdc_stability_utils.py
similarity index 97%
rename from test/scripts/testModule/stability_utils.py
rename to test/scripts/testModule/test_hdc_stability_utils.py
index 912ef7a899fedd7ac5d23a2fcc2fc45434e86114..e37b077d169695c76619e3ecc25dca503e4ce872 100644
--- a/test/scripts/testModule/stability_utils.py
+++ b/test/scripts/testModule/test_hdc_stability_utils.py
@@ -1,1291 +1,1291 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-# Copyright (C) 2025 Huawei Device Co., Ltd.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-运行环境: python 3.10+
-测试方法:
-方法1、使用pytest框架执行
-请见测试用例根目录test\scripts下readme.md中的执行稳定性用例小节
-方法2、直接执行
-进入test\scripts目录,执行python .\testModule\stability_utils.py
-
-测试范围
-hdc shell
-hdc file send/recv
-hdc fport 文件传输
-多session场景测试(hdc tmode + hdc tconn)
-
-注意:
-1、当前仅支持一台设备的压力测试
-2、如果异常退出,请执行 hdc -t [usb_connect_key] tmode port close,关闭网络接口通道
-执行 hdc list targets查看是否还有网络连接,如果有请执行hdc tconn 127.0.0.1:port -remove断开
-执行hdc fport ls查看是否还有端口转发的规则,如果有请执行hdc fport rm tcp:xxxxx tcp:9999删除转发
-"""
-
-import subprocess
-import os
-import logging
-import logging.config
-import time
-from datetime import datetime
-from enum import Enum
-import threading
-import uuid
-import hashlib
-import queue
-from utils import server_loop
-from utils import client_get_file
-from utils import get_local_md5
-
-STABILITY_TEST_VERSION = "v1.0.0"
-
-TIME_REPORT_FORMAT = '%Y-%m-%d %H:%M:%S'
-TIME_FILE_FORMAT = '%Y%m%d_%H%M%S'
-TEST_RESULT = False
-
-WORK_DIR = os.getcwd()
-
-# 消息队列中使用到的key名称
-# 报告公共信息key名称
-REPORT_PUBLIC_INFO_NAME = "public_info"
-# 报告公共信息中的错误信息key名称
-TASK_ERROR_KEY_NAME = "task_error"
-
-# 资源文件相对路径
-RESOURCE_RELATIVE_PATH = "resource"
-# 报告文件相对路径
-REPORTS_RELATIVE_PATH = "reports"
-# 缓存查询到的设备sn
-DEVICE_LIST = []
-# 记录当前启动的session类信息
-SESSION_LIST = []
-# 释放资源命令列表
-RELEASE_TCONN_CMD_LIST = []
-RELEASE_FPORT_CMD_LIST = []
-
-RESOURCE_PATH = os.path.join(WORK_DIR, RESOURCE_RELATIVE_PATH)
-
-EXIT_EVENT = threading.Event()
-
-TEST_THREAD_MSG_QUEUE = queue.Queue()
-
-
-def get_time_str(fmt=TIME_REPORT_FORMAT, need_ms=False):
- now_time = datetime.now()
- if need_ms is False:
- return now_time.strftime(fmt)
- else:
- return f"{now_time.strftime(fmt)}.{now_time.microsecond // 1000:03d}"
-
-
-TEST_FILE_TIME_STR = get_time_str(TIME_FILE_FORMAT)
-LOG_FILE_NAME = os.path.join(REPORTS_RELATIVE_PATH, f"hdc_stability_test_{TEST_FILE_TIME_STR}.log")
-REPORT_FILE_NAME = os.path.join(REPORTS_RELATIVE_PATH, f"hdc_stability_test_report_{TEST_FILE_TIME_STR}.html")
-logger = None
-
-# 配置日志记录
-logging_config_info = {
- 'version': 1,
- 'formatters': {
- 'format1': {
- 'format': '[%(asctime)s %(name)s %(funcName)s %(lineno)d %(levelname)s][%(process)d][%(thread)d]'
- '[%(threadName)s]%(message)s'
- },
- },
- 'handlers': {
- 'console_handle': {
- 'level': 'DEBUG',
- 'formatter': 'format1',
- 'class': 'logging.StreamHandler',
- 'stream': 'ext://sys.stdout',
- },
- 'file_handle': {
- 'level': 'DEBUG',
- 'formatter': 'format1',
- 'class': 'logging.FileHandler',
- 'filename': LOG_FILE_NAME,
- },
- },
- 'loggers': {
- '': {
- 'handlers': ['console_handle', 'file_handle'],
- 'level': 'DEBUG',
- },
- },
-}
-
-
-class TestType(Enum):
- SHELL = 1
- FILE_SEND = 2
- FILE_RECV = 3
- FPORT_TRANS_FILE = 4
-
-
-# 当前测试任务并行运行的数量
-TASK_COUNT_DEFAULT = 5
-# 循环测试次数
-LOOP_COUNT_DEFAULT = 20
-# 每个循环结束的等待时间,单位秒
-LOOP_DELAY_S_DEFAULT = 0.01
-
-# 进行多session测试时,启动多个端口转发到设备侧的tmode监听的端口,电脑端开启的端口列表
-# 参考端口配置:12345, 12346, 12347, 12348, 12349,需要几个session,可以复制几个放入下面的MUTIL_SESSION_TEST_PC_PORT_LIST中
-MUTIL_SESSION_TEST_PC_PORT_LIST = []
-# 多session测试, 通过tmode命令,设备端切换tcp模式,监听的端口号
-DEVICE_LISTEN_PORT = 9999
-
-# usb session 测试项配置
-"""
-配置项包括如下:
- {
- "test_type": TestType.SHELL,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "shell ls",
- "expected_results": "data",
- },
- {
- "test_type": TestType.FILE_SEND,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file send",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- },
- {
- "test_type": TestType.FILE_RECV,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file recv",
- "original_file": "medium",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- },
- {
- "test_type": TestType.FPORT_TRANS_FILE,
- "port_info": [
- {
- "client_connect_port": 22345,
- "daemon_transmit_port": 11081,
- "server_listen_port": 18000,
- },
- {
- "client_connect_port": 22346,
- "daemon_transmit_port": 11082,
- "server_listen_port": 18001,
- },
- ],
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "original_file": "medium",
- },
-"""
-USB_SESSION_TEST_CONFIG = [
- {
- "test_type": TestType.SHELL,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "shell ls",
- "expected_results": "data",
- },
- {
- "test_type": TestType.FILE_SEND,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file send",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- },
- {
- "test_type": TestType.FILE_RECV,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file recv",
- "original_file": "medium",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- },
-]
-
-# tcp session 测试项配置
-"""
-配置项包括如下:
- {
- "test_type": TestType.SHELL,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "shell ls",
- "expected_results": "data",
- },
- {
- "test_type": TestType.FILE_SEND,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file send",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- },
- {
- "test_type": TestType.FILE_RECV,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file recv",
- "original_file": "medium",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- },
-"""
-TCP_SESSION_TEST_CONFIG = [
- {
- "test_type": TestType.SHELL,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "shell ls",
- "expected_results": "data",
- },
- {
- "test_type": TestType.FILE_SEND,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file send",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- },
- {
- "test_type": TestType.FILE_RECV,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": LOOP_DELAY_S_DEFAULT,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file recv",
- "original_file": "medium",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- },
-]
-
-HTML_HEAD = """
-
-
-HDC稳定性测试报告
-
-
-"""
-
-
-def init_logger():
- logging.config.dictConfig(logging_config_info)
-
-
-def put_msg(queue_obj, info):
- """
- 给队列中填入测试报告信息,用于汇总报告和生成结果
- """
- queue_obj.put(info)
-
-
-def run_cmd_block(command, timeout=600):
- logger.info(f"cmd: {command}")
- # 启动子进程
- process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
-
- # 用于存储子进程的输出
- output = ""
- error = ""
-
- try:
- # 读取子进程的输出
- output, error = process.communicate(timeout=timeout)
- except subprocess.TimeoutExpired:
- logger.info(f"run cmd:{command} timeout")
- process.terminate()
- process.kill()
- output, error = process.communicate(timeout=timeout)
- return output, error
-
-
-def run_cmd_and_check_output(command, want_str, timeout=600):
- """
- 阻塞的方式运行命令,然后判断标准输出的返回值中是否有预期的字符串
- 存在预期的字符串返回 True
- 不存在预期的字符串返回 False
- """
- output, error = run_cmd_block(command, timeout)
- if want_str in output:
- return True, (output, error)
- else:
- logger.error(f"can not get expect str:{want_str} cmd:{command} output:{output} error:{error}")
- return False, (output, error)
-
-
-def process_shell_test(connect_key, config, thread_name):
- """
- config格式
- {
- "test_type": TestType.SHELL,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": 0.01,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "shell ls",
- "expected_results": "data",
- }
- """
- logger.info(f"start process_shell_test thread {thread_name}")
- test_count = config["loop_count"]
- delay_s = config["loop_delay_s"]
- cmd = f"hdc -t {connect_key} {config['cmd']}"
- expected_results = config["expected_results"]
- report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count}
- start_time = time.perf_counter()
- for count in range(test_count):
- logger.info(f"{thread_name} {count}")
- is_ok, info = run_cmd_and_check_output(cmd, expected_results)
- if is_ok:
- logger.info(f"{thread_name} {count} success result:{info}")
- else:
- logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results},"
- f"result:{info}")
- error_time = get_time_str(need_ms=True)
- msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \
- f" can not get expect result:{expected_results}, result:{info}"
- report["error_msg"] = f"[{error_time}] result:{info}"
- TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}})
- EXIT_EVENT.set()
- logger.info(f"{thread_name} {count} set exit event")
- report["finished_test_count"] = count
- break
- if EXIT_EVENT.is_set():
- logger.info(f"{thread_name} {count} exit event is set")
- report["error_msg"] = "event is set, normal exit"
- report["finished_test_count"] = count + 1
- break
- time.sleep(delay_s)
- if "finished_test_count" not in report:
- report["finished_test_count"] = test_count
- if report["finished_test_count"] < test_count:
- report["passed"] = False
- else:
- report["passed"] = True
- elapsed_time = time.perf_counter() - start_time
- report["cost_time"] = elapsed_time
- TEST_THREAD_MSG_QUEUE.put({thread_name: report})
- logger.info(f"stop process_shell_test thread {thread_name}")
-
-
-def process_file_send_test(connect_key, config, thread_name):
- """
- config格式
- {
- "test_type": TestType.FILE_SEND,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": 0.01,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file send",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- }
- """
- logger.info(f"start process_file_send_test thread {thread_name}")
- test_count = config["loop_count"]
- delay_s = config["loop_delay_s"]
- expected_results = config["expected_results"]
- local_file = os.path.join(RESOURCE_PATH, config['local'])
- remote_file = f"{config['remote']}_{uuid.uuid4()}"
- cmd = f"hdc -t {connect_key} {config['cmd']} {local_file} {remote_file}"
- report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count}
- start_time = time.perf_counter()
- for count in range(test_count):
- logger.info(f"{thread_name} {count}")
- is_ok, info = run_cmd_and_check_output(cmd, expected_results)
- if is_ok:
- logger.info(f"{thread_name} {count} success result:{info}")
- else:
- logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results},"
- f"result:{info}")
- error_time = get_time_str(need_ms=True)
- msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \
- f" can not get expect result:{expected_results}, result:{info}"
- report["error_msg"] = f"[{error_time}] result:{info}"
- TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}})
- EXIT_EVENT.set()
- logger.info(f"{thread_name} {count} set exit event")
- report["finished_test_count"] = count
- break
-
- run_cmd_block(f"hdc -t {connect_key} shell rm {remote_file}")
- if EXIT_EVENT.is_set():
- logger.info(f"{thread_name} {count} exit event is set")
- report["error_msg"] = "event is set, normal exit"
- report["finished_test_count"] = count + 1
- break
- time.sleep(delay_s)
- if "finished_test_count" not in report:
- report["finished_test_count"] = test_count
- if report["finished_test_count"] < test_count:
- report["passed"] = False
- else:
- report["passed"] = True
- elapsed_time = time.perf_counter() - start_time
- report["cost_time"] = elapsed_time
- TEST_THREAD_MSG_QUEUE.put({thread_name: report})
- logger.info(f"stop process_file_send_test thread {thread_name}")
-
-
-def process_file_recv_test(connect_key, config, thread_name):
- """
- config格式
- {
- "test_type": TestType.FILE_RECV,
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": 0.01,
- "task_count": TASK_COUNT_DEFAULT,
- "cmd": "file recv",
- "original_file": "medium",
- "local": "medium",
- "remote": "/data/medium",
- "expected_results": "FileTransfer finish",
- }
- """
- logger.info(f"start process_file_recv_test thread {thread_name}")
- test_count = config["loop_count"]
- delay_s = config["loop_delay_s"]
- expected_results = config["expected_results"]
- original_file = os.path.join(RESOURCE_PATH, config['original_file'])
- name_id = uuid.uuid4()
- local_file = os.path.join(RESOURCE_PATH, f"{config['local']}_{name_id}")
- remote_file = f"{config['remote']}_{name_id}"
- cmd = f"hdc -t {connect_key} {config['cmd']} {remote_file} {local_file}"
- report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count}
- start_time = time.perf_counter()
- run_cmd_block(f"hdc -t {connect_key} file send {original_file} {remote_file}")
- for count in range(test_count):
- logger.info(f"{thread_name} {count}")
- is_ok, info = run_cmd_and_check_output(cmd, expected_results)
- if is_ok:
- logger.info(f"{thread_name} {count} success result:{info}")
- else:
- logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results},"
- f"result:{info}")
- error_time = get_time_str(need_ms=True)
- msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \
- f" can not get expect result:{expected_results}, result:{info}"
- report["error_msg"] = f"[{error_time}] result:{info}"
- TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}})
- EXIT_EVENT.set()
- logger.info(f"{thread_name} {count} set exit event")
- report["finished_test_count"] = count
- break
- os.remove(local_file)
- logger.debug(f"{connect_key} remove {local_file} finished")
- if EXIT_EVENT.is_set():
- logger.info(f"{thread_name} {count} exit event is set")
- report["error_msg"] = "event is set, normal exit"
- report["finished_test_count"] = count + 1
- break
- time.sleep(delay_s)
- if "finished_test_count" not in report:
- report["finished_test_count"] = test_count
- if report["finished_test_count"] < test_count:
- report["passed"] = False
- else:
- report["passed"] = True
- run_cmd_block(f"hdc -t {connect_key} shell rm {remote_file}")
- elapsed_time = time.perf_counter() - start_time
- report["cost_time"] = elapsed_time
- TEST_THREAD_MSG_QUEUE.put({thread_name: report})
- logger.info(f"stop process_file_recv_test thread {thread_name}")
-
-
-def create_fport_trans_test_env(info):
- # 构建传输通道
- thread_name = info.get("thread_name")
- connect_key = info.get("connect_key")
- fport_arg = info.get("fport_arg")
- result, _ = run_cmd_block(f"hdc -t {connect_key} fport {fport_arg}")
- logger.info(f"{thread_name} fport result:{result}")
- rport_arg = info.get("rport_arg")
- result, _ = run_cmd_block(f"hdc -t {connect_key} rport {rport_arg}")
- logger.info(f"{thread_name} rport result:{result}")
-
-
-def do_fport_trans_file_test_once(client_connect_port, server_listen_port, file_name, file_save_name):
- """
- 进行一次数据传输测试
- """
- logger.info(f"do_fport_trans_file_test_once start, file_save_name:{file_save_name}")
- server_thread = threading.Thread(target=server_loop, args=(server_listen_port,))
- server_thread.start()
-
- client_get_file(client_connect_port, file_name, file_save_name)
- server_thread.join()
-
- ori_file_md5 = get_local_md5(os.path.join(RESOURCE_PATH, file_name))
- new_file = os.path.join(RESOURCE_PATH, file_save_name)
- new_file_md5 = 0
- if os.path.exists(new_file):
- new_file_md5 = get_local_md5(new_file)
- os.remove(new_file)
- logger.info(f"ori_file_md5:{ori_file_md5}, new_file_md5:{new_file_md5}")
-
- if not ori_file_md5 == new_file_md5:
- logger.error(f"check file md5 failed, file_save_name:{file_save_name}, ori_file_md5:{ori_file_md5},"
- f"new_file_md5:{new_file_md5}")
- return False
- logger.info(f"do_fport_trans_file_test_once exit, file_save_name:{file_save_name}")
- return True
-
-
-def process_fport_trans_file_test(connect_key, config, thread_name, task_index):
- """
- task_index对应当前测试的port_info,从0开始计数。
- config格式
- {
- "test_type": TestType.FPORT_TRANS_FILE,
- "port_info": [
- {
- "client_connect_port": 22345,
- "daemon_transmit_port": 11081,
- "server_listen_port": 18000,
- },
- {
- "client_connect_port": 22346,
- "daemon_transmit_port": 11082,
- "server_listen_port": 18001,
- },
- ],
-
- "loop_count": LOOP_COUNT_DEFAULT,
- "loop_delay_s": 0.01,
- "original_file": "medium",
- }
- """
- logger.info(f"start process_fport_trans_file_test thread {thread_name}")
- test_count = config["loop_count"]
- delay_s = config["loop_delay_s"]
- port_info = config["port_info"]
- client_connect_port = port_info[task_index]["client_connect_port"]
- daemon_transmit_port = port_info[task_index]["daemon_transmit_port"]
- server_listen_port = port_info[task_index]["server_listen_port"]
- fport_arg = f"tcp:{client_connect_port} tcp:{daemon_transmit_port}"
- rport_arg = f"tcp:{daemon_transmit_port} tcp:{server_listen_port}"
-
- report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count}
- start_time = time.perf_counter()
-
- # 构建传输通道
- create_fport_trans_test_env({"thread_name": thread_name, "connect_key": connect_key,
- "fport_arg": fport_arg, "rport_arg": rport_arg})
-
- # transmit file start
- file_name = config["original_file"]
- file_save_name = f"{file_name}_recv_fport_{uuid.uuid4()}"
- for count in range(test_count):
- logger.info(f"{thread_name} {count}")
- if do_fport_trans_file_test_once(client_connect_port, server_listen_port, file_name, file_save_name) is False:
- error_time = get_time_str(need_ms=True)
- msg = f"[{error_time}] {thread_name} loop_count:{count+1}" \
- f" check file md5 failed"
- report["error_msg"] = f"[{error_time}] check file md5 failed"
- TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}})
- EXIT_EVENT.set()
- logger.info(f"{thread_name} {count} set exit event")
- report["finished_test_count"] = count
- break
- if EXIT_EVENT.is_set():
- logger.info(f"{thread_name} {count} exit event is set")
- report["error_msg"] = "event is set, normal exit"
- report["finished_test_count"] = count + 1
- break
- time.sleep(delay_s)
-
- # 关闭fport通道
- run_cmd_block(f"hdc -t {connect_key} fport rm {fport_arg}")
- run_cmd_block(f"hdc -t {connect_key} fport rm {rport_arg}")
- if "finished_test_count" not in report:
- report["finished_test_count"] = test_count
- if report["finished_test_count"] < test_count:
- report["passed"] = False
- else:
- report["passed"] = True
- elapsed_time = time.perf_counter() - start_time
- report["cost_time"] = elapsed_time
- TEST_THREAD_MSG_QUEUE.put({thread_name: report})
- logger.info(f"stop process_fport_trans_file_test thread {thread_name}")
-
-
-def get_test_result(fail_num):
- """
- 返回错误结果及错误结果显示类型信息
- """
- global TEST_RESULT
- if fail_num > 0:
- test_result = "失败"
- TEST_RESULT = False
- test_result_class = "fail"
- else:
- test_result = "通过"
- TEST_RESULT = True
- test_result_class = "pass"
- return test_result, test_result_class
-
-
-def get_error_msg_html(public_info):
- """
- 获取html格式的错误信息
- """
- error_msg_list = public_info.get(TASK_ERROR_KEY_NAME)
- error_msg_html = ''
- if error_msg_list is not None:
- error_msg = '\n'.join(error_msg_list)
- error_msg_html = f"""
-
-
- 错误信息
- {error_msg}
-
-
- """
- return error_msg_html
-
-
-def gen_report_public_info(public_info):
- """
- 生成html格式的报告公共信息部分
- """
- start_time = public_info.get('start_time')
- stop_time = public_info.get('stop_time')
- pass_num = public_info.get('pass_num')
- fail_num = public_info.get('fail_num')
- test_task_num = pass_num + fail_num
- test_result, test_result_class = get_test_result(fail_num)
-
- # 错误信息
- error_msg_html = get_error_msg_html(public_info)
-
- public_html_temp = f"""
-
-
- 用例版本号
- {STABILITY_TEST_VERSION}
-
-
- 开始时间
- {start_time}
-
-
- 结束时间
- {stop_time}
-
-
-
-
- 测试任务数
- {test_task_num}
-
-
- 失败任务数
- {fail_num}
-
-
- 成功任务数
- {pass_num}
-
-
-
-
- 测试结果
- {test_result}
-
-
- {error_msg_html}
- """
- return public_html_temp
-
-
-def gen_report_detail_info(pass_list, fail_list):
- """
- 生成html格式的报告详细信息部分
- pass_list fail_list 格式:[("测试名称", {"key": value, "key": value, "key": value, "key": value ...}), ... ]
- """
- # 生成表格每一行记录
- detail_rows_list = []
- for one_test_key, one_test_value in fail_list + pass_list:
- if one_test_value.get("passed") is True:
- result_class = "pass"
- result_text = "通过"
- else:
- result_class = "fail"
- result_text = "失败"
- finished_test_count = one_test_value.get("finished_test_count")
- loop_count = one_test_value.get("loop_count")
- completion_rate = 100.0 * finished_test_count / loop_count
- cost_time = one_test_value.get("cost_time")
- if finished_test_count > 0:
- cost_time_per_test = f"{cost_time / finished_test_count:.3f}"
- else:
- cost_time_per_test = "NA"
- one_row = f"""
-
- {one_test_key} |
- {result_text} |
- {completion_rate:.1f} |
- {finished_test_count} |
- {loop_count} |
- {cost_time_per_test} |
- {cost_time:.3f} |
- {one_test_value.get("error_msg", "NA")} |
-
- """
- detail_rows_list.append(one_row)
- detail_rows = ''.join(detail_rows_list)
-
- # 合成表格
- detail_table_temp = f"""
-
-
- 任务名称 |
- 测试结果 |
- 完成率 |
- 已完成次数 |
- 总次数 |
- 平均每轮耗时(秒) |
- 总耗时(秒) |
- 错误信息 |
-
- {detail_rows}
-
- """
- return detail_table_temp
-
-
-def gen_report_info(public_info, detail_info):
- """
- 生成html格式的报告
- """
- html_temp = f"""
-
-
- {HTML_HEAD}
-
-
-
- HDC稳定性测试报告
-
- {public_info}
-
-
-
-
-
- """
- return html_temp
-
-
-def save_report(report_info, save_file_name):
- """
- 生成测试报告
- """
- # 字典中获取报告公共信息
- public_info = report_info.get(REPORT_PUBLIC_INFO_NAME)
- # 字典中获取详细测试项目的信息
- detail_test_info = {key: value for key, value in report_info.items() if key != REPORT_PUBLIC_INFO_NAME}
-
- # 分类测试结果
- pass_list = []
- fail_list = []
- for one_test_key, one_test_value in detail_test_info.items():
- if one_test_value.get("passed") is True:
- pass_list.append((one_test_key, one_test_value))
- else:
- fail_list.append((one_test_key, one_test_value))
-
- # 排序成功和失败的任务列表
- pass_list = sorted(pass_list)
- fail_list = sorted(fail_list)
- public_info["pass_num"] = len(pass_list)
- public_info["fail_num"] = len(fail_list)
-
- # 生成报告公共信息
- public_info_html = gen_report_public_info(public_info)
-
- # 生成报告详细信息
- detail_info_html = gen_report_detail_info(pass_list, fail_list)
-
- # 合成报告
- report_info_html = gen_report_info(public_info_html, detail_info_html)
-
- # 写入文件
- with open(save_file_name, "w", encoding="utf-8") as f:
- f.write(report_info_html)
-
-
-def add_error_msg_to_public_info(err_msg, report_info):
- """
- 将一条错误信息追加到public_info里面的task_error字段中
- err_msg的结构为{"name": "", "error_msg": ""}
- """
- msg = err_msg.get("error_msg")
- if REPORT_PUBLIC_INFO_NAME in report_info:
- public_info = report_info.get(REPORT_PUBLIC_INFO_NAME)
- if TASK_ERROR_KEY_NAME in public_info:
- public_info[TASK_ERROR_KEY_NAME].append(msg)
- else:
- public_info[TASK_ERROR_KEY_NAME] = [msg, ]
- else:
- report_info[REPORT_PUBLIC_INFO_NAME] = {TASK_ERROR_KEY_NAME: [msg, ]}
-
-
-def add_to_report_info(one_info, report_info):
- """
- 将一条信息添加到报告信息集合中,
- 不存在则新增,存在则添加或者覆盖以前的值
- one_info格式为 {key: {key1: value ...}, key: {key1: value ...}...}
- 当前的key包括如下几类:
- 1、任务名称:[sn]_filerecv_0 或者 [ip:port]_filerecv_0
- 2、公共信息:REPORT_PUBLIC_INFO_NAME = "public_info"
- 3、测试任务报告的错误信息:TASK_ERROR_KEY_NAME = "task_error"
- 最终追加到public_info里面的task_error字段中
- """
- for report_section_add, section_dict_add in one_info.items():
- if report_section_add == TASK_ERROR_KEY_NAME:
- # 追加error msg到public_info里面的task_error字段中
- add_error_msg_to_public_info(section_dict_add, report_info)
- continue
- if report_section_add in report_info:
- # 报告数据中已经存在待添加的字段,则获取报告数据中的该段落,给段落中增加或者覆盖已有字段
- report_section_dict = report_info.get(report_section_add)
- report_section_dict.update(section_dict_add)
- else:
- report_info[report_section_add] = section_dict_add
-
-
-def process_msg(msg_queue, thread_name):
- """
- 消息中心,用于接收所有测试线程的消息,保存到如下结构的字典report_info中,用于生成报告:
- {
- "public_info": {"key": value, "key": value, "key": value, "key": value ...}
- "thread_name1": {"key": value, "key": value, "key": value, "key": value ...}
- "thread_name2": {"key": value, "key": value, "key": value, "key": value ...}
- }
- public_info为报告开头的公共信息
- thread_namex为各个测试项目的测试信息
- 通过msg_queue,传递过来的消息,格式为 {key: {key1: value ...}, key: {key1: value ...}...}的格式,
- key表示上面report_info的key,不存在则新增,存在则添加或者覆盖以前的值
- """
- logger.info(f"start process_msg thread {thread_name}")
- report_info = {}
- while True:
- one_info = msg_queue.get()
- if one_info is None: # 报告接收完毕,退出
- logger.info(f"{thread_name} get None from queue")
- break
- add_to_report_info(one_info, report_info)
- logger.info(f"start save report to {REPORT_FILE_NAME}, report len:{len(report_info)}")
- save_report(report_info, REPORT_FILE_NAME)
- logger.info(f"stop process_msg thread {thread_name}")
-
-
-class Session(object):
- """
- session class
- 一个session连接对应一个session class
- 包含了所有在当前连接下面的测试任务
- """
- # session类型,usb or tcp
- session_type = ""
- # 设备连接标识
- connect_key = ""
- test_config = []
- thread_list = []
-
- def __init__(self, connect_key):
- self.connect_key = connect_key
-
- def set_test_config(self, config):
- self.test_config = config
-
- def start_test(self):
- if len(self.test_config) == 0:
- logger.error(f"start test test_config is empty, do not test, type:{self.session_type} {self.connect_key}")
- return True
- for one_config in self.test_config:
- if self.start_one_test(one_config) is False:
- logger.error(f"start one test failed, type:{self.session_type} {self.connect_key} config:{one_config}")
- return False
- return True
-
- def start_one_test(self, config):
- if config["test_type"] == TestType.SHELL:
- if self.start_shell_test(config) is False:
- logger.error(f"start_shell_test failed, type:{self.session_type} {self.connect_key} config:{config}")
- return False
- if config["test_type"] == TestType.FILE_SEND:
- if self.start_file_send_test(config) is False:
- logger.error(f"start_file_send_test failed, type:{self.session_type} \
- {self.connect_key} config:{config}")
- return False
- if config["test_type"] == TestType.FILE_RECV:
- if self.start_file_recv_test(config) is False:
- logger.error(f"start_file_recv_test failed, type:{self.session_type} \
- {self.connect_key} config:{config}")
- return False
- if config["test_type"] == TestType.FPORT_TRANS_FILE:
- if self.start_fport_trans_file_test(config) is False:
- logger.error(f"start_fport_trans_file_test failed, type:{self.session_type} \
- {self.connect_key} config:{config}")
- return False
- return True
-
- def start_shell_test(self, config):
- task_count = config["task_count"]
- test_name = f"{self.connect_key}_shell"
- for task_index in range(task_count):
- thread = threading.Thread(target=process_shell_test, name=f"{test_name}_{task_index}",
- args=(self.connect_key, config, f"{test_name}_{task_index}"))
- thread.start()
- self.thread_list.append(thread)
-
- def start_file_send_test(self, config):
- task_count = config["task_count"]
- test_name = f"{self.connect_key}_filesend"
- for task_index in range(task_count):
- thread = threading.Thread(target=process_file_send_test, name=f"{test_name}_{task_index}",
- args=(self.connect_key, config, f"{test_name}_{task_index}"))
- thread.start()
- self.thread_list.append(thread)
-
- def start_file_recv_test(self, config):
- task_count = config["task_count"]
- test_name = f"{self.connect_key}_filerecv"
- for task_index in range(task_count):
- thread = threading.Thread(target=process_file_recv_test, name=f"{test_name}_{task_index}",
- args=(self.connect_key, config, f"{test_name}_{task_index}"))
- thread.start()
- self.thread_list.append(thread)
-
- def start_fport_trans_file_test(self, config):
- task_count = len(config["port_info"])
- test_name = f"{self.connect_key}_fporttrans"
- for task_index in range(task_count):
- thread = threading.Thread(target=process_fport_trans_file_test, name=f"{test_name}_{task_index}",
- args=(self.connect_key, config, f"{test_name}_{task_index}", task_index))
- thread.start()
- self.thread_list.append(thread)
-
-
-class UsbSession(Session):
- """
- usb连接对应的session类
- """
- session_type = "usb"
-
-
-class TcpSession(Session):
- """
- tcp连接对应的session类
- """
- session_type = "tcp"
-
- def start_tcp_connect(self):
- result, _ = run_cmd_block(f"hdc tconn {self.connect_key}")
- logger.info(f"start_tcp_connect {self.connect_key} result:{result}")
- RELEASE_TCONN_CMD_LIST.append(f"hdc tconn {self.connect_key} -remove")
- return True
-
-
-def start_server():
- cmd = "hdc start"
- result, _ = run_cmd_block(cmd)
- return result
-
-
-def get_dev_list():
- try:
- result, _ = run_cmd_block("hdc list targets")
- result = result.split()
- except (OSError, IndexError):
- result = ["failed to detect device"]
- return False, result
- targets = result
- if len(targets) == 0:
- logger.error(f"get device, devices list is empty")
- return False, []
- if "[Empty]" in targets[0]:
- logger.error(f"get device, no devices found, devices:{targets}")
- return False, targets
- return True, targets
-
-
-def check_device_online(connect_key):
- """
- 确认设备是否在线
- """
- result, devices = get_dev_list()
- if result is False:
- logger.error(f"get device failed, devices:{devices}")
- return False
- if connect_key in devices:
- return True
- return False
-
-
-def init_test_env():
- """
- 初始化测试环境
- 1、使用tmode port和fport转发,构建多session tcp测试场景
- """
- logger.info(f"init env")
- start_server()
- result, devices = get_dev_list()
- if result is False:
- logger.error(f"get device failed, devices:{devices}")
- return False
- device_sn = devices[0]
- if len(devices) > 1:
- # 存在多个设备连接,获取不是ip:port的设备作为待测试的设备
- logger.info(f"Multiple devices are connected, devices:{devices}")
- for dev in devices:
- if ':' not in dev:
- device_sn = dev
- # 关闭设备侧监听端口
- result, _ = run_cmd_block(f"hdc -t {device_sn} tmode port close")
- logger.info(f"close tmode port finished")
- time.sleep(10)
-
- logger.info(f"get device:{device_sn}")
-
- # 开启设备侧监听端口
- result, _ = run_cmd_block(f"hdc -t {device_sn} tmode port {DEVICE_LISTEN_PORT}")
- logger.info(f"run tmode port {DEVICE_LISTEN_PORT}, result:{result}")
- time.sleep(10)
- logger.info(f"start check device:{device_sn}")
- if check_device_online(device_sn) is False:
- logger.error(f"device {device_sn} in not online")
- return False
- logger.info(f"init_test_env finished")
- return True
-
-
-def start_usb_session_test(connect_key):
- """
- 开始一个usb session的测试
- """
- logger.info(f"start_usb_session_test connect_key:{connect_key}")
- session = UsbSession(connect_key)
- session.set_test_config(USB_SESSION_TEST_CONFIG)
- if session.start_test() is False:
- logger.error(f"session.start_test failed, connect_key:{connect_key}")
- return False
- SESSION_LIST.append(session)
- logger.info(f"start_usb_session_test connect_key:{connect_key} finished")
- return True
-
-
-def start_local_tcp_session_test(connect_key, port_list):
- """
- 1、遍历传入的端口号,创建fport端口转发到设备端的DEVICE_LISTEN_PORT,
- 2、使用tconn连接后进行测试
- """
- logger.info(f"start_local_tcp_session_test port_list:{port_list}")
- if len(TCP_SESSION_TEST_CONFIG) == 0 or len(port_list) == 0:
- logger.info(f"port_list:{port_list} TCP_SESSION_TEST_CONFIG:{TCP_SESSION_TEST_CONFIG}")
- logger.info(f"port_list or TCP_SESSION_TEST_CONFIG is empty, no need do local tcp session test, exit")
- return True
- for port in port_list:
- # 构建fport通道
- logger.info(f"create fport local:{port} device:{DEVICE_LISTEN_PORT}")
- result, _ = run_cmd_block(f"hdc -t {connect_key} fport tcp:{port} tcp:{DEVICE_LISTEN_PORT}")
- logger.info(f"create fport local:{port} device:{DEVICE_LISTEN_PORT} result:{result}")
- RELEASE_FPORT_CMD_LIST.append(f"hdc -t {connect_key} fport rm tcp:{port} tcp:{DEVICE_LISTEN_PORT}")
- tcp_key = f"127.0.0.1:{port}"
- logger.info(f"start TcpSession connect_key:{tcp_key}")
- session = TcpSession(tcp_key)
- session.set_test_config(TCP_SESSION_TEST_CONFIG)
- session.start_tcp_connect()
- if session.start_test() is False:
- logger.error(f"session.start_test failed, connect_key:{tcp_key}")
- return False
- SESSION_LIST.append(session)
- logger.info(f"start tcp test connect_key:{tcp_key} finished")
-
- logger.info(f"start_local_tcp_session_test finished")
- return True
-
-
-def start_test(msg_queue):
- """
- 启动测试
- 1、启动usb session场景的测试
- 2、启动通过 tmode+fport模拟的tcp session场景的测试
- """
- logger.info(f"start test")
- result, devices = get_dev_list()
- if result is False:
- logger.error(f"get device failed, devices:{devices}")
- return False
- device_sn = devices[0]
- DEVICE_LIST.append(device_sn)
- if start_usb_session_test(device_sn) is False:
- logger.error(f"start_usb_session_test failed, devices:{devices}")
- return False
- if start_local_tcp_session_test(device_sn, MUTIL_SESSION_TEST_PC_PORT_LIST) is False:
- logger.error(f"start_tcp_session_test failed, devices:{devices}")
- return False
- return True
-
-
-def release_resource():
- """
- 释放相关资源
- 1、断开tconn连接
- 2、fport转发
- 3、tmode port
- """
- logger.info(f"enter release_resource")
- for cmd in RELEASE_TCONN_CMD_LIST:
- result, _ = run_cmd_block(cmd)
- logger.info(f"release tconn cmd:{cmd} result:{result}")
- for cmd in RELEASE_FPORT_CMD_LIST:
- result, _ = run_cmd_block(cmd)
- logger.info(f"release fport cmd:{cmd} result:{result}")
- result, _ = run_cmd_block(f"hdc -t {DEVICE_LIST[0]} tmode port close")
- logger.info(f"tmode port close, device:{DEVICE_LIST[0]} result:{result}")
-
-
-def run_stability_test():
- global logger
- logger = logging.getLogger(__name__)
- logger.info(f"main resource_path:{RESOURCE_PATH}")
- start_time = get_time_str()
- if init_test_env() is False:
- logger.error(f"init_test_env failed")
- return False
-
- # 启动消息收集进程
- msg_thread = threading.Thread(target=process_msg, name="msg_process", args=(TEST_THREAD_MSG_QUEUE, "msg_process"))
- msg_thread.start()
-
- put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"start_time": start_time}})
- logger.info(f"start run test thread")
- if start_test(TEST_THREAD_MSG_QUEUE) is False:
- logger.error(f"start_test failed")
- stop_time = get_time_str()
- put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"stop_time": stop_time}})
- TEST_THREAD_MSG_QUEUE.put(None)
- msg_thread.join()
- return False
-
- # wait all thread exit
- logger.info(f"wait all test thread exit")
- for session in SESSION_LIST:
- for thread in session.thread_list:
- thread.join()
-
- stop_time = get_time_str()
- put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"stop_time": stop_time}})
-
- release_resource()
-
- # 传递 None 参数,报告进程收到后退出
- logger.info(f"main put None to TEST_THREAD_MSG_QUEUE")
- TEST_THREAD_MSG_QUEUE.put(None)
- msg_thread.join()
-
- logger.info(f"exit main, test result:{TEST_RESULT}")
- return TEST_RESULT
-
-
-if __name__ == "__main__":
- init_logger()
- run_stability_test()
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# Copyright (C) 2025 Huawei Device Co., Ltd.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+运行环境: python 3.10+
+测试方法:
+方法1、使用pytest框架执行
+请见测试用例根目录test\scripts下readme.md中的执行稳定性用例小节
+方法2、直接执行
+进入test\scripts目录,执行python .\testModule\test_hdc_stability_utils.py
+
+测试范围
+hdc shell
+hdc file send/recv
+hdc fport 文件传输
+多session场景测试(hdc tmode + hdc tconn)
+
+注意:
+1、当前仅支持一台设备的压力测试
+2、如果异常退出,请执行 hdc -t [usb_connect_key] tmode port close,关闭网络接口通道
+执行 hdc list targets查看是否还有网络连接,如果有请执行hdc tconn 127.0.0.1:port -remove断开
+执行hdc fport ls查看是否还有端口转发的规则,如果有请执行hdc fport rm tcp:xxxxx tcp:9999删除转发
+"""
+
+import subprocess
+import os
+import logging
+import logging.config
+import time
+from datetime import datetime
+from enum import Enum
+import threading
+import uuid
+import hashlib
+import queue
+from utils import server_loop
+from utils import client_get_file
+from utils import get_local_md5
+
+STABILITY_TEST_VERSION = "v1.0.0"
+
+TIME_REPORT_FORMAT = '%Y-%m-%d %H:%M:%S'
+TIME_FILE_FORMAT = '%Y%m%d_%H%M%S'
+TEST_RESULT = False
+
+WORK_DIR = os.getcwd()
+
+# 消息队列中使用到的key名称
+# 报告公共信息key名称
+REPORT_PUBLIC_INFO_NAME = "public_info"
+# 报告公共信息中的错误信息key名称
+TASK_ERROR_KEY_NAME = "task_error"
+
+# 资源文件相对路径
+RESOURCE_RELATIVE_PATH = "resource"
+# 报告文件相对路径
+REPORTS_RELATIVE_PATH = "reports"
+# 缓存查询到的设备sn
+DEVICE_LIST = []
+# 记录当前启动的session类信息
+SESSION_LIST = []
+# 释放资源命令列表
+RELEASE_TCONN_CMD_LIST = []
+RELEASE_FPORT_CMD_LIST = []
+
+RESOURCE_PATH = os.path.join(WORK_DIR, RESOURCE_RELATIVE_PATH)
+
+EXIT_EVENT = threading.Event()
+
+TEST_THREAD_MSG_QUEUE = queue.Queue()
+
+
+def get_time_str(fmt=TIME_REPORT_FORMAT, need_ms=False):
+ now_time = datetime.now()
+ if need_ms is False:
+ return now_time.strftime(fmt)
+ else:
+ return f"{now_time.strftime(fmt)}.{now_time.microsecond // 1000:03d}"
+
+
+TEST_FILE_TIME_STR = get_time_str(TIME_FILE_FORMAT)
+LOG_FILE_NAME = os.path.join(REPORTS_RELATIVE_PATH, f"hdc_stability_test_{TEST_FILE_TIME_STR}.log")
+REPORT_FILE_NAME = os.path.join(REPORTS_RELATIVE_PATH, f"hdc_stability_test_report_{TEST_FILE_TIME_STR}.html")
+logger = None
+
+# 配置日志记录
+logging_config_info = {
+ 'version': 1,
+ 'formatters': {
+ 'format1': {
+ 'format': '[%(asctime)s %(name)s %(funcName)s %(lineno)d %(levelname)s][%(process)d][%(thread)d]'
+ '[%(threadName)s]%(message)s'
+ },
+ },
+ 'handlers': {
+ 'console_handle': {
+ 'level': 'DEBUG',
+ 'formatter': 'format1',
+ 'class': 'logging.StreamHandler',
+ 'stream': 'ext://sys.stdout',
+ },
+ 'file_handle': {
+ 'level': 'DEBUG',
+ 'formatter': 'format1',
+ 'class': 'logging.FileHandler',
+ 'filename': LOG_FILE_NAME,
+ },
+ },
+ 'loggers': {
+ '': {
+ 'handlers': ['console_handle', 'file_handle'],
+ 'level': 'DEBUG',
+ },
+ },
+}
+
+
+class TestType(Enum):
+ SHELL = 1
+ FILE_SEND = 2
+ FILE_RECV = 3
+ FPORT_TRANS_FILE = 4
+
+
+# 当前测试任务并行运行的数量
+TASK_COUNT_DEFAULT = 5
+# 循环测试次数
+LOOP_COUNT_DEFAULT = 20
+# 每个循环结束的等待时间,单位秒
+LOOP_DELAY_S_DEFAULT = 0.01
+
+# 进行多session测试时,启动多个端口转发到设备侧的tmode监听的端口,电脑端开启的端口列表
+# 参考端口配置:12345, 12346, 12347, 12348, 12349,需要几个session,可以复制几个放入下面的MUTIL_SESSION_TEST_PC_PORT_LIST中
+MUTIL_SESSION_TEST_PC_PORT_LIST = []
+# 多session测试, 通过tmode命令,设备端切换tcp模式,监听的端口号
+DEVICE_LISTEN_PORT = 9999
+
+# usb session 测试项配置
+"""
+配置项包括如下:
+ {
+ "test_type": TestType.SHELL,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "shell ls",
+ "expected_results": "data",
+ },
+ {
+ "test_type": TestType.FILE_SEND,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file send",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ },
+ {
+ "test_type": TestType.FILE_RECV,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file recv",
+ "original_file": "medium",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ },
+ {
+ "test_type": TestType.FPORT_TRANS_FILE,
+ "port_info": [
+ {
+ "client_connect_port": 22345,
+ "daemon_transmit_port": 11081,
+ "server_listen_port": 18000,
+ },
+ {
+ "client_connect_port": 22346,
+ "daemon_transmit_port": 11082,
+ "server_listen_port": 18001,
+ },
+ ],
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "original_file": "medium",
+ },
+"""
+USB_SESSION_TEST_CONFIG = [
+ {
+ "test_type": TestType.SHELL,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "shell ls",
+ "expected_results": "data",
+ },
+ {
+ "test_type": TestType.FILE_SEND,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file send",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ },
+ {
+ "test_type": TestType.FILE_RECV,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file recv",
+ "original_file": "medium",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ },
+]
+
+# tcp session 测试项配置
+"""
+配置项包括如下:
+ {
+ "test_type": TestType.SHELL,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "shell ls",
+ "expected_results": "data",
+ },
+ {
+ "test_type": TestType.FILE_SEND,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file send",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ },
+ {
+ "test_type": TestType.FILE_RECV,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file recv",
+ "original_file": "medium",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ },
+"""
+TCP_SESSION_TEST_CONFIG = [
+ {
+ "test_type": TestType.SHELL,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "shell ls",
+ "expected_results": "data",
+ },
+ {
+ "test_type": TestType.FILE_SEND,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file send",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ },
+ {
+ "test_type": TestType.FILE_RECV,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": LOOP_DELAY_S_DEFAULT,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file recv",
+ "original_file": "medium",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ },
+]
+
+HTML_HEAD = """
+
+
+HDC稳定性测试报告
+
+
+"""
+
+
+def init_logger():
+ logging.config.dictConfig(logging_config_info)
+
+
+def put_msg(queue_obj, info):
+ """
+ 给队列中填入测试报告信息,用于汇总报告和生成结果
+ """
+ queue_obj.put(info)
+
+
+def run_cmd_block(command, timeout=600):
+ logger.info(f"cmd: {command}")
+ # 启动子进程
+ process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+
+ # 用于存储子进程的输出
+ output = ""
+ error = ""
+
+ try:
+ # 读取子进程的输出
+ output, error = process.communicate(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ logger.info(f"run cmd:{command} timeout")
+ process.terminate()
+ process.kill()
+ output, error = process.communicate(timeout=timeout)
+ return output, error
+
+
+def run_cmd_and_check_output(command, want_str, timeout=600):
+ """
+ 阻塞的方式运行命令,然后判断标准输出的返回值中是否有预期的字符串
+ 存在预期的字符串返回 True
+ 不存在预期的字符串返回 False
+ """
+ output, error = run_cmd_block(command, timeout)
+ if want_str in output:
+ return True, (output, error)
+ else:
+ logger.error(f"can not get expect str:{want_str} cmd:{command} output:{output} error:{error}")
+ return False, (output, error)
+
+
+def process_shell_test(connect_key, config, thread_name):
+ """
+ config格式
+ {
+ "test_type": TestType.SHELL,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": 0.01,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "shell ls",
+ "expected_results": "data",
+ }
+ """
+ logger.info(f"start process_shell_test thread {thread_name}")
+ test_count = config["loop_count"]
+ delay_s = config["loop_delay_s"]
+ cmd = f"hdc -t {connect_key} {config['cmd']}"
+ expected_results = config["expected_results"]
+ report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count}
+ start_time = time.perf_counter()
+ for count in range(test_count):
+ logger.info(f"{thread_name} {count}")
+ is_ok, info = run_cmd_and_check_output(cmd, expected_results)
+ if is_ok:
+ logger.info(f"{thread_name} {count} success result:{info}")
+ else:
+ logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results},"
+ f"result:{info}")
+ error_time = get_time_str(need_ms=True)
+ msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \
+ f" can not get expect result:{expected_results}, result:{info}"
+ report["error_msg"] = f"[{error_time}] result:{info}"
+ TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}})
+ EXIT_EVENT.set()
+ logger.info(f"{thread_name} {count} set exit event")
+ report["finished_test_count"] = count
+ break
+ if EXIT_EVENT.is_set():
+ logger.info(f"{thread_name} {count} exit event is set")
+ report["error_msg"] = "event is set, normal exit"
+ report["finished_test_count"] = count + 1
+ break
+ time.sleep(delay_s)
+ if "finished_test_count" not in report:
+ report["finished_test_count"] = test_count
+ if report["finished_test_count"] < test_count:
+ report["passed"] = False
+ else:
+ report["passed"] = True
+ elapsed_time = time.perf_counter() - start_time
+ report["cost_time"] = elapsed_time
+ TEST_THREAD_MSG_QUEUE.put({thread_name: report})
+ logger.info(f"stop process_shell_test thread {thread_name}")
+
+
+def process_file_send_test(connect_key, config, thread_name):
+ """
+ config格式
+ {
+ "test_type": TestType.FILE_SEND,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": 0.01,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file send",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ }
+ """
+ logger.info(f"start process_file_send_test thread {thread_name}")
+ test_count = config["loop_count"]
+ delay_s = config["loop_delay_s"]
+ expected_results = config["expected_results"]
+ local_file = os.path.join(RESOURCE_PATH, config['local'])
+ remote_file = f"{config['remote']}_{uuid.uuid4()}"
+ cmd = f"hdc -t {connect_key} {config['cmd']} {local_file} {remote_file}"
+ report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count}
+ start_time = time.perf_counter()
+ for count in range(test_count):
+ logger.info(f"{thread_name} {count}")
+ is_ok, info = run_cmd_and_check_output(cmd, expected_results)
+ if is_ok:
+ logger.info(f"{thread_name} {count} success result:{info}")
+ else:
+ logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results},"
+ f"result:{info}")
+ error_time = get_time_str(need_ms=True)
+ msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \
+ f" can not get expect result:{expected_results}, result:{info}"
+ report["error_msg"] = f"[{error_time}] result:{info}"
+ TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}})
+ EXIT_EVENT.set()
+ logger.info(f"{thread_name} {count} set exit event")
+ report["finished_test_count"] = count
+ break
+
+ run_cmd_block(f"hdc -t {connect_key} shell rm {remote_file}")
+ if EXIT_EVENT.is_set():
+ logger.info(f"{thread_name} {count} exit event is set")
+ report["error_msg"] = "event is set, normal exit"
+ report["finished_test_count"] = count + 1
+ break
+ time.sleep(delay_s)
+ if "finished_test_count" not in report:
+ report["finished_test_count"] = test_count
+ if report["finished_test_count"] < test_count:
+ report["passed"] = False
+ else:
+ report["passed"] = True
+ elapsed_time = time.perf_counter() - start_time
+ report["cost_time"] = elapsed_time
+ TEST_THREAD_MSG_QUEUE.put({thread_name: report})
+ logger.info(f"stop process_file_send_test thread {thread_name}")
+
+
+def process_file_recv_test(connect_key, config, thread_name):
+ """
+ config格式
+ {
+ "test_type": TestType.FILE_RECV,
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": 0.01,
+ "task_count": TASK_COUNT_DEFAULT,
+ "cmd": "file recv",
+ "original_file": "medium",
+ "local": "medium",
+ "remote": "/data/medium",
+ "expected_results": "FileTransfer finish",
+ }
+ """
+ logger.info(f"start process_file_recv_test thread {thread_name}")
+ test_count = config["loop_count"]
+ delay_s = config["loop_delay_s"]
+ expected_results = config["expected_results"]
+ original_file = os.path.join(RESOURCE_PATH, config['original_file'])
+ name_id = uuid.uuid4()
+ local_file = os.path.join(RESOURCE_PATH, f"{config['local']}_{name_id}")
+ remote_file = f"{config['remote']}_{name_id}"
+ cmd = f"hdc -t {connect_key} {config['cmd']} {remote_file} {local_file}"
+ report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count}
+ start_time = time.perf_counter()
+ run_cmd_block(f"hdc -t {connect_key} file send {original_file} {remote_file}")
+ for count in range(test_count):
+ logger.info(f"{thread_name} {count}")
+ is_ok, info = run_cmd_and_check_output(cmd, expected_results)
+ if is_ok:
+ logger.info(f"{thread_name} {count} success result:{info}")
+ else:
+ logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results},"
+ f"result:{info}")
+ error_time = get_time_str(need_ms=True)
+ msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \
+ f" can not get expect result:{expected_results}, result:{info}"
+ report["error_msg"] = f"[{error_time}] result:{info}"
+ TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}})
+ EXIT_EVENT.set()
+ logger.info(f"{thread_name} {count} set exit event")
+ report["finished_test_count"] = count
+ break
+ os.remove(local_file)
+ logger.debug(f"{connect_key} remove {local_file} finished")
+ if EXIT_EVENT.is_set():
+ logger.info(f"{thread_name} {count} exit event is set")
+ report["error_msg"] = "event is set, normal exit"
+ report["finished_test_count"] = count + 1
+ break
+ time.sleep(delay_s)
+ if "finished_test_count" not in report:
+ report["finished_test_count"] = test_count
+ if report["finished_test_count"] < test_count:
+ report["passed"] = False
+ else:
+ report["passed"] = True
+ run_cmd_block(f"hdc -t {connect_key} shell rm {remote_file}")
+ elapsed_time = time.perf_counter() - start_time
+ report["cost_time"] = elapsed_time
+ TEST_THREAD_MSG_QUEUE.put({thread_name: report})
+ logger.info(f"stop process_file_recv_test thread {thread_name}")
+
+
+def create_fport_trans_test_env(info):
+ # 构建传输通道
+ thread_name = info.get("thread_name")
+ connect_key = info.get("connect_key")
+ fport_arg = info.get("fport_arg")
+ result, _ = run_cmd_block(f"hdc -t {connect_key} fport {fport_arg}")
+ logger.info(f"{thread_name} fport result:{result}")
+ rport_arg = info.get("rport_arg")
+ result, _ = run_cmd_block(f"hdc -t {connect_key} rport {rport_arg}")
+ logger.info(f"{thread_name} rport result:{result}")
+
+
+def do_fport_trans_file_test_once(client_connect_port, server_listen_port, file_name, file_save_name):
+ """
+ 进行一次数据传输测试
+ """
+ logger.info(f"do_fport_trans_file_test_once start, file_save_name:{file_save_name}")
+ server_thread = threading.Thread(target=server_loop, args=(server_listen_port,))
+ server_thread.start()
+
+ client_get_file(client_connect_port, file_name, file_save_name)
+ server_thread.join()
+
+ ori_file_md5 = get_local_md5(os.path.join(RESOURCE_PATH, file_name))
+ new_file = os.path.join(RESOURCE_PATH, file_save_name)
+ new_file_md5 = 0
+ if os.path.exists(new_file):
+ new_file_md5 = get_local_md5(new_file)
+ os.remove(new_file)
+ logger.info(f"ori_file_md5:{ori_file_md5}, new_file_md5:{new_file_md5}")
+
+ if not ori_file_md5 == new_file_md5:
+ logger.error(f"check file md5 failed, file_save_name:{file_save_name}, ori_file_md5:{ori_file_md5},"
+ f"new_file_md5:{new_file_md5}")
+ return False
+ logger.info(f"do_fport_trans_file_test_once exit, file_save_name:{file_save_name}")
+ return True
+
+
+def process_fport_trans_file_test(connect_key, config, thread_name, task_index):
+ """
+ task_index对应当前测试的port_info,从0开始计数。
+ config格式
+ {
+ "test_type": TestType.FPORT_TRANS_FILE,
+ "port_info": [
+ {
+ "client_connect_port": 22345,
+ "daemon_transmit_port": 11081,
+ "server_listen_port": 18000,
+ },
+ {
+ "client_connect_port": 22346,
+ "daemon_transmit_port": 11082,
+ "server_listen_port": 18001,
+ },
+ ],
+
+ "loop_count": LOOP_COUNT_DEFAULT,
+ "loop_delay_s": 0.01,
+ "original_file": "medium",
+ }
+ """
+ logger.info(f"start process_fport_trans_file_test thread {thread_name}")
+ test_count = config["loop_count"]
+ delay_s = config["loop_delay_s"]
+ port_info = config["port_info"]
+ client_connect_port = port_info[task_index]["client_connect_port"]
+ daemon_transmit_port = port_info[task_index]["daemon_transmit_port"]
+ server_listen_port = port_info[task_index]["server_listen_port"]
+ fport_arg = f"tcp:{client_connect_port} tcp:{daemon_transmit_port}"
+ rport_arg = f"tcp:{daemon_transmit_port} tcp:{server_listen_port}"
+
+ report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count}
+ start_time = time.perf_counter()
+
+ # 构建传输通道
+ create_fport_trans_test_env({"thread_name": thread_name, "connect_key": connect_key,
+ "fport_arg": fport_arg, "rport_arg": rport_arg})
+
+ # transmit file start
+ file_name = config["original_file"]
+ file_save_name = f"{file_name}_recv_fport_{uuid.uuid4()}"
+ for count in range(test_count):
+ logger.info(f"{thread_name} {count}")
+ if do_fport_trans_file_test_once(client_connect_port, server_listen_port, file_name, file_save_name) is False:
+ error_time = get_time_str(need_ms=True)
+ msg = f"[{error_time}] {thread_name} loop_count:{count+1}" \
+ f" check file md5 failed"
+ report["error_msg"] = f"[{error_time}] check file md5 failed"
+ TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}})
+ EXIT_EVENT.set()
+ logger.info(f"{thread_name} {count} set exit event")
+ report["finished_test_count"] = count
+ break
+ if EXIT_EVENT.is_set():
+ logger.info(f"{thread_name} {count} exit event is set")
+ report["error_msg"] = "event is set, normal exit"
+ report["finished_test_count"] = count + 1
+ break
+ time.sleep(delay_s)
+
+ # 关闭fport通道
+ run_cmd_block(f"hdc -t {connect_key} fport rm {fport_arg}")
+ run_cmd_block(f"hdc -t {connect_key} fport rm {rport_arg}")
+ if "finished_test_count" not in report:
+ report["finished_test_count"] = test_count
+ if report["finished_test_count"] < test_count:
+ report["passed"] = False
+ else:
+ report["passed"] = True
+ elapsed_time = time.perf_counter() - start_time
+ report["cost_time"] = elapsed_time
+ TEST_THREAD_MSG_QUEUE.put({thread_name: report})
+ logger.info(f"stop process_fport_trans_file_test thread {thread_name}")
+
+
+def get_test_result(fail_num):
+ """
+ 返回错误结果及错误结果显示类型信息
+ """
+ global TEST_RESULT
+ if fail_num > 0:
+ test_result = "失败"
+ TEST_RESULT = False
+ test_result_class = "fail"
+ else:
+ test_result = "通过"
+ TEST_RESULT = True
+ test_result_class = "pass"
+ return test_result, test_result_class
+
+
+def get_error_msg_html(public_info):
+ """
+ 获取html格式的错误信息
+ """
+ error_msg_list = public_info.get(TASK_ERROR_KEY_NAME)
+ error_msg_html = ''
+ if error_msg_list is not None:
+ error_msg = '\n'.join(error_msg_list)
+ error_msg_html = f"""
+
+
+ 错误信息
+ {error_msg}
+
+
+ """
+ return error_msg_html
+
+
+def gen_report_public_info(public_info):
+ """
+ 生成html格式的报告公共信息部分
+ """
+ start_time = public_info.get('start_time')
+ stop_time = public_info.get('stop_time')
+ pass_num = public_info.get('pass_num')
+ fail_num = public_info.get('fail_num')
+ test_task_num = pass_num + fail_num
+ test_result, test_result_class = get_test_result(fail_num)
+
+ # 错误信息
+ error_msg_html = get_error_msg_html(public_info)
+
+ public_html_temp = f"""
+
+
+ 用例版本号
+ {STABILITY_TEST_VERSION}
+
+
+ 开始时间
+ {start_time}
+
+
+ 结束时间
+ {stop_time}
+
+
+
+
+ 测试任务数
+ {test_task_num}
+
+
+ 失败任务数
+ {fail_num}
+
+
+ 成功任务数
+ {pass_num}
+
+
+
+
+ 测试结果
+ {test_result}
+
+
+ {error_msg_html}
+ """
+ return public_html_temp
+
+
+def gen_report_detail_info(pass_list, fail_list):
+ """
+ 生成html格式的报告详细信息部分
+ pass_list fail_list 格式:[("测试名称", {"key": value, "key": value, "key": value, "key": value ...}), ... ]
+ """
+ # 生成表格每一行记录
+ detail_rows_list = []
+ for one_test_key, one_test_value in fail_list + pass_list:
+ if one_test_value.get("passed") is True:
+ result_class = "pass"
+ result_text = "通过"
+ else:
+ result_class = "fail"
+ result_text = "失败"
+ finished_test_count = one_test_value.get("finished_test_count")
+ loop_count = one_test_value.get("loop_count")
+ completion_rate = 100.0 * finished_test_count / loop_count
+ cost_time = one_test_value.get("cost_time")
+ if finished_test_count > 0:
+ cost_time_per_test = f"{cost_time / finished_test_count:.3f}"
+ else:
+ cost_time_per_test = "NA"
+ one_row = f"""
+
+ {one_test_key} |
+ {result_text} |
+ {completion_rate:.1f} |
+ {finished_test_count} |
+ {loop_count} |
+ {cost_time_per_test} |
+ {cost_time:.3f} |
+ {one_test_value.get("error_msg", "NA")} |
+
+ """
+ detail_rows_list.append(one_row)
+ detail_rows = ''.join(detail_rows_list)
+
+ # 合成表格
+ detail_table_temp = f"""
+
+
+ 任务名称 |
+ 测试结果 |
+ 完成率 |
+ 已完成次数 |
+ 总次数 |
+ 平均每轮耗时(秒) |
+ 总耗时(秒) |
+ 错误信息 |
+
+ {detail_rows}
+
+ """
+ return detail_table_temp
+
+
+def gen_report_info(public_info, detail_info):
+ """
+ 生成html格式的报告
+ """
+ html_temp = f"""
+
+
+ {HTML_HEAD}
+
+
+
+ HDC稳定性测试报告
+
+ {public_info}
+
+
+
+
+
+ """
+ return html_temp
+
+
+def save_report(report_info, save_file_name):
+ """
+ 生成测试报告
+ """
+ # 字典中获取报告公共信息
+ public_info = report_info.get(REPORT_PUBLIC_INFO_NAME)
+ # 字典中获取详细测试项目的信息
+ detail_test_info = {key: value for key, value in report_info.items() if key != REPORT_PUBLIC_INFO_NAME}
+
+ # 分类测试结果
+ pass_list = []
+ fail_list = []
+ for one_test_key, one_test_value in detail_test_info.items():
+ if one_test_value.get("passed") is True:
+ pass_list.append((one_test_key, one_test_value))
+ else:
+ fail_list.append((one_test_key, one_test_value))
+
+ # 排序成功和失败的任务列表
+ pass_list = sorted(pass_list)
+ fail_list = sorted(fail_list)
+ public_info["pass_num"] = len(pass_list)
+ public_info["fail_num"] = len(fail_list)
+
+ # 生成报告公共信息
+ public_info_html = gen_report_public_info(public_info)
+
+ # 生成报告详细信息
+ detail_info_html = gen_report_detail_info(pass_list, fail_list)
+
+ # 合成报告
+ report_info_html = gen_report_info(public_info_html, detail_info_html)
+
+ # 写入文件
+ with open(save_file_name, "w", encoding="utf-8") as f:
+ f.write(report_info_html)
+
+
+def add_error_msg_to_public_info(err_msg, report_info):
+ """
+ 将一条错误信息追加到public_info里面的task_error字段中
+ err_msg的结构为{"name": "", "error_msg": ""}
+ """
+ msg = err_msg.get("error_msg")
+ if REPORT_PUBLIC_INFO_NAME in report_info:
+ public_info = report_info.get(REPORT_PUBLIC_INFO_NAME)
+ if TASK_ERROR_KEY_NAME in public_info:
+ public_info[TASK_ERROR_KEY_NAME].append(msg)
+ else:
+ public_info[TASK_ERROR_KEY_NAME] = [msg, ]
+ else:
+ report_info[REPORT_PUBLIC_INFO_NAME] = {TASK_ERROR_KEY_NAME: [msg, ]}
+
+
+def add_to_report_info(one_info, report_info):
+ """
+ 将一条信息添加到报告信息集合中,
+ 不存在则新增,存在则添加或者覆盖以前的值
+ one_info格式为 {key: {key1: value ...}, key: {key1: value ...}...}
+ 当前的key包括如下几类:
+ 1、任务名称:[sn]_filerecv_0 或者 [ip:port]_filerecv_0
+ 2、公共信息:REPORT_PUBLIC_INFO_NAME = "public_info"
+ 3、测试任务报告的错误信息:TASK_ERROR_KEY_NAME = "task_error"
+ 最终追加到public_info里面的task_error字段中
+ """
+ for report_section_add, section_dict_add in one_info.items():
+ if report_section_add == TASK_ERROR_KEY_NAME:
+ # 追加error msg到public_info里面的task_error字段中
+ add_error_msg_to_public_info(section_dict_add, report_info)
+ continue
+ if report_section_add in report_info:
+ # 报告数据中已经存在待添加的字段,则获取报告数据中的该段落,给段落中增加或者覆盖已有字段
+ report_section_dict = report_info.get(report_section_add)
+ report_section_dict.update(section_dict_add)
+ else:
+ report_info[report_section_add] = section_dict_add
+
+
+def process_msg(msg_queue, thread_name):
+ """
+ 消息中心,用于接收所有测试线程的消息,保存到如下结构的字典report_info中,用于生成报告:
+ {
+ "public_info": {"key": value, "key": value, "key": value, "key": value ...}
+ "thread_name1": {"key": value, "key": value, "key": value, "key": value ...}
+ "thread_name2": {"key": value, "key": value, "key": value, "key": value ...}
+ }
+ public_info为报告开头的公共信息
+ thread_namex为各个测试项目的测试信息
+ 通过msg_queue,传递过来的消息,格式为 {key: {key1: value ...}, key: {key1: value ...}...}的格式,
+ key表示上面report_info的key,不存在则新增,存在则添加或者覆盖以前的值
+ """
+ logger.info(f"start process_msg thread {thread_name}")
+ report_info = {}
+ while True:
+ one_info = msg_queue.get()
+ if one_info is None: # 报告接收完毕,退出
+ logger.info(f"{thread_name} get None from queue")
+ break
+ add_to_report_info(one_info, report_info)
+ logger.info(f"start save report to {REPORT_FILE_NAME}, report len:{len(report_info)}")
+ save_report(report_info, REPORT_FILE_NAME)
+ logger.info(f"stop process_msg thread {thread_name}")
+
+
+class Session(object):
+ """
+ session class
+ 一个session连接对应一个session class
+ 包含了所有在当前连接下面的测试任务
+ """
+ # session类型,usb or tcp
+ session_type = ""
+ # 设备连接标识
+ connect_key = ""
+ test_config = []
+ thread_list = []
+
+ def __init__(self, connect_key):
+ self.connect_key = connect_key
+
+ def set_test_config(self, config):
+ self.test_config = config
+
+ def start_test(self):
+ if len(self.test_config) == 0:
+ logger.error(f"start test test_config is empty, do not test, type:{self.session_type} {self.connect_key}")
+ return True
+ for one_config in self.test_config:
+ if self.start_one_test(one_config) is False:
+ logger.error(f"start one test failed, type:{self.session_type} {self.connect_key} config:{one_config}")
+ return False
+ return True
+
+ def start_one_test(self, config):
+ if config["test_type"] == TestType.SHELL:
+ if self.start_shell_test(config) is False:
+ logger.error(f"start_shell_test failed, type:{self.session_type} {self.connect_key} config:{config}")
+ return False
+ if config["test_type"] == TestType.FILE_SEND:
+ if self.start_file_send_test(config) is False:
+ logger.error(f"start_file_send_test failed, type:{self.session_type} \
+ {self.connect_key} config:{config}")
+ return False
+ if config["test_type"] == TestType.FILE_RECV:
+ if self.start_file_recv_test(config) is False:
+ logger.error(f"start_file_recv_test failed, type:{self.session_type} \
+ {self.connect_key} config:{config}")
+ return False
+ if config["test_type"] == TestType.FPORT_TRANS_FILE:
+ if self.start_fport_trans_file_test(config) is False:
+ logger.error(f"start_fport_trans_file_test failed, type:{self.session_type} \
+ {self.connect_key} config:{config}")
+ return False
+ return True
+
+ def start_shell_test(self, config):
+ task_count = config["task_count"]
+ test_name = f"{self.connect_key}_shell"
+ for task_index in range(task_count):
+ thread = threading.Thread(target=process_shell_test, name=f"{test_name}_{task_index}",
+ args=(self.connect_key, config, f"{test_name}_{task_index}"))
+ thread.start()
+ self.thread_list.append(thread)
+
+ def start_file_send_test(self, config):
+ task_count = config["task_count"]
+ test_name = f"{self.connect_key}_filesend"
+ for task_index in range(task_count):
+ thread = threading.Thread(target=process_file_send_test, name=f"{test_name}_{task_index}",
+ args=(self.connect_key, config, f"{test_name}_{task_index}"))
+ thread.start()
+ self.thread_list.append(thread)
+
+ def start_file_recv_test(self, config):
+ task_count = config["task_count"]
+ test_name = f"{self.connect_key}_filerecv"
+ for task_index in range(task_count):
+ thread = threading.Thread(target=process_file_recv_test, name=f"{test_name}_{task_index}",
+ args=(self.connect_key, config, f"{test_name}_{task_index}"))
+ thread.start()
+ self.thread_list.append(thread)
+
+ def start_fport_trans_file_test(self, config):
+ task_count = len(config["port_info"])
+ test_name = f"{self.connect_key}_fporttrans"
+ for task_index in range(task_count):
+ thread = threading.Thread(target=process_fport_trans_file_test, name=f"{test_name}_{task_index}",
+ args=(self.connect_key, config, f"{test_name}_{task_index}", task_index))
+ thread.start()
+ self.thread_list.append(thread)
+
+
+class UsbSession(Session):
+ """
+ usb连接对应的session类
+ """
+ session_type = "usb"
+
+
+class TcpSession(Session):
+ """
+ tcp连接对应的session类
+ """
+ session_type = "tcp"
+
+ def start_tcp_connect(self):
+ result, _ = run_cmd_block(f"hdc tconn {self.connect_key}")
+ logger.info(f"start_tcp_connect {self.connect_key} result:{result}")
+ RELEASE_TCONN_CMD_LIST.append(f"hdc tconn {self.connect_key} -remove")
+ return True
+
+
+def start_server():
+ cmd = "hdc start"
+ result, _ = run_cmd_block(cmd)
+ return result
+
+
+def get_dev_list():
+ try:
+ result, _ = run_cmd_block("hdc list targets")
+ result = result.split()
+ except (OSError, IndexError):
+ result = ["failed to detect device"]
+ return False, result
+ targets = result
+ if len(targets) == 0:
+ logger.error(f"get device, devices list is empty")
+ return False, []
+ if "[Empty]" in targets[0]:
+ logger.error(f"get device, no devices found, devices:{targets}")
+ return False, targets
+ return True, targets
+
+
+def check_device_online(connect_key):
+ """
+ 确认设备是否在线
+ """
+ result, devices = get_dev_list()
+ if result is False:
+ logger.error(f"get device failed, devices:{devices}")
+ return False
+ if connect_key in devices:
+ return True
+ return False
+
+
+def init_test_env():
+ """
+ 初始化测试环境
+ 1、使用tmode port和fport转发,构建多session tcp测试场景
+ """
+ logger.info(f"init env")
+ start_server()
+ result, devices = get_dev_list()
+ if result is False:
+ logger.error(f"get device failed, devices:{devices}")
+ return False
+ device_sn = devices[0]
+ if len(devices) > 1:
+ # 存在多个设备连接,获取不是ip:port的设备作为待测试的设备
+ logger.info(f"Multiple devices are connected, devices:{devices}")
+ for dev in devices:
+ if ':' not in dev:
+ device_sn = dev
+ # 关闭设备侧监听端口
+ result, _ = run_cmd_block(f"hdc -t {device_sn} tmode port close")
+ logger.info(f"close tmode port finished")
+ time.sleep(10)
+
+ logger.info(f"get device:{device_sn}")
+
+ # 开启设备侧监听端口
+ result, _ = run_cmd_block(f"hdc -t {device_sn} tmode port {DEVICE_LISTEN_PORT}")
+ logger.info(f"run tmode port {DEVICE_LISTEN_PORT}, result:{result}")
+ time.sleep(10)
+ logger.info(f"start check device:{device_sn}")
+ if check_device_online(device_sn) is False:
+ logger.error(f"device {device_sn} in not online")
+ return False
+ logger.info(f"init_test_env finished")
+ return True
+
+
+def start_usb_session_test(connect_key):
+ """
+ 开始一个usb session的测试
+ """
+ logger.info(f"start_usb_session_test connect_key:{connect_key}")
+ session = UsbSession(connect_key)
+ session.set_test_config(USB_SESSION_TEST_CONFIG)
+ if session.start_test() is False:
+ logger.error(f"session.start_test failed, connect_key:{connect_key}")
+ return False
+ SESSION_LIST.append(session)
+ logger.info(f"start_usb_session_test connect_key:{connect_key} finished")
+ return True
+
+
+def start_local_tcp_session_test(connect_key, port_list):
+ """
+ 1、遍历传入的端口号,创建fport端口转发到设备端的DEVICE_LISTEN_PORT,
+ 2、使用tconn连接后进行测试
+ """
+ logger.info(f"start_local_tcp_session_test port_list:{port_list}")
+ if len(TCP_SESSION_TEST_CONFIG) == 0 or len(port_list) == 0:
+ logger.info(f"port_list:{port_list} TCP_SESSION_TEST_CONFIG:{TCP_SESSION_TEST_CONFIG}")
+ logger.info(f"port_list or TCP_SESSION_TEST_CONFIG is empty, no need do local tcp session test, exit")
+ return True
+ for port in port_list:
+ # 构建fport通道
+ logger.info(f"create fport local:{port} device:{DEVICE_LISTEN_PORT}")
+ result, _ = run_cmd_block(f"hdc -t {connect_key} fport tcp:{port} tcp:{DEVICE_LISTEN_PORT}")
+ logger.info(f"create fport local:{port} device:{DEVICE_LISTEN_PORT} result:{result}")
+ RELEASE_FPORT_CMD_LIST.append(f"hdc -t {connect_key} fport rm tcp:{port} tcp:{DEVICE_LISTEN_PORT}")
+ tcp_key = f"127.0.0.1:{port}"
+ logger.info(f"start TcpSession connect_key:{tcp_key}")
+ session = TcpSession(tcp_key)
+ session.set_test_config(TCP_SESSION_TEST_CONFIG)
+ session.start_tcp_connect()
+ if session.start_test() is False:
+ logger.error(f"session.start_test failed, connect_key:{tcp_key}")
+ return False
+ SESSION_LIST.append(session)
+ logger.info(f"start tcp test connect_key:{tcp_key} finished")
+
+ logger.info(f"start_local_tcp_session_test finished")
+ return True
+
+
+def start_test(msg_queue):
+ """
+ 启动测试
+ 1、启动usb session场景的测试
+ 2、启动通过 tmode+fport模拟的tcp session场景的测试
+ """
+ logger.info(f"start test")
+ result, devices = get_dev_list()
+ if result is False:
+ logger.error(f"get device failed, devices:{devices}")
+ return False
+ device_sn = devices[0]
+ DEVICE_LIST.append(device_sn)
+ if start_usb_session_test(device_sn) is False:
+ logger.error(f"start_usb_session_test failed, devices:{devices}")
+ return False
+ if start_local_tcp_session_test(device_sn, MUTIL_SESSION_TEST_PC_PORT_LIST) is False:
+ logger.error(f"start_tcp_session_test failed, devices:{devices}")
+ return False
+ return True
+
+
+def release_resource():
+ """
+ 释放相关资源
+ 1、断开tconn连接
+ 2、fport转发
+ 3、tmode port
+ """
+ logger.info(f"enter release_resource")
+ for cmd in RELEASE_TCONN_CMD_LIST:
+ result, _ = run_cmd_block(cmd)
+ logger.info(f"release tconn cmd:{cmd} result:{result}")
+ for cmd in RELEASE_FPORT_CMD_LIST:
+ result, _ = run_cmd_block(cmd)
+ logger.info(f"release fport cmd:{cmd} result:{result}")
+ result, _ = run_cmd_block(f"hdc -t {DEVICE_LIST[0]} tmode port close")
+ logger.info(f"tmode port close, device:{DEVICE_LIST[0]} result:{result}")
+
+
+def run_stability_test():
+ global logger
+ logger = logging.getLogger(__name__)
+ logger.info(f"main resource_path:{RESOURCE_PATH}")
+ start_time = get_time_str()
+ if init_test_env() is False:
+ logger.error(f"init_test_env failed")
+ return False
+
+ # 启动消息收集进程
+ msg_thread = threading.Thread(target=process_msg, name="msg_process", args=(TEST_THREAD_MSG_QUEUE, "msg_process"))
+ msg_thread.start()
+
+ put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"start_time": start_time}})
+ logger.info(f"start run test thread")
+ if start_test(TEST_THREAD_MSG_QUEUE) is False:
+ logger.error(f"start_test failed")
+ stop_time = get_time_str()
+ put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"stop_time": stop_time}})
+ TEST_THREAD_MSG_QUEUE.put(None)
+ msg_thread.join()
+ return False
+
+ # wait all thread exit
+ logger.info(f"wait all test thread exit")
+ for session in SESSION_LIST:
+ for thread in session.thread_list:
+ thread.join()
+
+ stop_time = get_time_str()
+ put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"stop_time": stop_time}})
+
+ release_resource()
+
+ # 传递 None 参数,报告进程收到后退出
+ logger.info(f"main put None to TEST_THREAD_MSG_QUEUE")
+ TEST_THREAD_MSG_QUEUE.put(None)
+ msg_thread.join()
+
+ logger.info(f"exit main, test result:{TEST_RESULT}")
+ return TEST_RESULT
+
+
+if __name__ == "__main__":
+ init_logger()
+ run_stability_test()