diff --git a/MindEnergy/applications/deploy/EXAMPLE.md b/MindEnergy/applications/deploy/EXAMPLE.md new file mode 100644 index 0000000000000000000000000000000000000000..ac73c22238786fa7c422acbc1710c8e3f35078d6 --- /dev/null +++ b/MindEnergy/applications/deploy/EXAMPLE.md @@ -0,0 +1,98 @@ +# MindScience 部署服务 - API 示例 + +本文档提供如何使用 curl 命令调用 MindScience 部署服务 API 的示例。 + +## 1. 加载模型 + +使用 curl 加载模型: + +```bash +curl -X POST "http://localhost:8001/mindscience/deploy/load_model" \ + -H "Content-Type: multipart/form-data" \ + -F "model_name=your_model" \ + -F "model_file=@/path/to/your/model_file.zip" +``` + +`model_file.zip` 的目录格式为: + +```bash +model_file.zip +├── your_model_1.mindir +├── your_model_2.mindir +├── ... +└── your_model_n.mindir +``` + +如果只想从本地文件加载模型(不上传),只需提供 `model_name`: + +```bash +curl -X POST "http://localhost:8001/mindscience/deploy/load_model" \ + -H "Content-Type: multipart/form-data" \ + -F "model_name=your_local_model" +``` + +## 2. 卸载模型 + +卸载当前已加载的模型: + +```bash +curl -X POST "http://localhost:8001/mindscience/deploy/unload_model" +``` + +## 3. 推理 + +对数据集执行推理,`task_type` 指定选择 `model_file.zip` 中的哪个模型进行推理,其取值需小于 `model_file.zip` 中的模型数量: + +```bash +curl -X POST "http://localhost:8001/mindscience/deploy/infer" \ + -H "Content-Type: multipart/form-data" \ + -F "dataset=@/path/to/your/dataset.h5" \ + -F "task_type=0" +``` + +这将返回一个 task_id,可用于检查推理请求的状态。 + +## 4. 任务状态查询 + +检查推理任务的状态(将 {task_id} 替换为从推理API返回的实际任务 ID): + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/query_status/{task_id}" +``` + +例如: + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/query_status/123e4567-e89b-12d3-a456-426614174000" +``` + +## 5. 结果下载 + +下载已完成的推理任务的结果(将 {task_id} 替换为实际的任务 ID): + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/query_results/{task_id}" -o "results.h5" +``` + +`-o` 标志将响应保存为指定文件名的文件。例如: + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/query_results/123e4567-e89b-12d3-a456-426614174000" -o "results.h5" +``` + +这将下载名为 `results.h5` 的结果文件。 + +## 6. 健康检查 + +检查部署服务的健康状态: + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/health_check" +``` + +## 重要说明 + +- 根据 ServerConfig,默认端口可能是 8001,但应检查 `src/config.py` 文件以确认确切端口。 +- 发出请求之前,请确保部署服务正在运行。 +- 模型推理任务在后台异步处理,因此在尝试下载结果之前需要检查任务状态。 +- 对于并发请求数有限制(在 DeployConfig 中配置)。 diff --git a/MindEnergy/applications/deploy/EXAMPLE_EN.md b/MindEnergy/applications/deploy/EXAMPLE_EN.md new file mode 100644 index 0000000000000000000000000000000000000000..f2a416799700e8f8f9bd7d4393f0536a80f357c1 --- /dev/null +++ b/MindEnergy/applications/deploy/EXAMPLE_EN.md @@ -0,0 +1,98 @@ +# MindScience Deployment Service - API Examples + +This document provides examples of how to use curl commands to call the MindScience deployment service APIs. + +## 1. Load Model + +Use curl to load a model: + +```bash +curl -X POST "http://localhost:8001/mindscience/deploy/load_model" \ + -H "Content-Type: multipart/form-data" \ + -F "model_name=your_model" \ + -F "model_file=@/path/to/your/model_file.zip" +``` + +The directory structure of `model_file.zip` should be: + +```bash +model_file.zip +├── your_model_1.mindir +├── your_model_2.mindir +├── ... +└── your_model_n.mindir +``` + +If you only want to load a model from local files (without uploading), just provide `model_name`: + +```bash +curl -X POST "http://localhost:8001/mindscience/deploy/load_model" \ + -H "Content-Type: multipart/form-data" \ + -F "model_name=your_local_model" +``` + +## 2. 
Unload Model + +Unload the currently loaded model: + +```bash +curl -X POST "http://localhost:8001/mindscience/deploy/unload_model" +``` + +## 3. Inference + +Perform inference on a dataset. `task_type` specifies which model in `model_file.zip` to use for inference, and its value should be less than the number of models in `model_file.zip`: + +```bash +curl -X POST "http://localhost:8001/mindscience/deploy/infer" \ + -H "Content-Type: multipart/form-data" \ + -F "dataset=@/path/to/your/dataset.h5" \ + -F "task_type=0" +``` + +This will return a task_id that can be used to check the status of the inference request. + +## 4. Task Status Query + +Check the status of an inference task (replace {task_id} with the actual task ID returned from the inference API): + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/query_status/{task_id}" +``` + +For example: + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/query_status/123e4567-e89b-12d3-a456-426614174000" +``` + +## 5. Result Download + +Download the results of a completed inference task (replace {task_id} with the actual task ID): + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/query_results/{task_id}" -o "results.h5" +``` + +The `-o` flag saves the response to a file with the specified filename. For example: + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/query_results/123e4567-e89b-12d3-a456-426614174000" -o "results.h5" +``` + +This will download the result file named `results.h5`. + +## 6. Health Check + +Check the health status of the deployment service: + +```bash +curl -X GET "http://localhost:8001/mindscience/deploy/health_check" +``` + +## Important Notes + +- According to ServerConfig, the default port might be 8001, but you should check the `src/config.py` file to confirm the exact port. +- Ensure the deployment service is running before making requests. +- Model inference tasks are processed asynchronously in the background, so check the task status before attempting to download results. +- There are limits on the number of concurrent requests (configured in DeployConfig). diff --git a/MindEnergy/applications/deploy/README.md b/MindEnergy/applications/deploy/README.md new file mode 100644 index 0000000000000000000000000000000000000000..b3643879d6d3ca811453582ef379b3e96ef8c938 --- /dev/null +++ b/MindEnergy/applications/deploy/README.md @@ -0,0 +1,141 @@ +# MindScience 部署服务 + +MindScience 部署服务是一个基于 FastAPI 的模型部署和监控系统,支持多设备并行推理和资源监控功能。 + +## 架构图 + +
+![MindScience 部署服务架构图](docs/deploy_arch.png)
+ +## 目录结构 + +```shell +deploy/ +├── deploy.py # 模型部署服务主文件 +├── monitor.py # 服务器监控服务主文件 +├── requirements.txt # 项目依赖 +└── src/ # 源代码目录 + ├── config.py # 配置文件 + ├── enums.py # 枚举定义 + ├── inference.py # 推理实现 + ├── schemas.py # 数据模型定义 + ├── session.py # 会话管理 + └── utils.py # 工具函数 +``` + +## 功能特性 + +### 部署服务 (deploy.py) + +- **模型加载/卸载**:支持通过 HTTP 接口上传 MindIR 模型文件并加载到设备上 +- **异步推理**:支持后台异步执行推理任务 +- **任务状态管理**:支持任务状态查询(待处理、处理中、已完成、错误) +- **健康检查**:提供模型状态检查接口 +- **结果下载**:推理完成后可下载结果文件 +- **多设备支持**:支持最多 8 个 NPU 设备并行推理 + +### 监控服务 (monitor.py) + +- **资源监控**:实时监控 CPU、内存和 NPU 使用率 +- **健康检查**:提供服务器资源使用情况查询接口 + +## 配置参数 + +### 部署配置 (DeployConfig) + +- `max_device_num`: 最大设备数量(默认 8) +- `deploy_device_num`: 部署使用的设备数量(默认 8) +- `max_request_num`: 最大并发请求数(默认 100) +- `models_dir`: 模型文件存储目录(默认 "models") +- `datasets_dir`: 数据集文件存储目录(默认 "datasets") +- `results_dir`: 结果文件存储目录(默认 "results") +- `dummy_model_path`: 用于测试的虚拟模型路径(默认 "dummy_model.mindir") +- `chunk_size`: 数据块大小(默认 8MB) + +### 服务器配置 (ServerConfig) + +- `host`: 服务器主机地址(默认 "127.0.0.1") +- `deploy_port`: 部署服务端口(默认 8001) +- `monitor_port`: 监控服务端口(默认 8002) +- `limit_concurrency`: 最大并发连接数(默认 1000) +- `timeout_keep_alive`: Keep-alive 连接超时时间(默认 30 秒) +- `backlog`: 待处理连接队列大小(默认 2048) + +## API 接口 + +### 部署服务接口 + +- `POST /mindscience/deploy/load_model` - 加载模型 + - 参数:model_name (表单), model_file (可选文件) + +- `POST /mindscience/deploy/unload_model` - 卸载模型 + +- `POST /mindscience/deploy/infer` - 执行推理 + - 参数:dataset (文件), task_type (表单) + +- `GET /mindscience/deploy/query_status/{task_id}` - 查询任务状态 + +- `GET /mindscience/deploy/query_results/{task_id}` - 获取推理结果 + +- `GET /mindscience/deploy/health_check` - 健康检查 + +### 监控服务接口 + +- `GET /mindscience/monitor/resource_usage` - 获取资源使用情况 + - 返回:CPU使用率、内存使用率、NPU使用率、NPU内存使用率 + +## 依赖项 + +- `fastapi == 0.121.2`: Web 框架 +- `uvicorn == 0.38.0`: ASGI 服务器 +- `python-multipart == 0.0.20`: 多部分表单数据处理 +- `h5py == 3.14.0`: HDF5 文件处理 +- `loguru == 0.7.3`: 日志处理 +- `aiofiles == 25.1.0`: 异步文件操作 +- `psutil == 7.0.0`: 系统和进程监控 +- `numpy == 1.26.4`: 科学计算库 +- `mindspore_lite == 2.7.1`: MindSpore Lite 推理引擎 +- `CANN == 8.2.RC1`: 神经网络异构计算架构 + +## 安装和使用 + +1. 参考[CANN官网](https://www.hiascend.com/document/detail/zh/CANNCommunityEdition/83RC1/softwareinst/instg/instg_quick.html?Mode=PmIns&InstallType=local&OS=openEuler&Software=cannToolKit)文档安装CANN社区版软件包。 + +2. 从[MindSpore官网](https://www.mindspore.cn/lite/docs/zh-CN/r2.7.1/use/downloads.html#mindspore-lite-python%E6%8E%A5%E5%8F%A3%E5%BC%80%E5%8F%91%E5%BA%93)下载MindSpore Lite Python接口开发库: + + ```bash + wget https://ms-release.obs.cn-north-4.myhuaweicloud.com/2.7.1/MindSporeLite/lite/release/linux/aarch64/cloud_fusion/python310/mindspore_lite-2.7.1-cp310-cp310-linux_aarch64.whl + + pip install mindspore_lite-2.7.1-cp310-cp310-linux_aarch64.whl + ``` + +3. 安装 Python 依赖: + + ```bash + pip install -r requirements.txt + ``` + +4. 根据实际业务修改src/config.py配置文件中的配置项。 + +5. 启动部署服务: + + ```bash + python deploy.py + ``` + +6. 
启动监控服务: + + ```bash + python monitor.py + ``` + +## 技术架构 + +- **框架**:FastAPI + Uvicorn +- **推理引擎**:MindSpore Lite +- **设备支持**:华为昇腾 NPU +- **并行处理**:多进程并行推理 +- **数据格式**:HDF5 数据存储 + +系统采用异步处理方式,支持高并发推理请求,并提供完整的任务生命周期管理。 diff --git a/MindEnergy/applications/deploy/README_EN.md b/MindEnergy/applications/deploy/README_EN.md new file mode 100644 index 0000000000000000000000000000000000000000..00260073c4273d690bc7693ac920836430a8f339 --- /dev/null +++ b/MindEnergy/applications/deploy/README_EN.md @@ -0,0 +1,141 @@ +# MindScience Deployment Service + +MindScience Deployment Service is a model deployment and monitoring system based on FastAPI, supporting multi-device parallel inference and resource monitoring capabilities. + +## Architecture + +
+![MindScience deployment service architecture](docs/deploy_arch_en.png)
+ +## Directory Structure + +```shell +deploy/ +├── deploy.py # Model deployment service main file +├── monitor.py # Server monitoring service main file +├── requirements.txt # Project dependencies +└── src/ # Source code directory + ├── config.py # Configuration file + ├── enums.py # Enum definitions + ├── inference.py # Inference implementation + ├── schemas.py # Data model definitions + ├── session.py # Session management + └── utils.py # Utility functions +``` + +## Features + +### Deployment Service (deploy.py) + +- **Model load/unload**: Supports uploading MindIR model files via HTTP interface and loading them to devices +- **Asynchronous inference**: Supports background asynchronous execution of inference tasks +- **Task status management**: Supports task status queries (pending, processing, completed, error) +- **Health check**: Provides model status checking interface +- **Result download**: Results file can be downloaded after inference completion +- **Multi-device support**: Supports up to 8 NPU devices for parallel inference + +### Monitoring Service (monitor.py) + +- **Resource monitoring**: Real-time monitoring of CPU, memory and NPU usage +- **Health check**: Provides server resource usage query interface + +## Configuration Parameters + +### Deployment Configuration (DeployConfig) + +- `max_device_num`: Maximum number of devices (default 8) +- `deploy_device_num`: Number of devices used for deployment (default 8) +- `max_request_num`: Maximum concurrent request number (default 100) +- `models_dir`: Model file storage directory (default "models") +- `datasets_dir`: Dataset file storage directory (default "datasets") +- `results_dir`: Result file storage directory (default "results") +- `dummy_model_path`: Path for dummy model used in testing (default "dummy_model.mindir") +- `chunk_size`: Chunk size (default 8MB) + +### Server Configuration (ServerConfig) + +- `host`: Server host address (default "127.0.0.1") +- `deploy_port`: Deployment service port (default 8001) +- `monitor_port`: Monitoring service port (default 8002) +- `limit_concurrency`: Maximum concurrent connection number (default 1000) +- `timeout_keep_alive`: Keep-alive connection timeout (default 30 seconds) +- `backlog`: Pending connection queue size (default 2048) + +## API Interfaces + +### Deployment Service Interfaces + +- `POST /mindscience/deploy/load_model` - Load model + - Parameters: model_name (form), model_file (optional file) + +- `POST /mindscience/deploy/unload_model` - Unload model + +- `POST /mindscience/deploy/infer` - Execute inference + - Parameters: dataset (file), task_type (form) + +- `GET /mindscience/deploy/query_status/{task_id}` - Query task status + +- `GET /mindscience/deploy/query_results/{task_id}` - Get inference results + +- `GET /mindscience/deploy/health_check` - Health check + +### Monitoring Service Interface + +- `GET /mindscience/monitor/resource_usage` - Get resource usage + - Returns: CPU usage rate, memory usage rate, NPU usage rate, NPU memory usage rate + +## Dependencies + +- `fastapi == 0.121.2`: Web framework +- `uvicorn == 0.38.0`: ASGI server +- `python-multipart == 0.0.20`: Multipart form data processing +- `h5py == 3.14.0`: HDF5 file processing +- `loguru == 0.7.3`: Logging +- `aiofiles == 25.1.0`: Asynchronous file operations +- `psutil == 7.0.0`: System and process monitoring +- `numpy == 1.26.4`: Scientific computing library +- `mindspore_lite == 2.7.1`: MindSpore Lite inference engine +- `CANN == 8.2.RC1`: Compute Architecture for Neural Networks + +## 
Installation and Usage + +1. Refer to the [CANN official website](https://www.hiascend.com/document/detail/zh/CANNCommunityEdition/83RC1/softwareinst/instg/instg_quick.html?Mode=PmIns&InstallType=local&OS=openEuler&Software=cannToolKit) documentation to install the CANN community edition software package. + +2. Download MindSpore Lite Python API development library from the [MindSpore official website](https://www.mindspore.cn/lite/docs/zh-CN/r2.7.1/use/downloads.html#mindspore-lite-python%E6%8E%A5%E5%8F%A3%E5%BC%80%E5%8F%91%E5%BA%93) + + ```bash + wget https://ms-release.obs.cn-north-4.myhuaweicloud.com/2.7.1/MindSporeLite/lite/release/linux/aarch64/cloud_fusion/python310/mindspore_lite-2.7.1-cp310-cp310-linux_aarch64.whl + + pip install mindspore_lite-2.7.1-cp310-cp310-linux_aarch64.whl + ``` + +3. Install Python dependencies: + + ```bash + pip install -r requirements.txt + ``` + +4. Modify the configuration items in the src/config.py file based on actual business needs. + +5. Start deployment service: + + ```bash + python deploy.py + ``` + +6. Start monitoring service: + + ```bash + python monitor.py + ``` + +## Technical Architecture + +- **Framework**: FastAPI + Uvicorn +- **Inference engine**: MindSpore Lite +- **Device support**: Huawei Ascend NPU +- **Parallel processing**: Multi-process parallel inference +- **Data format**: HDF5 data storage + +The system adopts an asynchronous processing approach, supports high-concurrency inference requests, and provides complete task lifecycle management. diff --git a/MindEnergy/applications/deploy/deploy.py b/MindEnergy/applications/deploy/deploy.py new file mode 100644 index 0000000000000000000000000000000000000000..abebc09c3c7ea384d8c1fb45363b870806828e31 --- /dev/null +++ b/MindEnergy/applications/deploy/deploy.py @@ -0,0 +1,374 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +MindScience Deployment Service. + +This module implements a model deployment service based on FastAPI that supports: +- Model loading/unloading via HTTP interface +- Asynchronous inference execution with background tasks +- Task status management (pending, processing, completed, error) +- Health checking functionality +- Multi-device parallel inference support (up to 8 NPU devices) +- Result file download after inference completion + +The service provides RESTful APIs for model management and inference execution, +with proper error handling and resource management. 
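+
+A typical client workflow (illustrative; the host and port follow the
+ServerConfig defaults in src/config.py and may differ in your deployment):
+
+    curl -X POST http://127.0.0.1:8001/mindscience/deploy/load_model \
+        -F "model_name=your_model" -F "model_file=@/path/to/model_file.zip"
+    curl -X POST http://127.0.0.1:8001/mindscience/deploy/infer \
+        -F "dataset=@/path/to/dataset.h5" -F "task_type=0"
+    curl http://127.0.0.1:8001/mindscience/deploy/query_status/<task_id>
+    curl -o results.h5 http://127.0.0.1:8001/mindscience/deploy/query_results/<task_id>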
+""" + +import os +import uuid +import signal +from typing import Any, Union +from contextlib import asynccontextmanager + +from loguru import logger +from uvicorn import Server +from uvicorn.config import Config +from fastapi import FastAPI, Form, File, UploadFile, BackgroundTasks, HTTPException +from fastapi.responses import JSONResponse, FileResponse +from fastapi.middleware.cors import CORSMiddleware + +from src.enums import HealthStatus, ModelStatus, TaskStatus +from src.schemas import ModelInfo +from src.session import SessionManager +from src.utils import Utilities +from src.config import configure_logging, DeployConfig, ServerConfig + +# pylint: disable=unused-argument, redefined-outer-name + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manages the application lifespan by initializing and cleaning up resources. + + This function is used as an async context manager for the FastAPI application + lifespan. It handles initialization of models on the specified devices during + startup and performs cleanup during shutdown. + + Args: + app (FastAPI): The FastAPI application instance. + """ + configure_logging("deploy") + logger.info(f"Initializing models on devices {list(range(DeployConfig.deploy_device_num))}.") + yield + logger.info("Shutting down deploy service.") + +app = FastAPI(lifespan=lifespan) +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"] +) + +model = SessionManager() +tasks_status = {} + + +def inference(dataset_path, task_id, task_type): + """Performs inference on a dataset. + + This function loads an HDF5 dataset, performs batch inference using the model, + saves the results to an HDF5 file, and updates the task status. + + Args: + dataset_path (str): Path to the HDF5 dataset file. + task_id (str): Unique identifier for the inference task. + task_type (int): Type of inference task to perform. + + Raises: + Exception: If there is an error during the inference process. + """ + try: + tasks_status[task_id] = TaskStatus.PROCESSING + + batch_inputs = Utilities.load_h5_file(dataset_path) + + results_list = model.batch_infer(batch_inputs, task_type) + + os.makedirs(DeployConfig.results_dir, exist_ok=True) + results_path = os.path.join(DeployConfig.results_dir, f"{task_id}_results.h5") + + Utilities.save_h5_file(results_list, results_path) + + tasks_status[task_id] = TaskStatus.COMPLETED + except Exception as e: + tasks_status[task_id] = TaskStatus.ERROR + logger.error(f"Task {task_id} infer failed, ERROR: {e}.") + + +@app.post("/mindscience/deploy/load_model") +async def load_model(model_name: str = Form(), model_file: Union[UploadFile, str] = None): + """Loads a model for inference. + + This endpoint handles the loading of a model, either from an uploaded file + or from a local file. If a model is already loaded, it returns an error. + + Args: + model_name (str): Name of the model to load. + model_file (Union[UploadFile, str], optional): Uploaded model file. Defaults to None. + + Returns: + JSONResponse: A response containing model information and status. + """ + model_info = ModelInfo() + if model.is_ready() == HealthStatus.READY: + model_info.status = ModelStatus.FAILURE + model_info.message = f"Model {model.session_name} is already loaded, please unload first." 
+ return JSONResponse( + content=model_info.model_dump(), + status_code=403, + headers={"Content-Type": "application/json"} + ) + + if not model_file: + logger.info("File not uploaded, model loaded from local file.") + else: + try: + logger.info(f"Start receive {model_name} file") + os.makedirs(DeployConfig.models_dir, exist_ok=True) + save_dir = os.path.join(DeployConfig.models_dir, model_name) + os.makedirs(save_dir, exist_ok=True) + + save_path = os.path.join(save_dir, model_file.filename) + await Utilities.save_upload_file(model_file, save_path) + + Utilities.extract_file(save_path) + logger.info(f"{model_name} file receive successfully.") + except Exception as e: + model_info.status = ModelStatus.FAILURE + model_info.message = str(e) + return JSONResponse( + content=model_info.model_dump(), + status_code=500, + headers={"Content-Type": "application/json"} + ) + + try: + model.init_session(model_name, device_num=DeployConfig.deploy_device_num) + return JSONResponse( + content=model_info.model_dump(), + status_code=201, + headers={"Content-Type": "application/json"} + ) + except Exception as e: + model_info.status = ModelStatus.FAILURE + model_info.message = str(e) + return JSONResponse( + content=model_info.model_dump(), + status_code=500, + headers={"Content-Type": "application/json"} + ) + + +@app.post("/mindscience/deploy/unload_model") +async def unload_model(): + """Unloads the currently loaded model. + + This endpoint unloads the model session and frees up the associated resources. + + Returns: + JSONResponse: A response containing model information and status. + """ + model_info = ModelInfo() + try: + model.del_session() + return JSONResponse( + content=model_info.model_dump(), + status_code=200, + headers={"Content-Type": "application/json"} + ) + except Exception as e: + model_info.status = ModelStatus.FAILURE + model_info.message = str(e) + return JSONResponse( + content=model_info.model_dump(), + status_code=500, + headers={"Content-Type": "application/json"} + ) + + +@app.post("/mindscience/deploy/infer") +async def infer(dataset: UploadFile = File(..., description="input dataset"), task_type: int = Form(), + background_tasks: BackgroundTasks = Any): + """Performs inference on the uploaded dataset. + + This endpoint accepts a dataset file and performs inference in the background. + It checks if the model is ready, validates the number of pending tasks, + saves the uploaded dataset, and starts the inference task. + + Args: + dataset (UploadFile): The input dataset file to process. + task_type (int): Type of inference task to perform. + background_tasks (BackgroundTasks): FastAPI background tasks manager. + + Returns: + JSONResponse: A response containing the task ID. + + Raises: + HTTPException: If the server is not ready or if the request limit is exceeded. 
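+
+    Example:
+        A successful call returns a JSON body like ``{"task_id": "<uuid4>"}``
+        (illustrative). Because inference runs as a background task, clients
+        should poll ``/mindscience/deploy/query_status/{task_id}`` until the
+        status is ``completed``, then download the result file from
+        ``/mindscience/deploy/query_results/{task_id}``.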
+ """ + if model.is_ready() != HealthStatus.READY: + raise HTTPException(status_code=500, detail="Server is not ready, please check") + + pending_number = Utilities.count_pending_task(tasks_status) + logger.info(f"Pending task number is {pending_number}") + if pending_number >= DeployConfig.max_request_num: + logger.error(f"Predict request number exceed limited number: \ + {pending_number} vs {DeployConfig.max_request_num}") + raise HTTPException(status_code=503, detail="Request number exceed limited number, please wait for a while.") + + task_id = str(uuid.uuid4()) + tasks_status[task_id] = TaskStatus.PENDING + + try: + os.makedirs(DeployConfig.datasets_dir, exist_ok=True) + dataset_path = os.path.join(DeployConfig.datasets_dir, f"{task_id}_{dataset.filename}") + await Utilities.save_upload_file(dataset, dataset_path) + + background_tasks.add_task(inference, dataset_path, task_id, task_type) + + return JSONResponse( + content={"task_id": task_id}, + status_code=200, + headers={"Content-Type": "application/json"} + ) + except Exception as e: + tasks_status[task_id] = TaskStatus.ERROR + raise HTTPException(status_code=500, detail=f"Task {task_id} infer failed, ERROR: {e}.") from e + + +@app.get("/mindscience/deploy/query_status/{task_id}") +async def query_status(task_id: str): + """Queries the status of an inference task. + + This endpoint retrieves the current status of a specific inference task + by its task ID. + + Args: + task_id (str): Unique identifier of the task to query. + + Returns: + JSONResponse: A response containing the task status. + + Raises: + HTTPException: If the task ID is not found. + """ + status = tasks_status.get(task_id) + if status is None: + raise HTTPException(status_code=404, detail=f"Task {task_id} is not found, please check!") + return JSONResponse( + content={"status": status}, + status_code=200, + headers={"Content-Type": "application/json"} + ) + + +@app.get("/mindscience/deploy/query_results/{task_id}") +async def query_results(task_id: str): + """Retrieves the results of a completed inference task. + + This endpoint returns the results file of a completed inference task + if the task is completed, otherwise returns the current status. + + Args: + task_id (str): Unique identifier of the task to query. + + Returns: + FileResponse or JSONResponse: Either the results file or a response containing the task status and message. + + Raises: + HTTPException: If the task ID is not found. + """ + status = tasks_status.get(task_id) + if status is None: + raise HTTPException(status_code=404, detail=f"Task {task_id} is not found, please check!") + if status != TaskStatus.COMPLETED: + return JSONResponse( + content={"status": status, "message": f"Task {task_id} is not completed."}, + status_code=404, + headers={"Content-Type": "application/json"} + ) + result_path = os.path.join(DeployConfig.results_dir, f"{task_id}_results.h5") + return FileResponse(result_path, filename=f"{task_id}_results.h5") + + +@app.get("/mindscience/deploy/health_check") +async def health_check(): + """Performs a health check on the deployment service. + + This endpoint checks the readiness status of the model and returns + an appropriate HTTP response based on the model's health status. + + Returns: + JSONResponse: A response indicating the health status of the service. 
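+
+    Note:
+        As implemented below, the HTTP status code mirrors the model state:
+        ``not_loaded`` -> 501, ``exception`` -> 503, ``ready`` -> 200.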
+ """ + health_status = model.is_ready() + logger.info(f"health check result is {health_status}.") + if health_status == HealthStatus.NOTLOADED: + return JSONResponse( + content={}, + status_code=501, + headers={"Content-Type": "application/json"} + ) + if health_status == HealthStatus.EXCEPTION: + return JSONResponse( + content={model.session_name: health_status}, + status_code=503, + headers={"Content-Type": "application/json"} + ) + return JSONResponse( + content={model.session_name: health_status}, + status_code=200, + headers={"Content-Type": "application/json"} + ) + + +if __name__ == "__main__": + server = Server( + Config( + app=app, + host=ServerConfig.host, + port=ServerConfig.deploy_port, + limit_concurrency=ServerConfig.limit_concurrency, + timeout_keep_alive=ServerConfig.timeout_keep_alive, + backlog=ServerConfig.backlog + ) + ) + + def terminate_signal_handler(signum, frame): + """Handles termination signals to gracefully shut down the server. + + This function is called when the server receives a termination signal + (SIGTERM or SIGINT). It cleans up the model session and sets the server + to exit. + + Args: + signum (int): The signal number received. + frame: The current stack frame (unused). + """ + global model + logger.info(f"Catch signal: {signum}, starting terminate server...") + try: + model.del_session() + except Exception as e: + logger.exception(f"Model clear failed, please check! ERROR: {e}") + del model + server.should_exit = True + + signal.signal(signal.SIGTERM, terminate_signal_handler) + signal.signal(signal.SIGINT, terminate_signal_handler) + + logger.info("Starting deploy server...") + server.run() diff --git a/MindEnergy/applications/deploy/docs/deploy_arch.png b/MindEnergy/applications/deploy/docs/deploy_arch.png new file mode 100755 index 0000000000000000000000000000000000000000..9394afa58298f0b91184b26a8a84fc7cc37e6fa8 Binary files /dev/null and b/MindEnergy/applications/deploy/docs/deploy_arch.png differ diff --git a/MindEnergy/applications/deploy/docs/deploy_arch_en.png b/MindEnergy/applications/deploy/docs/deploy_arch_en.png new file mode 100755 index 0000000000000000000000000000000000000000..e8a040868ba88c9fe67b292c372851a5e3b6e30f Binary files /dev/null and b/MindEnergy/applications/deploy/docs/deploy_arch_en.png differ diff --git a/MindEnergy/applications/deploy/monitor.py b/MindEnergy/applications/deploy/monitor.py new file mode 100644 index 0000000000000000000000000000000000000000..8939fe3049dcd7afeb1769bf9948c3a62f5d6127 --- /dev/null +++ b/MindEnergy/applications/deploy/monitor.py @@ -0,0 +1,157 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +MindScience Monitoring Service. 
+ +This module implements a server resource monitoring service based on FastAPI that supports: +- Real-time monitoring of CPU usage +- Real-time monitoring of memory usage +- Real-time monitoring of NPU (Neural Processing Unit) usage +- Real-time monitoring of NPU memory usage +- Health check endpoint for server resource usage + +The service provides a RESTful API endpoint to retrieve current system resource statistics, +which can be used for system health monitoring and performance analysis. +""" + +import signal +import asyncio +from contextlib import asynccontextmanager + +import psutil +from loguru import logger +from fastapi import FastAPI +from uvicorn import Server +from uvicorn.config import Config + +from src.utils import Utilities +from src.config import configure_logging, ServerConfig + +# pylint: disable=unused-argument, redefined-outer-name + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manages the application lifespan by initializing and cleaning up resources. + + This function is used as an async context manager for the FastAPI application + lifespan. It handles logging configuration and performs startup and shutdown + logging. + + Args: + app (FastAPI): The FastAPI application instance. + """ + configure_logging("monitor") + logger.info("Starting monitor service.") + yield + logger.info("Shutting down monitor service.") + +app = FastAPI(lifespan=lifespan) + +@app.get("/mindscience/monitor/resource_usage") +async def get_server_status(): + """Retrieves the current server resource usage statistics. + + This endpoint collects and returns CPU, memory, and NPU usage statistics + from the server. It uses psutil for CPU and memory metrics and a custom + utility function for NPU metrics. + + Returns: + dict: A dictionary containing the status, message, and resource usage data + including CPU usage rate, memory usage rate, NPU usage rate, and NPU memory usage rate. 
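+
+    Example:
+        A successful response body looks like this (values are illustrative)::
+
+            {"status": 200, "msg": "success",
+             "data": {"cpuUsageRate": "12.50", "memoryUsageRate": "43.10",
+                      "npuUsageRate": "8.00", "npuMemoryUsageRate": "15.30"}}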
+ """ + try: + results = await asyncio.gather( + asyncio.to_thread(psutil.cpu_percent, interval=None), + asyncio.to_thread(lambda: psutil.virtual_memory().percent), + Utilities.get_npu_usage(), + return_exceptions=True + ) + + # 解包结果 + cpu_usage, memory_usage, npu_stats = results + + if isinstance(cpu_usage, Exception): + logger.error(f"Failed to get CPU usage: {cpu_usage}") + cpu_usage = 0.0 + + if isinstance(memory_usage, Exception): + logger.error(f"Failed to get memory usage: {memory_usage}") + memory_usage = 0.0 + + if isinstance(npu_stats, Exception): + logger.warning(f"Failed to get NPU usage info: {npu_stats}") + npu_usage_rate = "0.00" + npu_memory_usage_rate = "0.00" + elif npu_stats: + total_memory = sum(stat["memory_total_mb"] for stat in npu_stats) + used_memory = sum(stat["memory_used_mb"] for stat in npu_stats) + avg_utilization = sum(stat["utilization_percent"] for stat in npu_stats) / len(npu_stats) + avg_memory_usage = (used_memory / total_memory * 100) if total_memory > 0 else 0 + npu_usage_rate = f"{avg_utilization:.2f}" + npu_memory_usage_rate = f"{avg_memory_usage:.2f}" + else: + npu_usage_rate = "0.00" + npu_memory_usage_rate = "0.00" + + return { + "status": 200, + "msg": "success", + "data": { + "cpuUsageRate": f"{cpu_usage:.2f}", + "memoryUsageRate": f"{memory_usage:.2f}", + "npuUsageRate": npu_usage_rate, + "npuMemoryUsageRate": npu_memory_usage_rate + } + } + except Exception as e: + logger.error(f"An unexpected error occurred in get_server_status: {e}") + return { + "status": 500, + "msg": "failure", + "data": {} + } + +if __name__ == "__main__": + server = Server( + Config( + app=app, + host=ServerConfig.host, + port=ServerConfig.monitor_port, + limit_concurrency=ServerConfig.limit_concurrency, + timeout_keep_alive=ServerConfig.timeout_keep_alive, + backlog=ServerConfig.backlog + ) + ) + + def terminate_signal_handler(signum, frame): + """Handles termination signals to gracefully shut down the server. + + This function is called when the server receives a termination signal + (SIGTERM or SIGINT). It sets the server to exit. + + Args: + signum (int): The signal number received. + frame: The current stack frame (unused). + """ + logger.info(f"Catch signal: {signum}, starting terminate server...") + server.should_exit = True + + signal.signal(signal.SIGTERM, terminate_signal_handler) + signal.signal(signal.SIGINT, terminate_signal_handler) + + + logger.info("Starting monitor server...") + server.run() diff --git a/MindEnergy/applications/deploy/requirements.txt b/MindEnergy/applications/deploy/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..efb672f8223e690b0ae022a96605b86b87789078 --- /dev/null +++ b/MindEnergy/applications/deploy/requirements.txt @@ -0,0 +1,8 @@ +fastapi +uvicorn +python-multipart +h5py +loguru +aiofiles +psutil +numpy diff --git a/MindEnergy/applications/deploy/src/__init__.py b/MindEnergy/applications/deploy/src/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c1d6e2e61bc2e8689b21fa5f96feb9371325f852 --- /dev/null +++ b/MindEnergy/applications/deploy/src/__init__.py @@ -0,0 +1,31 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +MindScience deployment service source package. + +This package contains modules for deploying and monitoring machine learning models +using FastAPI services. The modules include: + +- config: Configuration classes and functions for deployment settings +- enums: Enumerations for status values and message types +- inference: Classes for performing inference using MindSpore Lite +- schemas: Data models for request/response validation +- session: Session management for model inference +- utils: Utility functions for file handling, model collection, and system monitoring + +This package provides the core functionality for model deployment, inference execution, +and system monitoring in the MindScience platform. +""" diff --git a/MindEnergy/applications/deploy/src/config.py b/MindEnergy/applications/deploy/src/config.py new file mode 100644 index 0000000000000000000000000000000000000000..5704c76044fb13952831bb4645e8eaff8eadcc32 --- /dev/null +++ b/MindEnergy/applications/deploy/src/config.py @@ -0,0 +1,119 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +Configuration module for MindScience deployment service. + +This module defines configuration classes and functions used throughout the +MindScience deployment and monitoring services. It includes: + +- ModelConfig: Specifies model input and output column names +- DeployConfig: Contains deployment-related settings such as device numbers, request limits, and file paths +- ServerConfig: Defines server settings including host, ports, and connection parameters +- configure_logging: Function to set up logging for different services + +The configurations use dataclasses with frozen=True to ensure immutability +and thread-safe access to configuration values. +""" + +import sys +from typing import Tuple +from dataclasses import dataclass + +from loguru import logger + +@dataclass(frozen=True) +class ModelConfig: + """Configuration for model input and output specifications. + + Attributes: + input_columns: Tuple of column names used as model inputs. + output_columns: Tuple of column names produced as model outputs. + """ + input_columns: Tuple[str] = ("x", "edge_index", "edge_attr") + + output_columns: Tuple[str] = ("output",) + + +@dataclass(frozen=True) +class DeployConfig: + """Configuration for deployment settings. + + Attributes: + max_device_num: Maximum number of devices allowed for deployment. 
+ deploy_device_num: Number of devices to be used for deployment. + max_request_num: Maximum number of concurrent requests allowed. + models_dir: Directory path for storing model files. + datasets_dir: Directory path for storing dataset files. + results_dir: Directory path for storing result files. + dummy_model_path: File path for the dummy model used in testing. + chunk_size: Size of data chunks for processing in bytes. + """ + max_device_num: int = 8 + + deploy_device_num: int = 8 + + max_request_num: int = 100 + + models_dir: str = "models" + + datasets_dir: str = "datasets" + + results_dir: str = "results" + + dummy_model_path: str = "dummy_model.mindir" + + chunk_size: int = 8 * 1024 * 1024 + + +@dataclass(frozen=True) +class ServerConfig: + """Configuration for server settings. + + Attributes: + host: Host address for the server. + deploy_port: Port number for deployment service. + monitor_port: Port number for monitoring service. + limit_concurrency: Maximum number of concurrent connections allowed. + timeout_keep_alive: Timeout duration for keep-alive connections in seconds. + backlog: Maximum number of pending connections in the queue. + """ + host: str = "127.0.0.1" + + deploy_port: int = 8001 + + monitor_port: int = 8002 + + limit_concurrency: int = 1000 + + timeout_keep_alive: int = 30 + + backlog: int = 2048 + + +def configure_logging(service: str): + """Configures logging settings for the specified service. + + Args: + service: Name of the service to configure logging for. + """ + logger.add( + f"logs/{service}.log", + rotation="100 MB", + retention="10 days", + format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", + enqueue=True + ) + logger.add(sys.stderr, level="DEBUG") diff --git a/MindEnergy/applications/deploy/src/enums.py b/MindEnergy/applications/deploy/src/enums.py new file mode 100644 index 0000000000000000000000000000000000000000..687760479f8c54d92553e1f95835af1d6cd53c4f --- /dev/null +++ b/MindEnergy/applications/deploy/src/enums.py @@ -0,0 +1,61 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +Enumeration module for MindScience deployment service. + +This module defines various enumerations used throughout the +MindScience deployment system, including: + +- ProcessMessage: Message types for inter-process communication +- HealthStatus: Health status indicators for services +- ModelStatus: Status indicators for model loading and execution +- TaskStatus: Execution status indicators for inference tasks + +These enums provide type-safe constants for status values and message types +used in the system's processes and communication. 
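+
+Because the string-valued enums also inherit from ``str``, their members compare
+equal to their plain string values (for example, ``TaskStatus.COMPLETED ==
+"completed"`` is true), so they appear as ordinary strings when placed in JSON
+responses.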
+""" + +from enum import Enum, IntEnum + +class ProcessMessage(IntEnum): + """Enumeration for process message types in the system.""" + ERROR = 0 + PREDICT = 1 + REPLY = 2 + CHECK = 3 + BUILD_FINISH = 4 + EXIT = 5 + + +class HealthStatus(str, Enum): + """Enumeration for health status of services.""" + READY = "ready" + NOTLOADED = "not_loaded" + EXCEPTION = "exception" + + +class ModelStatus(str, Enum): + """Enumeration for model loading or execution status.""" + SUCCESS = "success" + FAILURE = "failure" + + +class TaskStatus(str, Enum): + """Enumeration for task execution status.""" + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + ERROR = "error" diff --git a/MindEnergy/applications/deploy/src/inference.py b/MindEnergy/applications/deploy/src/inference.py new file mode 100644 index 0000000000000000000000000000000000000000..3dd5b12a095315f03350a9dceb17c1ccb20b246f --- /dev/null +++ b/MindEnergy/applications/deploy/src/inference.py @@ -0,0 +1,419 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +Inference module for MindScience deployment service. + +This module provides classes and functions for performing inference using +MindSpore Lite across multiple devices (e.g. NPU devices) in parallel. +It includes: + +- InferenceModel: A wrapper for MindSpore Lite models with input/output handling +- MultiprocessInference: A class for managing parallel inference across multiple devices +- infer_process_func: A function that runs in separate processes to handle inference tasks + +The module implements a multiprocessing approach where each device runs in its own +process with communication handled via pipes. It provides methods for model building, +inference execution, health checks, and resource cleanup. +""" + +import os +from typing import List +from multiprocessing import Process, Pipe + +import numpy as np +from loguru import logger + +import mindspore_lite as mslite + +from .config import DeployConfig +from .enums import ProcessMessage + +def infer_process_func(pipe_child_end, model_path, device_id): + """Function that runs in a separate process to handle inference tasks. + + This function initializes an InferenceModel, handles communication through + a pipe, and processes different types of messages including initialization, + inference requests, health checks, and exit signals. It runs in a continuous + loop until it receives an exit message. + + Args: + pipe_child_end: The child end of a multiprocessing.Pipe used for + communication with the parent process. + model_path (str): Path to the model file to load for inference. + device_id (int): ID of the device to run inference on. 
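+
+    The pipe protocol, as implemented below, is:
+
+    - startup: ``(BUILD_FINISH, batch_size)`` is sent on success, ``(ERROR, message)`` on failure
+    - ``(CHECK, "")`` is answered with ``(REPLY, "")`` after a successful dummy inference
+    - ``(PREDICT, inputs)`` is answered with ``(REPLY, results)`` or ``(ERROR, message)``
+    - ``(EXIT, "")`` ends the loop, after which the child end of the pipe is closed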
+ """ + try: + infer_model = InferenceModel(model_path, device_id) + if DeployConfig.dummy_model_path.endswith(".mindir") and os.path.exists(DeployConfig.dummy_model_path): + dummy_model = InferenceModel(DeployConfig.dummy_model_path, device_id) + else: + dummy_model = infer_model + pipe_child_end.send((ProcessMessage.BUILD_FINISH, infer_model.batch_size)) + except Exception as e: + logger.exception(f"Model initialization failed on device {device_id}: {e}") + pipe_child_end.send((ProcessMessage.ERROR, str(e))) + pipe_child_end.close() + return + + try: + while True: + msg_type, msg = pipe_child_end.recv() + if msg_type == ProcessMessage.EXIT: + logger.info(f"Device {device_id} received exit signal") + break + if msg_type == ProcessMessage.CHECK: + try: + _ = dummy_model.dummy_infer() + pipe_child_end.send((ProcessMessage.REPLY, "")) + except Exception as e: + err_msg = f"Dummy inference failed on device {device_id}: {e}" + logger.exception(err_msg) + pipe_child_end.send((ProcessMessage.ERROR, err_msg)) + raise RuntimeError(err_msg) from e + elif msg_type == ProcessMessage.PREDICT: + try: + inputs = msg + result = infer_model.infer(inputs) + pipe_child_end.send((ProcessMessage.REPLY, result)) + except Exception as e: + err_msg = f"Inference failed on device {device_id}: {e}" + logger.exception(err_msg) + pipe_child_end.send((ProcessMessage.ERROR, err_msg)) + raise RuntimeError(err_msg) from e + else: + err_msg = f"Unexpected message type {msg_type} on device {device_id}" + logger.exception(err_msg) + pipe_child_end.send((ProcessMessage.ERROR, err_msg)) + raise RuntimeError(err_msg) + except Exception as e: + logger.exception(f"Runtime error on device {device_id}: {e}") + pipe_child_end.send((ProcessMessage.ERROR, str(e))) + finally: + pipe_child_end.close() + + +class MultiprocessInference: + """A class for performing inference using multiple processes across different devices. + + This class manages multiple inference processes that run in parallel across specified + devices (e.g. Ascend NPU devices). It provides methods to build the model, run + inference, perform health checks, and clean up resources. + + Attributes: + model_path (str): Path to the model file to be used for inference. + device_num (int): Number of devices to use for parallel inference. + batch_size (int): Batch size of the loaded model, initialized to -1. + process_pool (list): List of Process objects for the inference processes. + parent_pipes (list): List of parent ends of Pipe objects for communication + with the inference processes. + """ + + def __init__(self, model_path: str, device_num: int): + """Initializes the MultiprocessInference instance. + + Args: + model_path (str): Path to the model file to be used for inference. + device_num (int): Number of devices to use for parallel inference. + """ + self.model_path = model_path + self.device_num = device_num + self.batch_size = -1 + + self.process_pool = [] + self.parent_pipes = [] + + def build_model(self): + """Builds and initializes the model on multiple devices. + + This method creates a process for each device, initializes the model in each process, + and establishes communication pipes between the main process and the worker processes. + It also verifies successful initialization of each device and ensures batch size + consistency across devices. + + Raises: + RuntimeError: If model has already been loaded or if initialization fails. + """ + if self.process_pool: + self._cleanup_resources() + raise RuntimeError("The model has been loaded. 
\ + Please uninstall this model first and then reload another model!") + + for device_id in range(self.device_num): + parent_conn, child_conn = Pipe(duplex=True) + self.parent_pipes.append(parent_conn) + + process = Process( + target=infer_process_func, + args=(child_conn, self.model_path, device_id), + daemon=True, + ) + self.process_pool.append(process) + process.start() + + child_conn.close() + + for device_id in range(self.device_num): + try: + msg_type, msg = self.parent_pipes[device_id].recv() + if msg_type == ProcessMessage.ERROR: + raise RuntimeError(f"Device {device_id} initialization failed: {msg}") + if msg_type != ProcessMessage.BUILD_FINISH: + raise RuntimeError(f"Unexpected message type {msg_type} during initialization") + + if self.batch_size not in (-1, msg): + raise RuntimeError("Batch size in different models are inconsistent.") + self.batch_size = msg + logger.info(f"Device {device_id} initialized successfully") + except Exception as e: + self._cleanup_resources() + logger.critical("Failed to initialize inference processes") + raise RuntimeError(f"Unexpected error {e} during initialization") from e + + def infer(self, inputs): + """Performs inference on the provided input data. + + This method distributes the input data across available devices and collects + the results. Each input item is sent to a separate device for parallel processing. + + Args: + inputs: List of input data for inference. Each item will be sent to a separate device. + + Returns: + list: List of inference results from each device. + + Raises: + ValueError: If the number of inputs exceeds the number of available devices. + RuntimeError: If sending prediction command to a device fails or if inference fails. + """ + if self.device_num < len(inputs): + raise ValueError(f"Inputs length {len(inputs)} should be less than or equal to \ + parallel number {self.device_num}, inference abort!") + + for device_id, input_data in enumerate(inputs): + try: + self.parent_pipes[device_id].send((ProcessMessage.PREDICT, input_data)) + except (BrokenPipeError, EOFError, OSError) as e: + self._cleanup_resources() + raise RuntimeError(f"Failed to send PREDICT to device {device_id}: {e}") from e + except Exception as e: + self._cleanup_resources() + raise RuntimeError(f"Unexpected error when sending PREDICT to device {device_id}: {e}") from e + + results = [] + for device_id in range(len(inputs)): + try: + msg_type, msg = self.parent_pipes[device_id].recv() + if msg_type == ProcessMessage.ERROR: + self._cleanup_resources() + raise RuntimeError(f"Device {device_id} inference failed: {msg}, cleanup all resource!") + results.append(msg) + except EOFError as e: + self._cleanup_resources() + raise RuntimeError(f"Device {device_id} connection closed unexpectedly.") from e + + return results + + def finalize(self): + """Finalizes the inference processes and cleans up resources. + + This method sends an EXIT signal to all worker processes, waits for them to + finish, and then cleans up all resources including the processes and pipes. 
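+
+        Workers that do not exit within the join timeout are terminated in
+        ``_cleanup_resources``, which also closes the parent ends of the pipes.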
+ """ + for idx, pipe in enumerate(self.parent_pipes): + try: + pipe.send((ProcessMessage.EXIT, "")) + except (BrokenPipeError, EOFError, OSError) as e: + logger.exception(f"Failed to send EXIT to device {idx}: {e}") + except Exception as e: + logger.exception(f"Unexpected error when sending EXIT to device {idx}: {e}") + + for idx, process in enumerate(self.process_pool): + if process.is_alive(): + try: + process.join(timeout=5) + except (OSError, RuntimeError) as e: + logger.exception(f"Error while joining process {idx} during finalize: {e}") + except Exception as e: + logger.exception(f"Unexpected error while joining process {idx} during finalize: {e}") + + self._cleanup_resources() + + def _cleanup_resources(self) -> None: + """Cleans up all process and pipe resources. + + This private method terminates all running processes and closes all pipes. + It tracks any failures during cleanup and raises a RuntimeError if cleanup + fails for any of the resources. + + Raises: + RuntimeError: If any process or pipe fails to be cleaned up properly. + """ + failure_processes = [] + for idx, process in enumerate(self.process_pool): + if process.is_alive(): + logger.warning(f"Terminating process {idx}...") + try: + process.terminate() + process.join(timeout=2) + except (OSError, RuntimeError) as e: + logger.exception(f"Failed to terminate process {idx}: {e}") + failure_processes.append(idx) + except Exception as e: + logger.exception(f"Unexpected error while terminating process {idx} during cleanup resources: {e}") + failure_processes.append(idx) + + failure_pipes = [] + for idx, pipe in enumerate(self.parent_pipes): + try: + pipe.close() + except (BrokenPipeError, EOFError, OSError) as e: + logger.exception(f"Failed to close {idx} parent pipe cleanly: {e}") + failure_pipes.append(idx) + except Exception as e: + logger.exception(f"Unexpected error while closing parent pipe {idx}: {e}") + failure_pipes.append(idx) + + self.process_pool = [] + self.parent_pipes = [] + + if failure_processes or failure_pipes: + raise RuntimeError(f"Failed to cleanup subprocesses: {failure_processes}, parent pipes: {failure_pipes}!") + + def is_ready(self): + """Checks if all inference processes are ready and healthy. + + This method verifies that all processes are alive and responsive by sending + a CHECK message to each and waiting for a response. If any process is not + responsive or returns an error, it raises a RuntimeError and cleans up resources. + + Raises: + RuntimeError: If model is not loaded, if any process is not alive, + or if health check fails on any device. 
+ """ + if not self.process_pool: + raise RuntimeError("Model not loaded, please check!") + + for idx, process in enumerate(self.process_pool): + if not process.is_alive(): + self._cleanup_resources() + raise RuntimeError(f"Process {idx} is not alive, please check") + + for idx, pipe in enumerate(self.parent_pipes): + try: + pipe.send((ProcessMessage.CHECK, "")) + except (BrokenPipeError, EOFError, OSError) as e: + self._cleanup_resources() + raise RuntimeError(f"Failed to send CHECK to device {idx}: {e}") from e + except Exception as e: + self._cleanup_resources() + raise RuntimeError(f"Unexpected error when sending CHECK to device {idx}: {e}") from e + + for idx, pipe in enumerate(self.parent_pipes): + try: + msg_type, msg = pipe.recv() + if msg_type == ProcessMessage.ERROR: + self._cleanup_resources() + raise RuntimeError(f"Device {idx} health check failed: {msg}, cleanup all resource!") + except EOFError as e: + self._cleanup_resources() + raise RuntimeError(f"Device {idx} connection closed unexpectedly during health check.") from e + + +class InferenceModel: + """A class for performing inference using MindSpore Lite. + + This class initializes a MindSpore Lite model on a specific device and provides + methods for running inference, dummy inference, and accessing model properties. + + Attributes: + model: MindSpore Lite Model instance. + input_shape_list (list): List of input shapes for the model. + model_batch_size (int): Batch size of the model, extracted from the first input shape. + """ + + def __init__(self, model_path: str, device_id: int): + """Initializes the InferenceModel instance. + + Creates a MindSpore Lite context for the specified device, loads the model + from the given path, and extracts input shapes and batch size information. + + Args: + model_path (str): Path to the MindIR model file. + device_id (int): ID of the device to run inference on (e.g. Ascend NPU ID). + + Raises: + RuntimeError: If the loaded model has no input. + """ + context = mslite.Context() + context.target = ["ascend"] + context.ascend.device_id = device_id + + self.model = mslite.Model() + self.model.build_from_file(model_path, mslite.ModelType.MINDIR, context) + self.input_shape_list = [item.shape for item in self.model.get_inputs()] + if not self.input_shape_list: + raise RuntimeError("The loaded model has no input!") + self.model_batch_size = self.input_shape_list[0][0] + + def infer(self, batch_inputs: List[np.ndarray]): + """Performs inference on the provided batch of input data. + + This method takes a list of numpy arrays as input, assigns them to the model's + input tensors, runs the prediction, and returns the output as a list of numpy arrays. + + Args: + batch_inputs (List[np.ndarray]): List of input data as numpy arrays. The number + of inputs should match the number of model inputs. + + Returns: + list: List of inference results as numpy arrays. + + Raises: + ValueError: If the number of inputs doesn't match the number of model inputs. + """ + inputs = self.model.get_inputs() + if len(batch_inputs) != len(inputs): + raise ValueError(f"The number of model inputs {len(inputs)} should be the same as \ + the number of inputs {len(batch_inputs)}") + + for i, input_ in enumerate(inputs): + input_.set_data_from_numpy(batch_inputs[i]) + + batch_out = self.model.predict(inputs) + return [item.get_data_to_numpy() for item in batch_out] + + def dummy_infer(self): + """Performs a dummy inference without input data. 
+ + This method runs the model prediction without providing specific input data, + which is typically used for model warm-up or health checks. + + Returns: + list: Model outputs from the prediction. + """ + inputs = self.model.get_inputs() + outputs = self.model.predict(inputs) + return outputs + + @property + def batch_size(self): + """int: The batch size of the model, extracted from the first input shape.""" + return self.model_batch_size + + @property + def input_shapes(self): + """list: List of input shapes for the model.""" + return self.input_shape_list diff --git a/MindEnergy/applications/deploy/src/schemas.py b/MindEnergy/applications/deploy/src/schemas.py new file mode 100644 index 0000000000000000000000000000000000000000..3bc2a0e0ed286ef7b8ec3c7514a8057210828f28 --- /dev/null +++ b/MindEnergy/applications/deploy/src/schemas.py @@ -0,0 +1,42 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +Schemas module for MindScience deployment service. + +This module defines Pydantic data models (schemas) used for request/response +validation and data serialization in the MindScience deployment service. +Currently includes: + +- ModelInfo: A schema for representing model status and associated messages + returned by the deployment service APIs + +These schemas ensure type safety and proper validation of data exchanged +between clients and the deployment service. +""" + +from pydantic import BaseModel + +from .enums import ModelStatus + +class ModelInfo(BaseModel): + """Model information containing status and message. + + Attributes: + status: The status of the model, defaults to ModelStatus.SUCCESS. + message: The message associated with the model status, defaults to empty string. + """ + status: str = ModelStatus.SUCCESS + message: str = "" diff --git a/MindEnergy/applications/deploy/src/session.py b/MindEnergy/applications/deploy/src/session.py new file mode 100644 index 0000000000000000000000000000000000000000..db7316e1776078b6464d57ba425a445790897751 --- /dev/null +++ b/MindEnergy/applications/deploy/src/session.py @@ -0,0 +1,219 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +Session management module for MindScience deployment service. + +This module provides the SessionManager class for managing model inference sessions. 
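+
+A minimal lifecycle sketch (illustrative; assumes a model directory named
+"my_model" exists under ``DeployConfig.models_dir`` and ``batch_inputs`` is a
+list of numpy input arrays)::
+
+    manager = SessionManager()
+    manager.init_session("my_model", device_num=1)
+    outputs = manager.batch_infer(batch_inputs, task_type=0)
+    manager.del_session()
+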
+It includes functionality for: + +- Loading and initializing models for inference +- Managing multiple inference sessions across different devices +- Performing batch inference operations +- Checking model health status +- Unloading models and cleaning up resources + +The SessionManager handles the lifecycle of model sessions, from initialization +to finalization, ensuring proper resource management and inference execution. +""" + +import numpy as np +from loguru import logger + +from .utils import Utilities +from .enums import HealthStatus +from .config import DeployConfig +from .inference import MultiprocessInference + +class SessionManager: + """Manages model sessions for inference tasks. + + This class handles the initialization, deletion, and inference operations + for machine learning models, including loading models, managing sessions, + and performing batch inference. + """ + + def __init__(self): + """Initializes the SessionManager with default values.""" + self.current_model = "" + self.device_num = -1 + self.infer_sessions = None + self._model_dict = {} + + def init_session(self, model_name, device_num): + """Initializes a model session for inference. + + Collects model files, validates device number and model name, + creates inference sessions, and builds the model for each session. + + Args: + model_name (str): Name of the model to load. + device_num (int): Number of devices to use for inference. + + Raises: + ValueError: If device number exceeds maximum, model is not found, + or model is already loaded. + RuntimeError: If session initialization fails. + """ + self._model_dict = Utilities.collect_mindir_models(DeployConfig.models_dir) + + if device_num > DeployConfig.max_device_num: + raise ValueError(f"Device num {device_num} is over the actual device num {DeployConfig.max_device_num}, \ + please check!") + if model_name not in self.model_dict: + raise ValueError(f"model {model_name} is not in {self.model_dict.keys()}, please check!") + if model_name == self.current_model: + raise ValueError(f"model {model_name} is already loaded, please check!") + + model_paths = self.model_dict.get(model_name) + sessions = [MultiprocessInference(model_path, device_num) for model_path in model_paths] + try: + for i, session in enumerate(sessions): + session.build_model() + logger.info(f"{model_paths[i]} is loaded successfully, {device_num} sessions are inited") + except Exception as e: + raise RuntimeError(f"Init session failed, please check! ERROR: {e}") from e + + self.current_model = model_name + self.device_num = device_num + self.infer_sessions = sessions + + def del_session(self): + """Deletes the current model session. + + Cleans up the current model, device number, and model dictionary. + Finalizes all inference sessions if they exist. + + Raises: + RuntimeError: If deleting the session fails. + """ + self.current_model = "" + self.device_num = -1 + self._model_dict = {} + + if self.infer_sessions is None: + logger.warning("Session not inited, please check!") + else: + try: + for session in self.infer_sessions: + session.finalize() + self.infer_sessions = None + logger.info("Session is deleted successfully") + except Exception as e: + raise RuntimeError(f"Delete session failed, please check! ERROR: {e}") from e + + def batch_infer(self, batch_inputs, task_type): + """Performs batch inference on the given inputs. + + Processes batch inputs by extending them to match model batch size, + and performs inference using the specified task type session. 
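+
+        For example (illustrative; ``manager`` is a SessionManager with a model
+        already loaded)::
+
+            outputs = manager.batch_infer([input_array], task_type=0)
+
+        Inputs are zero-padded internally to a multiple of the model batch size,
+        and the returned arrays are trimmed back to the original size.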
+ + Args: + batch_inputs (list): List of input arrays for batch inference. + task_type (int): Index of the inference session to use. + + Returns: + list: List of concatenated results from the inference. + + Raises: + ValueError: If inputs are empty, task type is invalid, or + model has not been loaded. + RuntimeError: If model has not been loaded. + """ + if not batch_inputs: + raise ValueError("Input is empty, please check!") + if self.infer_sessions is None: + raise RuntimeError("Model has not been loaded, please check!") + if task_type >= len(self.infer_sessions): + raise ValueError(f"Only task_type {list(range(len(self.infer_sessions)))} is supported, \ + but given {task_type}") + + session = self.infer_sessions[task_type] + data_size = batch_inputs[0].shape[0] + model_batch_size = session.batch_size + + batch_inputs = [Utilities.extend_input(item, model_batch_size) for item in batch_inputs] + total_size = batch_inputs[0].shape[0] + + input_datas = [] + result_list = [] + for i in range(0, total_size, model_batch_size): + input_datas.append([item[i: i + model_batch_size] for item in batch_inputs]) + if len(input_datas) > 0 and len(input_datas) % self.device_num == 0: + result_list = self._infer(session, input_datas, result_list) + input_datas = [] + if input_datas: + result_list = self._infer(session, input_datas, result_list) + + model_out_num = len(result_list[0]) + final_ret = [] + for i in range(model_out_num): + final_ret.append(np.concatenate([result[i] for result in result_list], axis=0)[:data_size]) + return final_ret + + def _infer(self, session, input_datas, result_list): + """Performs inference on input data using the given session. + + Internal helper method that executes the actual inference and + extends the result list with predicted results. + + Args: + session (MultiprocessInference): Inference session to use. + input_datas (list): List of input data to be inferred. + result_list (list): List to extend with inference results. + + Returns: + list: Updated result list with new inference results. + + Raises: + RuntimeError: If the model prediction fails. + """ + try: + predict_result = session.infer(input_datas) + result_list.extend(predict_result) + except (ValueError, RuntimeError) as e: + raise RuntimeError(f"{self.current_model} predict failed, please check! ERROR: {e}") from e + return result_list + + def is_ready(self): + """Checks if the model sessions are ready for inference. + + Verifies if inference sessions are initialized and operational. + + Returns: + HealthStatus: Status indicating if model is NOTLOADED, READY, + or in EXCEPTION state. 
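+
+        Example:
+            Illustrative health probe::
+
+                if manager.is_ready() == HealthStatus.READY:
+                    ...  # safe to dispatch inference requests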
+ """ + if self.infer_sessions is None: + logger.info("Model has not been loaded, please check!") + return HealthStatus.NOTLOADED + try: + for session in self.infer_sessions: + session.is_ready() + logger.info(f"Model: {self.current_model} is ready!") + return HealthStatus.READY + except Exception: + logger.info(f"Model: {self.current_model} is unavailable!") + return HealthStatus.EXCEPTION + + @property + def session_name(self): + """str: Name of the currently loaded model.""" + return self.current_model + + @property + def model_dict(self): + """dict: Dictionary containing model information.""" + return self._model_dict diff --git a/MindEnergy/applications/deploy/src/utils.py b/MindEnergy/applications/deploy/src/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..6bb225e181d8d725ebe263bba1d26e18316b8593 --- /dev/null +++ b/MindEnergy/applications/deploy/src/utils.py @@ -0,0 +1,369 @@ +# Copyright 2025 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +Utilities module for MindScience deployment service. + +This module provides a collection of utility functions and classes for various +tasks in the MindScience deployment service, including: + +- File handling operations (upload, extraction, HDF5 read/write) +- Model file collection and management +- Input data extension for batch processing +- Task status counting +- NPU (Neural Processing Unit) resource monitoring +- Asynchronous operations support + +The Utilities class contains static methods that are commonly used across +different components of the deployment service. +""" + +import os +import re +import zipfile +import tarfile +import asyncio +import traceback +import subprocess +from typing import Dict, List + +import h5py +import aiofiles +import numpy as np +from loguru import logger +from fastapi import UploadFile, HTTPException + +from .enums import TaskStatus +from .config import DeployConfig, ModelConfig + +class Utilities: + """ + Utilities helper class that encapsulates deployment-related utility functions. + + This class includes file handling, model collection, HDF5 file read/write, + asynchronous file upload saving, and methods to collect NPU usage information. + """ + @staticmethod + def collect_mindir_models(root_dir: str) -> Dict[str, List[str]]: + """Collect *.mindir model files from each subdirectory under the specified root directory. + + Args: + root_dir (str): Root directory path to search. Each immediate subdirectory + is treated as a distinct model name. + + Returns: + Dict[str, List[str]]: Mapping of model directory name to a sorted list of + absolute paths of all `.mindir` files found in that model directory. + + Raises: + ValueError: If `root_dir` is not a valid directory. 
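+
+        Example:
+            For a layout like ``models/my_model/{a.mindir, b.mindir}`` (paths
+            illustrative)::
+
+                Utilities.collect_mindir_models("models")
+                # -> {"my_model": ["/abs/models/my_model/a.mindir",
+                #                  "/abs/models/my_model/b.mindir"]}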
+ """ + if not os.path.isdir(root_dir): + raise ValueError(f"Invalid directory: {root_dir}") + + model_dict = {} + for model_name in os.listdir(root_dir): + model_dir = os.path.join(root_dir, model_name) + if not os.path.isdir(model_dir): + continue + + mindir_files = [] + for file in os.listdir(model_dir): + if file.endswith(".mindir"): + abs_path = os.path.abspath(os.path.join(model_dir, file)) + mindir_files.append(abs_path) + + if mindir_files: + model_dict[model_name] = sorted(mindir_files) + + return model_dict + + @staticmethod + def extend_input(array: np.ndarray, batch_size: int) -> np.ndarray: + """Extend (pad) the input array along the first dimension so that its length + is divisible by `batch_size`. + + If the first dimension is already divisible by `batch_size`, the original + array is returned. Otherwise, zero-rows are appended to the end of the + array to make the first dimension divisible. + + Args: + array (np.ndarray): The numpy array to pad. Must have at least one dimension. + batch_size (int): The batch size to make the first dimension divisible by. + + Returns: + np.ndarray: The padded numpy array with the same dtype as the input. + + Raises: + ValueError: If `array` has no shape (empty). + """ + if not array.shape: + raise ValueError(f"The array to extend is empty, please check: {array}.") + + if array.shape[0] % batch_size != 0: + padding_length = batch_size - array.shape[0] % batch_size + padding_contents = np.zeros((padding_length, *array.shape[1:]), dtype=array.dtype) + return np.concatenate([array, padding_contents], axis=0) + return array + + @staticmethod + async def save_upload_file(upload_file: UploadFile, destination: str): + """Asynchronously save a FastAPI `UploadFile` to the specified destination + path in chunks. + + Args: + upload_file (UploadFile): The uploaded file object provided by FastAPI. + destination (str): Target file path (including filename) to save the upload. + + Returns: + None + + Raises: + HTTPException: Raises an HTTPException (status 500) if saving fails. + """ + try: + async with aiofiles.open(destination, "wb") as f: + while chunk := await upload_file.read(DeployConfig.chunk_size): + await f.write(chunk) + except Exception as e: + raise HTTPException(status_code=500, detail=f"save upload file failed: {e}") from e + + @staticmethod + def extract_file(file_path: str, extract_to: str = None): + """Extract supported archive file formats (tar, tar.gz, tgz, tar.bz2, tbz2, + tar.xz, txz, zip) to a target directory. + + Args: + file_path (str): Path to the archive file to extract. + extract_to (str, optional): Target directory to extract into. If None, + the archive's containing directory is used. + + Returns: + None + + Raises: + FileNotFoundError: If `file_path` does not exist. + ValueError: If the file format is not a supported archive type. + """ + if not os.path.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") + + if extract_to is None: + extract_to = os.path.dirname(file_path) + + if file_path.endswith((".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz2", ".tar.xz", ".txz")): + with tarfile.open(file_path, 'r:*') as tf: + tf.extractall(extract_to) + elif file_path.endswith(".zip"): + with zipfile.ZipFile(file_path, 'r') as zf: + zf.extractall(extract_to) + else: + raise ValueError(f"Unsupported compressed file format: {file_path}.") + + @staticmethod + def count_pending_task(tasks_status: Dict[str, str]) -> int: + """Count the number of tasks in a mapping that are in the PENDING state. 
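+
+        For example (illustrative), a mapping with two entries whose status is
+        ``TaskStatus.PENDING`` and one entry in any other state returns ``2``.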
+ + Args: + tasks_status (Dict[str, str]): Mapping from task ID to task status + (expected to be members of `TaskStatus`). + + Returns: + int: Number of tasks whose status equals `TaskStatus.PENDING`. + """ + pending_number = 0 + for value in tasks_status.values(): + if value == TaskStatus.PENDING: + pending_number += 1 + return pending_number + + @staticmethod + def load_h5_file(file_path: str) -> List[np.ndarray]: + """Load input columns specified in `ModelConfig.input_columns` from an HDF5 + file and return them as a list of numpy arrays. + + Args: + file_path (str): HDF5 file path to read from. + + Returns: + List[np.ndarray]: List of numpy arrays for each input column defined in + `ModelConfig.input_columns`, preserving the same order. + + Raises: + ValueError: If any key from `ModelConfig.input_columns` is missing in the file. + """ + batch_inputs = [] + with h5py.File(file_path, "r") as f: + for key in ModelConfig.input_columns: + if key not in f.keys(): + raise ValueError(f"Key {key} not in dataset, please check!") + batch_inputs.append(np.array(f[key], dtype=f[key].dtype)) + return batch_inputs + + @staticmethod + def save_h5_file(items: List[np.ndarray], file_path: str): + """Write prediction outputs to an HDF5 file. + + Output column names are taken from `ModelConfig.output_columns`. If the + number of `items` does not match the number of configured output column + names, the function will rename columns using the default format + `output_0`, `output_1`, ... and log a warning. + + Args: + items (List[np.ndarray]): List of numpy arrays to write as outputs. + file_path (str): HDF5 output file path. + + Returns: + None + + Notes: + If the number of configured output columns does not match the number + of provided items, the method assigns default names and logs a + warning. + """ + output_columns = ModelConfig.output_columns + if len(items) != len(output_columns): + logger.warning("Number of outputs in config is inconsistent with the number of the actual outputs, \ + rename the output column.") + output_columns = tuple(f"output_{i}" for i in range(len(items))) + + with h5py.File(file_path, "w") as f: + for key, value in zip(output_columns, items): + f.create_dataset(key, data=value) + + @staticmethod + async def get_npu_usage(): + """Asynchronously execute `npu-smi info` to obtain NPU utilization and memory + information, parse the output, and return a structured list of stats. + + Each returned item is a dictionary with the following keys: + - id (int): NPU device ID + - utilization_percent (int): NPU utilization percentage + - memory_used_mb (int): Memory used in MB + - memory_total_mb (int): Total memory in MB + + Returns: + List[Dict[str, int]]: List of dictionary entries containing NPU stats. + + Raises: + FileNotFoundError: If the `npu-smi` command is not available. + RuntimeError: If executing `npu-smi info` fails for other reasons. 
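+
+        Example:
+            Illustrative call from synchronous code (assumes an Ascend host with
+            ``npu-smi`` on the PATH; sample values only)::
+
+                stats = asyncio.run(Utilities.get_npu_usage())
+                # e.g. [{"id": 0, "utilization_percent": 12,
+                #        "memory_used_mb": 2048, "memory_total_mb": 32768}]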
+ """ + try: + command = ["npu-smi", "info"] + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + output_bytes, stderr_bytes = await process.communicate() + + if process.returncode != 0: + if b"command not found" in stderr_bytes.lower() or \ + b"no such file or directory" in stderr_bytes.lower(): + logger.error("ERROR: 'npu-smi' command not found!") + logger.error("Please ensure that the Ascend CANN toolkit is correctly installed and \ + its bin directory has been added to the system's PATH environment variable.") + raise FileNotFoundError("'npu-smi' command not found.") + raise subprocess.CalledProcessError( + process.returncode, command, output=output_bytes, stderr=stderr_bytes + ) + except Exception as e: + logger.error(f"Execute 'npu-smi info' failed, ERROR: {e}") + raise RuntimeError(f"Execute 'npu-smi info' failed, ERROR: {e}") from e + + logger.info("=== npu-smi info original output ===") + logger.info(output_bytes.decode("utf-8", errors="replace")) + logger.info("===========================") + + output = output_bytes.decode("utf-8", errors="ignore") + + npu_stats = [] + lines = output.strip().splitlines() + + if not lines: + logger.warning("'npu-smi info' did not return any output.") + return npu_stats + + for i, line in enumerate(lines): + match_first_line = re.match(r"^\s*\|\s*(\d+)\s+[a-zA-Z0-9]+\s*\|", line) + + if not match_first_line: + logger.warning(f"Can not match NPU ID in line: {line}.") + continue + + try: + npu_id_str = match_first_line.group(1) + if not npu_id_str: + logger.warning(f"NPU ID is empty in line: {line}.") + continue + + npu_id = int(npu_id_str) + logger.info(f"Found NPU ID: {npu_id}.") + + if (i + 1) >= len(lines): + logger.warning("Not enough rows to parse NPU utilization information.") + continue + + second_line = lines[i + 1] + logger.info(f"Process NPU {npu_id} data line: {second_line}.") + + parts = second_line.split("|") + if len(parts) < 4: + logger.warning(f"The data row format is incorrect and \ + the number of columns is insufficient: {second_line}.") + continue + + data_string = parts[-2].strip() + logger.info(f"Extracted data: '{data_string}'.") + + tokens = re.findall(r"\d+", data_string) + logger.info(f"Extracted tokens: {tokens}.") + + if len(tokens) < 5: + logger.warning(f"Insufficient tokens: {tokens}.") + continue + + try: + utilization_percent = int(tokens[0]) + mem_used = int(tokens[3]) + mem_total = int(tokens[4]) + + info = { + "id": npu_id, + "utilization_percent": utilization_percent, + "memory_used_mb": mem_used, + "memory_total_mb": mem_total, + } + logger.info(f"Successfully parse {npu_id} info: {info}.") + npu_stats.append(info) + + except (ValueError, IndexError) as e: + logger.error(f"Error parsing NPU {npu_id} data: {e}.") + continue + + except Exception as e: + logger.exception(f"Unexpected error when parsing NPU data: {e}.") + traceback.print_exc() + continue + + if not npu_stats: + logger.warning(f"Can not parse any info from 'npu-smi info' output. original output: \n{output}") + if "No NPU device found" in output: + logger.info("'No NPU device found' - Perhaps there are no available NPU devices.") + else: + logger.info(f"Successfully parse {len(npu_stats)} NPU info.") + + return npu_stats
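+
+
+# Illustrative sanity checks for the helpers above (not executed by the service;
+# shapes and file paths are placeholders):
+#
+#     padded = Utilities.extend_input(np.zeros((3, 8), dtype=np.float32), batch_size=4)
+#     assert padded.shape == (4, 8)
+#
+#     batch_inputs = Utilities.load_h5_file("dataset.h5")    # reads ModelConfig.input_columns
+#     Utilities.save_h5_file(batch_inputs, "results.h5")     # writes ModelConfig.output_columns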