diff --git a/MindEnergy/applications/deploy/EXAMPLE.md b/MindEnergy/applications/deploy/EXAMPLE.md
new file mode 100644
index 0000000000000000000000000000000000000000..ac73c22238786fa7c422acbc1710c8e3f35078d6
--- /dev/null
+++ b/MindEnergy/applications/deploy/EXAMPLE.md
@@ -0,0 +1,98 @@
+# MindScience 部署服务 - API 示例
+
+本文档提供如何使用 curl 命令调用 MindScience 部署服务 API 的示例。
+
+## 1. 加载模型
+
+使用 curl 加载模型:
+
+```bash
+curl -X POST "http://localhost:8001/mindscience/deploy/load_model" \
+ -H "Content-Type: multipart/form-data" \
+ -F "model_name=your_model" \
+ -F "model_file=@/path/to/your/model_file.zip"
+```
+
+`model_file.zip` 的目录格式为:
+
+```bash
+model_file.zip
+├── your_model_1.mindir
+├── your_model_2.mindir
+├── ...
+└── your_model_n.mindir
+```
+
+如果只想从本地文件加载模型(不上传),只需提供 `model_name`:
+
+```bash
+curl -X POST "http://localhost:8001/mindscience/deploy/load_model" \
+ -H "Content-Type: multipart/form-data" \
+ -F "model_name=your_local_model"
+```
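+
+除了 curl,也可以在 Python 中调用该接口。下面给出一个与上述命令等价的示例草图(假设客户端已安装第三方库 `requests`,该库不在本服务的 requirements.txt 中;模型名与文件路径仅为示意):
+
+```python
+import requests
+
+BASE_URL = "http://localhost:8001/mindscience/deploy"
+
+# 上传并加载模型;若模型已在服务端本地,省略 files 参数即可
+with open("/path/to/your/model_file.zip", "rb") as f:
+    resp = requests.post(
+        f"{BASE_URL}/load_model",
+        data={"model_name": "your_model"},
+        files={"model_file": ("model_file.zip", f)},
+    )
+
+# 加载成功时返回 201,响应体形如 {"status": "success", "message": ""}
+print(resp.status_code, resp.json())
+```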
+
+## 2. 卸载模型
+
+卸载当前已加载的模型:
+
+```bash
+curl -X POST "http://localhost:8001/mindscience/deploy/unload_model"
+```
+
+## 3. 推理
+
+对数据集执行推理,`task_type` 指定选择 `model_file.zip` 中的哪个模型进行推理,其取值需小于 `model_file.zip` 中的模型数量:
+
+```bash
+curl -X POST "http://localhost:8001/mindscience/deploy/infer" \
+ -H "Content-Type: multipart/form-data" \
+ -F "dataset=@/path/to/your/dataset.h5" \
+ -F "task_type=0"
+```
+
+这将返回一个 task_id,可用于检查推理请求的状态。
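+
+在脚本中发起推理并保存返回的 task_id,可以参考下面的 Python 示例草图(同样假设已安装 `requests`,数据集路径仅为示意):
+
+```python
+import requests
+
+with open("/path/to/your/dataset.h5", "rb") as f:
+    resp = requests.post(
+        "http://localhost:8001/mindscience/deploy/infer",
+        data={"task_type": "0"},
+        files={"dataset": f},
+    )
+resp.raise_for_status()
+
+# 后续查询状态和下载结果都需要该 task_id
+task_id = resp.json()["task_id"]
+```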
+
+## 4. 任务状态查询
+
+检查推理任务的状态(将 {task_id} 替换为从推理API返回的实际任务 ID):
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/query_status/{task_id}"
+```
+
+例如:
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/query_status/123e4567-e89b-12d3-a456-426614174000"
+```
+
+## 5. 结果下载
+
+下载已完成的推理任务的结果(将 {task_id} 替换为实际的任务 ID):
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/query_results/{task_id}" -o "results.h5"
+```
+
+`-o` 标志将响应保存为指定文件名的文件。例如:
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/query_results/123e4567-e89b-12d3-a456-426614174000" -o "results.h5"
+```
+
+这将下载名为 `results.h5` 的结果文件。
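+
+由于推理在后台异步执行,通常的做法是先轮询任务状态,待状态变为 `completed` 后再下载结果。下面是一个轮询并下载的 Python 示例草图(假设已安装 `requests`,task_id 与轮询间隔仅为示意):
+
+```python
+import time
+
+import requests
+
+BASE_URL = "http://localhost:8001/mindscience/deploy"
+task_id = "123e4567-e89b-12d3-a456-426614174000"  # 替换为推理接口实际返回的 task_id
+
+# 轮询任务状态,状态取值为 pending/processing/completed/error
+while True:
+    status = requests.get(f"{BASE_URL}/query_status/{task_id}").json()["status"]
+    if status in ("completed", "error"):
+        break
+    time.sleep(5)
+
+# 任务完成后下载结果文件
+if status == "completed":
+    resp = requests.get(f"{BASE_URL}/query_results/{task_id}")
+    with open("results.h5", "wb") as f:
+        f.write(resp.content)
+```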
+
+## 6. 健康检查
+
+检查部署服务的健康状态:
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/health_check"
+```
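+
+按照 deploy.py 的实现,健康检查通过 HTTP 状态码区分模型状态:200 表示模型已加载且可用,501 表示尚未加载模型,503 表示模型异常。下面的 Python 片段给出一种可能的处理方式(示例草图,假设已安装 `requests`):
+
+```python
+import requests
+
+resp = requests.get("http://localhost:8001/mindscience/deploy/health_check")
+if resp.status_code == 200:
+    print("模型已就绪:", resp.json())
+elif resp.status_code == 501:
+    print("尚未加载模型,请先调用 load_model 接口")
+else:
+    print("服务异常:", resp.status_code, resp.text)
+```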
+
+## 重要说明
+
+- 部署服务默认监听 8001 端口(见 `src/config.py` 中的 `ServerConfig.deploy_port`),发起请求前请确认实际配置的端口。
+- 发出请求之前,请确保部署服务正在运行。
+- 模型推理任务在后台异步处理,因此在尝试下载结果之前需要检查任务状态。
+- 并发推理请求数有上限(由 DeployConfig 中的 `max_request_num` 配置,默认 100)。
diff --git a/MindEnergy/applications/deploy/EXAMPLE_EN.md b/MindEnergy/applications/deploy/EXAMPLE_EN.md
new file mode 100644
index 0000000000000000000000000000000000000000..f2a416799700e8f8f9bd7d4393f0536a80f357c1
--- /dev/null
+++ b/MindEnergy/applications/deploy/EXAMPLE_EN.md
@@ -0,0 +1,98 @@
+# MindScience Deployment Service - API Examples
+
+This document provides examples of how to use curl commands to call the MindScience deployment service APIs.
+
+## 1. Load Model
+
+Use curl to load a model:
+
+```bash
+curl -X POST "http://localhost:8001/mindscience/deploy/load_model" \
+ -H "Content-Type: multipart/form-data" \
+ -F "model_name=your_model" \
+ -F "model_file=@/path/to/your/model_file.zip"
+```
+
+The directory structure of `model_file.zip` should be:
+
+```bash
+model_file.zip
+├── your_model_1.mindir
+├── your_model_2.mindir
+├── ...
+└── your_model_n.mindir
+```
+
+If you only want to load a model from local files (without uploading), just provide `model_name`:
+
+```bash
+curl -X POST "http://localhost:8001/mindscience/deploy/load_model" \
+ -H "Content-Type: multipart/form-data" \
+ -F "model_name=your_local_model"
+```
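+
+Besides curl, the same API can be called from Python. The sketch below is equivalent to the commands above (it assumes the third-party `requests` library is installed on the client, which is not part of this service's requirements.txt; the model name and file path are placeholders):
+
+```python
+import requests
+
+BASE_URL = "http://localhost:8001/mindscience/deploy"
+
+# Upload and load a model; omit the files argument to load from a local file on the server
+with open("/path/to/your/model_file.zip", "rb") as f:
+    resp = requests.post(
+        f"{BASE_URL}/load_model",
+        data={"model_name": "your_model"},
+        files={"model_file": ("model_file.zip", f)},
+    )
+
+# On success the service returns 201 with a body like {"status": "success", "message": ""}
+print(resp.status_code, resp.json())
+```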
+
+## 2. Unload Model
+
+Unload the currently loaded model:
+
+```bash
+curl -X POST "http://localhost:8001/mindscience/deploy/unload_model"
+```
+
+## 3. Inference
+
+Perform inference on a dataset. `task_type` specifies which model in `model_file.zip` to use for inference, and its value should be less than the number of models in `model_file.zip`:
+
+```bash
+curl -X POST "http://localhost:8001/mindscience/deploy/infer" \
+ -H "Content-Type: multipart/form-data" \
+ -F "dataset=@/path/to/your/dataset.h5" \
+ -F "task_type=0"
+```
+
+This will return a task_id that can be used to check the status of the inference request.
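+
+To submit an inference request from a script and keep the returned task_id, a Python sketch like the following can be used (again assuming `requests` is installed; the dataset path is a placeholder):
+
+```python
+import requests
+
+with open("/path/to/your/dataset.h5", "rb") as f:
+    resp = requests.post(
+        "http://localhost:8001/mindscience/deploy/infer",
+        data={"task_type": "0"},
+        files={"dataset": f},
+    )
+resp.raise_for_status()
+
+# The task_id is needed for status queries and result download
+task_id = resp.json()["task_id"]
+```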
+
+## 4. Task Status Query
+
+Check the status of an inference task (replace {task_id} with the actual task ID returned from the inference API):
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/query_status/{task_id}"
+```
+
+For example:
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/query_status/123e4567-e89b-12d3-a456-426614174000"
+```
+
+## 5. Result Download
+
+Download the results of a completed inference task (replace {task_id} with the actual task ID):
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/query_results/{task_id}" -o "results.h5"
+```
+
+The `-o` flag saves the response to a file with the specified filename. For example:
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/query_results/123e4567-e89b-12d3-a456-426614174000" -o "results.h5"
+```
+
+This will download the result file named `results.h5`.
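+
+Because inference runs asynchronously in the background, a typical client polls the task status and downloads the results once the status becomes `completed`. The Python sketch below shows one way to do this (assuming `requests` is installed; the task_id and polling interval are placeholders):
+
+```python
+import time
+
+import requests
+
+BASE_URL = "http://localhost:8001/mindscience/deploy"
+task_id = "123e4567-e89b-12d3-a456-426614174000"  # replace with the task_id returned by the infer API
+
+# Poll the task status; possible values are pending/processing/completed/error
+while True:
+    status = requests.get(f"{BASE_URL}/query_status/{task_id}").json()["status"]
+    if status in ("completed", "error"):
+        break
+    time.sleep(5)
+
+# Download the results file once the task has completed
+if status == "completed":
+    resp = requests.get(f"{BASE_URL}/query_results/{task_id}")
+    with open("results.h5", "wb") as f:
+        f.write(resp.content)
+```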
+
+## 6. Health Check
+
+Check the health status of the deployment service:
+
+```bash
+curl -X GET "http://localhost:8001/mindscience/deploy/health_check"
+```
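+
+As implemented in deploy.py, the health check distinguishes model states via HTTP status codes: 200 means a model is loaded and ready, 501 means no model has been loaded yet, and 503 means the model is in an exception state. The Python snippet below shows one possible way to handle this (a sketch, assuming `requests` is installed):
+
+```python
+import requests
+
+resp = requests.get("http://localhost:8001/mindscience/deploy/health_check")
+if resp.status_code == 200:
+    print("Model is ready:", resp.json())
+elif resp.status_code == 501:
+    print("No model loaded, call load_model first")
+else:
+    print("Service unhealthy:", resp.status_code, resp.text)
+```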
+
+## Important Notes
+
+- The deployment service listens on port 8001 by default (`ServerConfig.deploy_port` in `src/config.py`); confirm the configured port before making requests.
+- Ensure the deployment service is running before making requests.
+- Model inference tasks are processed asynchronously in the background, so check the task status before attempting to download results.
+- The number of concurrent inference requests is limited (`max_request_num` in DeployConfig, default 100).
diff --git a/MindEnergy/applications/deploy/README.md b/MindEnergy/applications/deploy/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..b3643879d6d3ca811453582ef379b3e96ef8c938
--- /dev/null
+++ b/MindEnergy/applications/deploy/README.md
@@ -0,0 +1,141 @@
+# MindScience 部署服务
+
+MindScience 部署服务是一个基于 FastAPI 的模型部署和监控系统,支持多设备并行推理和资源监控功能。
+
+## 架构图
+
+![部署服务架构图](docs/deploy_arch.png)
+
+## 目录结构
+
+```shell
+deploy/
+├── deploy.py # 模型部署服务主文件
+├── monitor.py # 服务器监控服务主文件
+├── requirements.txt # 项目依赖
+└── src/ # 源代码目录
+ ├── config.py # 配置文件
+ ├── enums.py # 枚举定义
+ ├── inference.py # 推理实现
+ ├── schemas.py # 数据模型定义
+ ├── session.py # 会话管理
+ └── utils.py # 工具函数
+```
+
+## 功能特性
+
+### 部署服务 (deploy.py)
+
+- **模型加载/卸载**:支持通过 HTTP 接口上传 MindIR 模型文件并加载到设备上
+- **异步推理**:支持后台异步执行推理任务
+- **任务状态管理**:支持任务状态查询(待处理、处理中、已完成、错误)
+- **健康检查**:提供模型状态检查接口
+- **结果下载**:推理完成后可下载结果文件
+- **多设备支持**:支持最多 8 个 NPU 设备并行推理
+
+### 监控服务 (monitor.py)
+
+- **资源监控**:实时监控 CPU、内存和 NPU 使用率
+- **健康检查**:提供服务器资源使用情况查询接口
+
+## 配置参数
+
+### 部署配置 (DeployConfig)
+
+- `max_device_num`: 最大设备数量(默认 8)
+- `deploy_device_num`: 部署使用的设备数量(默认 8)
+- `max_request_num`: 最大并发请求数(默认 100)
+- `models_dir`: 模型文件存储目录(默认 "models")
+- `datasets_dir`: 数据集文件存储目录(默认 "datasets")
+- `results_dir`: 结果文件存储目录(默认 "results")
+- `dummy_model_path`: 用于测试的虚拟模型路径(默认 "dummy_model.mindir")
+- `chunk_size`: 数据块大小(默认 8MB)
+
+### 服务器配置 (ServerConfig)
+
+- `host`: 服务器主机地址(默认 "127.0.0.1")
+- `deploy_port`: 部署服务端口(默认 8001)
+- `monitor_port`: 监控服务端口(默认 8002)
+- `limit_concurrency`: 最大并发连接数(默认 1000)
+- `timeout_keep_alive`: Keep-alive 连接超时时间(默认 30 秒)
+- `backlog`: 待处理连接队列大小(默认 2048)
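+
+以上配置均以冻结 dataclass 的形式定义在 `src/config.py` 中,修改对应字段的默认值即可生效。例如,只用 4 个 NPU 设备部署并允许其他主机访问服务时,可参考如下修改示意(节选,取值仅为示例):
+
+```python
+# src/config.py 节选:按需修改各字段的默认值
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class DeployConfig:
+    max_device_num: int = 8
+    deploy_device_num: int = 4        # 默认 8,这里只使用 4 个 NPU 设备
+    max_request_num: int = 100
+    models_dir: str = "models"
+    datasets_dir: str = "datasets"
+    results_dir: str = "results"
+    dummy_model_path: str = "dummy_model.mindir"
+    chunk_size: int = 8 * 1024 * 1024
+
+
+@dataclass(frozen=True)
+class ServerConfig:
+    host: str = "0.0.0.0"             # 默认 "127.0.0.1",改为 0.0.0.0 以允许外部访问(注意网络安全)
+    deploy_port: int = 8001
+    monitor_port: int = 8002
+    limit_concurrency: int = 1000
+    timeout_keep_alive: int = 30
+    backlog: int = 2048
+```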
+
+## API 接口
+
+### 部署服务接口
+
+- `POST /mindscience/deploy/load_model` - 加载模型
+ - 参数:model_name (表单), model_file (可选文件)
+
+- `POST /mindscience/deploy/unload_model` - 卸载模型
+
+- `POST /mindscience/deploy/infer` - 执行推理
+ - 参数:dataset (文件), task_type (表单)
+
+- `GET /mindscience/deploy/query_status/{task_id}` - 查询任务状态
+
+- `GET /mindscience/deploy/query_results/{task_id}` - 获取推理结果
+
+- `GET /mindscience/deploy/health_check` - 健康检查
+
+### 监控服务接口
+
+- `GET /mindscience/monitor/resource_usage` - 获取资源使用情况
+ - 返回:CPU使用率、内存使用率、NPU使用率、NPU内存使用率
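+
+监控接口返回的 JSON 中,`data` 字段以保留两位小数的字符串形式给出各项使用率。下面是一个调用示例草图(假设客户端已安装第三方库 `requests`):
+
+```python
+import requests
+
+resp = requests.get("http://127.0.0.1:8002/mindscience/monitor/resource_usage")
+data = resp.json()["data"]
+
+# 各字段均为百分比字符串,例如 "37.50"
+print("CPU 使用率:", data["cpuUsageRate"])
+print("内存使用率:", data["memoryUsageRate"])
+print("NPU 使用率:", data["npuUsageRate"])
+print("NPU 内存使用率:", data["npuMemoryUsageRate"])
+```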
+
+## 依赖项
+
+- `fastapi == 0.121.2`: Web 框架
+- `uvicorn == 0.38.0`: ASGI 服务器
+- `python-multipart == 0.0.20`: 多部分表单数据处理
+- `h5py == 3.14.0`: HDF5 文件处理
+- `loguru == 0.7.3`: 日志处理
+- `aiofiles == 25.1.0`: 异步文件操作
+- `psutil == 7.0.0`: 系统和进程监控
+- `numpy == 1.26.4`: 科学计算库
+- `mindspore_lite == 2.7.1`: MindSpore Lite 推理引擎
+- `CANN == 8.2.RC1`: 神经网络异构计算架构
+
+## 安装和使用
+
+1. 参考[CANN官网](https://www.hiascend.com/document/detail/zh/CANNCommunityEdition/83RC1/softwareinst/instg/instg_quick.html?Mode=PmIns&InstallType=local&OS=openEuler&Software=cannToolKit)文档安装CANN社区版软件包。
+
+2. 从[MindSpore官网](https://www.mindspore.cn/lite/docs/zh-CN/r2.7.1/use/downloads.html#mindspore-lite-python%E6%8E%A5%E5%8F%A3%E5%BC%80%E5%8F%91%E5%BA%93)下载MindSpore Lite Python接口开发库:
+
+ ```bash
+ wget https://ms-release.obs.cn-north-4.myhuaweicloud.com/2.7.1/MindSporeLite/lite/release/linux/aarch64/cloud_fusion/python310/mindspore_lite-2.7.1-cp310-cp310-linux_aarch64.whl
+
+ pip install mindspore_lite-2.7.1-cp310-cp310-linux_aarch64.whl
+ ```
+
+3. 安装 Python 依赖:
+
+ ```bash
+ pip install -r requirements.txt
+ ```
+
+4. 根据实际业务修改 `src/config.py` 配置文件中的配置项。
+
+5. 启动部署服务:
+
+ ```bash
+ python deploy.py
+ ```
+
+6. 启动监控服务:
+
+ ```bash
+ python monitor.py
+ ```
+
+## 技术架构
+
+- **框架**:FastAPI + Uvicorn
+- **推理引擎**:MindSpore Lite
+- **设备支持**:华为昇腾 NPU
+- **并行处理**:多进程并行推理
+- **数据格式**:HDF5 数据存储
+
+系统采用异步处理方式,支持高并发推理请求,并提供完整的任务生命周期管理。
diff --git a/MindEnergy/applications/deploy/README_EN.md b/MindEnergy/applications/deploy/README_EN.md
new file mode 100644
index 0000000000000000000000000000000000000000..00260073c4273d690bc7693ac920836430a8f339
--- /dev/null
+++ b/MindEnergy/applications/deploy/README_EN.md
@@ -0,0 +1,141 @@
+# MindScience Deployment Service
+
+MindScience Deployment Service is a model deployment and monitoring system based on FastAPI, supporting multi-device parallel inference and resource monitoring capabilities.
+
+## Architecture
+
+![Deployment Service Architecture](docs/deploy_arch_en.png)
+
+## Directory Structure
+
+```shell
+deploy/
+├── deploy.py # Model deployment service main file
+├── monitor.py # Server monitoring service main file
+├── requirements.txt # Project dependencies
+└── src/ # Source code directory
+ ├── config.py # Configuration file
+ ├── enums.py # Enum definitions
+ ├── inference.py # Inference implementation
+ ├── schemas.py # Data model definitions
+ ├── session.py # Session management
+ └── utils.py # Utility functions
+```
+
+## Features
+
+### Deployment Service (deploy.py)
+
+- **Model load/unload**: Supports uploading MindIR model files via HTTP interface and loading them to devices
+- **Asynchronous inference**: Supports background asynchronous execution of inference tasks
+- **Task status management**: Supports task status queries (pending, processing, completed, error)
+- **Health check**: Provides model status checking interface
+- **Result download**: Result files can be downloaded after inference completes
+- **Multi-device support**: Supports up to 8 NPU devices for parallel inference
+
+### Monitoring Service (monitor.py)
+
+- **Resource monitoring**: Real-time monitoring of CPU, memory and NPU usage
+- **Health check**: Provides server resource usage query interface
+
+## Configuration Parameters
+
+### Deployment Configuration (DeployConfig)
+
+- `max_device_num`: Maximum number of devices (default 8)
+- `deploy_device_num`: Number of devices used for deployment (default 8)
+- `max_request_num`: Maximum number of concurrent requests (default 100)
+- `models_dir`: Model file storage directory (default "models")
+- `datasets_dir`: Dataset file storage directory (default "datasets")
+- `results_dir`: Result file storage directory (default "results")
+- `dummy_model_path`: Path for dummy model used in testing (default "dummy_model.mindir")
+- `chunk_size`: Chunk size (default 8MB)
+
+### Server Configuration (ServerConfig)
+
+- `host`: Server host address (default "127.0.0.1")
+- `deploy_port`: Deployment service port (default 8001)
+- `monitor_port`: Monitoring service port (default 8002)
+- `limit_concurrency`: Maximum number of concurrent connections (default 1000)
+- `timeout_keep_alive`: Keep-alive connection timeout (default 30 seconds)
+- `backlog`: Pending connection queue size (default 2048)
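+
+All of the settings above are defined as frozen dataclasses in `src/config.py`; they are adjusted by editing the default values. For example, to deploy on 4 NPU devices and expose the services to other hosts, the defaults could be changed as sketched below (an excerpt; the values are only examples):
+
+```python
+# Excerpt of src/config.py: adjust the default values as needed
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class DeployConfig:
+    max_device_num: int = 8
+    deploy_device_num: int = 4        # default 8; use only 4 NPU devices here
+    max_request_num: int = 100
+    models_dir: str = "models"
+    datasets_dir: str = "datasets"
+    results_dir: str = "results"
+    dummy_model_path: str = "dummy_model.mindir"
+    chunk_size: int = 8 * 1024 * 1024
+
+
+@dataclass(frozen=True)
+class ServerConfig:
+    host: str = "0.0.0.0"             # default "127.0.0.1"; 0.0.0.0 allows external access (mind network security)
+    deploy_port: int = 8001
+    monitor_port: int = 8002
+    limit_concurrency: int = 1000
+    timeout_keep_alive: int = 30
+    backlog: int = 2048
+```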
+
+## API Interfaces
+
+### Deployment Service Interfaces
+
+- `POST /mindscience/deploy/load_model` - Load model
+ - Parameters: model_name (form), model_file (optional file)
+
+- `POST /mindscience/deploy/unload_model` - Unload model
+
+- `POST /mindscience/deploy/infer` - Execute inference
+ - Parameters: dataset (file), task_type (form)
+
+- `GET /mindscience/deploy/query_status/{task_id}` - Query task status
+
+- `GET /mindscience/deploy/query_results/{task_id}` - Get inference results
+
+- `GET /mindscience/deploy/health_check` - Health check
+
+### Monitoring Service Interface
+
+- `GET /mindscience/monitor/resource_usage` - Get resource usage
+ - Returns: CPU usage rate, memory usage rate, NPU usage rate, NPU memory usage rate
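+
+The monitoring endpoint returns JSON whose `data` field holds each usage rate as a string with two decimal places. A minimal client sketch (assuming the third-party `requests` library is installed):
+
+```python
+import requests
+
+resp = requests.get("http://127.0.0.1:8002/mindscience/monitor/resource_usage")
+data = resp.json()["data"]
+
+# Each field is a percentage string, e.g. "37.50"
+print("CPU usage:", data["cpuUsageRate"])
+print("Memory usage:", data["memoryUsageRate"])
+print("NPU usage:", data["npuUsageRate"])
+print("NPU memory usage:", data["npuMemoryUsageRate"])
+```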
+
+## Dependencies
+
+- `fastapi == 0.121.2`: Web framework
+- `uvicorn == 0.38.0`: ASGI server
+- `python-multipart == 0.0.20`: Multipart form data processing
+- `h5py == 3.14.0`: HDF5 file processing
+- `loguru == 0.7.3`: Logging
+- `aiofiles == 25.1.0`: Asynchronous file operations
+- `psutil == 7.0.0`: System and process monitoring
+- `numpy == 1.26.4`: Scientific computing library
+- `mindspore_lite == 2.7.1`: MindSpore Lite inference engine
+- `CANN == 8.2.RC1`: Compute Architecture for Neural Networks
+
+## Installation and Usage
+
+1. Refer to the [CANN official website](https://www.hiascend.com/document/detail/zh/CANNCommunityEdition/83RC1/softwareinst/instg/instg_quick.html?Mode=PmIns&InstallType=local&OS=openEuler&Software=cannToolKit) documentation to install the CANN community edition software package.
+
+2. Download the MindSpore Lite Python API development library from the [MindSpore official website](https://www.mindspore.cn/lite/docs/zh-CN/r2.7.1/use/downloads.html#mindspore-lite-python%E6%8E%A5%E5%8F%A3%E5%BC%80%E5%8F%91%E5%BA%93):
+
+ ```bash
+ wget https://ms-release.obs.cn-north-4.myhuaweicloud.com/2.7.1/MindSporeLite/lite/release/linux/aarch64/cloud_fusion/python310/mindspore_lite-2.7.1-cp310-cp310-linux_aarch64.whl
+
+ pip install mindspore_lite-2.7.1-cp310-cp310-linux_aarch64.whl
+ ```
+
+3. Install Python dependencies:
+
+ ```bash
+ pip install -r requirements.txt
+ ```
+
+4. Modify the configuration items in the `src/config.py` file based on actual business needs.
+
+5. Start deployment service:
+
+ ```bash
+ python deploy.py
+ ```
+
+6. Start monitoring service:
+
+ ```bash
+ python monitor.py
+ ```
+
+## Technical Architecture
+
+- **Framework**: FastAPI + Uvicorn
+- **Inference engine**: MindSpore Lite
+- **Device support**: Huawei Ascend NPU
+- **Parallel processing**: Multi-process parallel inference
+- **Data format**: HDF5 data storage
+
+The system adopts an asynchronous processing approach, supports high-concurrency inference requests, and provides complete task lifecycle management.
diff --git a/MindEnergy/applications/deploy/deploy.py b/MindEnergy/applications/deploy/deploy.py
new file mode 100644
index 0000000000000000000000000000000000000000..abebc09c3c7ea384d8c1fb45363b870806828e31
--- /dev/null
+++ b/MindEnergy/applications/deploy/deploy.py
@@ -0,0 +1,374 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+MindScience Deployment Service.
+
+This module implements a model deployment service based on FastAPI that supports:
+- Model loading/unloading via HTTP interface
+- Asynchronous inference execution with background tasks
+- Task status management (pending, processing, completed, error)
+- Health checking functionality
+- Multi-device parallel inference support (up to 8 NPU devices)
+- Result file download after inference completion
+
+The service provides RESTful APIs for model management and inference execution,
+with proper error handling and resource management.
+"""
+
+import os
+import uuid
+import signal
+from typing import Union
+from contextlib import asynccontextmanager
+
+from loguru import logger
+from uvicorn import Server
+from uvicorn.config import Config
+from fastapi import FastAPI, Form, File, UploadFile, BackgroundTasks, HTTPException
+from fastapi.responses import JSONResponse, FileResponse
+from fastapi.middleware.cors import CORSMiddleware
+
+from src.enums import HealthStatus, ModelStatus, TaskStatus
+from src.schemas import ModelInfo
+from src.session import SessionManager
+from src.utils import Utilities
+from src.config import configure_logging, DeployConfig, ServerConfig
+
+# pylint: disable=unused-argument, redefined-outer-name
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """Manages the application lifespan by initializing and cleaning up resources.
+
+ This function is used as an async context manager for the FastAPI application
+ lifespan. It handles initialization of models on the specified devices during
+ startup and performs cleanup during shutdown.
+
+ Args:
+ app (FastAPI): The FastAPI application instance.
+ """
+ configure_logging("deploy")
+ logger.info(f"Initializing models on devices {list(range(DeployConfig.deploy_device_num))}.")
+ yield
+ logger.info("Shutting down deploy service.")
+
+app = FastAPI(lifespan=lifespan)
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_methods=["*"]
+)
+
+model = SessionManager()
+tasks_status = {}
+
+
+def inference(dataset_path, task_id, task_type):
+ """Performs inference on a dataset.
+
+ This function loads an HDF5 dataset, performs batch inference using the model,
+ saves the results to an HDF5 file, and updates the task status.
+
+ Args:
+ dataset_path (str): Path to the HDF5 dataset file.
+ task_id (str): Unique identifier for the inference task.
+ task_type (int): Type of inference task to perform.
+
+ Raises:
+ Exception: If there is an error during the inference process.
+ """
+ try:
+ tasks_status[task_id] = TaskStatus.PROCESSING
+
+ batch_inputs = Utilities.load_h5_file(dataset_path)
+
+ results_list = model.batch_infer(batch_inputs, task_type)
+
+ os.makedirs(DeployConfig.results_dir, exist_ok=True)
+ results_path = os.path.join(DeployConfig.results_dir, f"{task_id}_results.h5")
+
+ Utilities.save_h5_file(results_list, results_path)
+
+ tasks_status[task_id] = TaskStatus.COMPLETED
+ except Exception as e:
+ tasks_status[task_id] = TaskStatus.ERROR
+ logger.error(f"Task {task_id} infer failed, ERROR: {e}.")
+
+
+@app.post("/mindscience/deploy/load_model")
+async def load_model(model_name: str = Form(), model_file: Union[UploadFile, str] = None):
+ """Loads a model for inference.
+
+ This endpoint handles the loading of a model, either from an uploaded file
+ or from a local file. If a model is already loaded, it returns an error.
+
+ Args:
+ model_name (str): Name of the model to load.
+ model_file (Union[UploadFile, str], optional): Uploaded model file. Defaults to None.
+
+ Returns:
+ JSONResponse: A response containing model information and status.
+ """
+ model_info = ModelInfo()
+ if model.is_ready() == HealthStatus.READY:
+ model_info.status = ModelStatus.FAILURE
+ model_info.message = f"Model {model.session_name} is already loaded, please unload first."
+ return JSONResponse(
+ content=model_info.model_dump(),
+ status_code=403,
+ headers={"Content-Type": "application/json"}
+ )
+
+ if not model_file:
+ logger.info("File not uploaded, model loaded from local file.")
+ else:
+ try:
+ logger.info(f"Start receive {model_name} file")
+ os.makedirs(DeployConfig.models_dir, exist_ok=True)
+ save_dir = os.path.join(DeployConfig.models_dir, model_name)
+ os.makedirs(save_dir, exist_ok=True)
+
+ save_path = os.path.join(save_dir, model_file.filename)
+ await Utilities.save_upload_file(model_file, save_path)
+
+ Utilities.extract_file(save_path)
+ logger.info(f"{model_name} file receive successfully.")
+ except Exception as e:
+ model_info.status = ModelStatus.FAILURE
+ model_info.message = str(e)
+ return JSONResponse(
+ content=model_info.model_dump(),
+ status_code=500,
+ headers={"Content-Type": "application/json"}
+ )
+
+ try:
+ model.init_session(model_name, device_num=DeployConfig.deploy_device_num)
+ return JSONResponse(
+ content=model_info.model_dump(),
+ status_code=201,
+ headers={"Content-Type": "application/json"}
+ )
+ except Exception as e:
+ model_info.status = ModelStatus.FAILURE
+ model_info.message = str(e)
+ return JSONResponse(
+ content=model_info.model_dump(),
+ status_code=500,
+ headers={"Content-Type": "application/json"}
+ )
+
+
+@app.post("/mindscience/deploy/unload_model")
+async def unload_model():
+ """Unloads the currently loaded model.
+
+ This endpoint unloads the model session and frees up the associated resources.
+
+ Returns:
+ JSONResponse: A response containing model information and status.
+ """
+ model_info = ModelInfo()
+ try:
+ model.del_session()
+ return JSONResponse(
+ content=model_info.model_dump(),
+ status_code=200,
+ headers={"Content-Type": "application/json"}
+ )
+ except Exception as e:
+ model_info.status = ModelStatus.FAILURE
+ model_info.message = str(e)
+ return JSONResponse(
+ content=model_info.model_dump(),
+ status_code=500,
+ headers={"Content-Type": "application/json"}
+ )
+
+
+@app.post("/mindscience/deploy/infer")
+async def infer(background_tasks: BackgroundTasks, dataset: UploadFile = File(..., description="input dataset"),
+                task_type: int = Form()):
+ """Performs inference on the uploaded dataset.
+
+ This endpoint accepts a dataset file and performs inference in the background.
+ It checks if the model is ready, validates the number of pending tasks,
+ saves the uploaded dataset, and starts the inference task.
+
+ Args:
+ dataset (UploadFile): The input dataset file to process.
+ task_type (int): Type of inference task to perform.
+ background_tasks (BackgroundTasks): FastAPI background tasks manager.
+
+ Returns:
+ JSONResponse: A response containing the task ID.
+
+ Raises:
+ HTTPException: If the server is not ready or if the request limit is exceeded.
+ """
+ if model.is_ready() != HealthStatus.READY:
+ raise HTTPException(status_code=500, detail="Server is not ready, please check")
+
+ pending_number = Utilities.count_pending_task(tasks_status)
+ logger.info(f"Pending task number is {pending_number}")
+ if pending_number >= DeployConfig.max_request_num:
+ logger.error(f"Predict request number exceed limited number: \
+ {pending_number} vs {DeployConfig.max_request_num}")
+ raise HTTPException(status_code=503, detail="Request number exceed limited number, please wait for a while.")
+
+ task_id = str(uuid.uuid4())
+ tasks_status[task_id] = TaskStatus.PENDING
+
+ try:
+ os.makedirs(DeployConfig.datasets_dir, exist_ok=True)
+ dataset_path = os.path.join(DeployConfig.datasets_dir, f"{task_id}_{dataset.filename}")
+ await Utilities.save_upload_file(dataset, dataset_path)
+
+ background_tasks.add_task(inference, dataset_path, task_id, task_type)
+
+ return JSONResponse(
+ content={"task_id": task_id},
+ status_code=200,
+ headers={"Content-Type": "application/json"}
+ )
+ except Exception as e:
+ tasks_status[task_id] = TaskStatus.ERROR
+ raise HTTPException(status_code=500, detail=f"Task {task_id} infer failed, ERROR: {e}.") from e
+
+
+@app.get("/mindscience/deploy/query_status/{task_id}")
+async def query_status(task_id: str):
+ """Queries the status of an inference task.
+
+ This endpoint retrieves the current status of a specific inference task
+ by its task ID.
+
+ Args:
+ task_id (str): Unique identifier of the task to query.
+
+ Returns:
+ JSONResponse: A response containing the task status.
+
+ Raises:
+ HTTPException: If the task ID is not found.
+ """
+ status = tasks_status.get(task_id)
+ if status is None:
+ raise HTTPException(status_code=404, detail=f"Task {task_id} is not found, please check!")
+ return JSONResponse(
+ content={"status": status},
+ status_code=200,
+ headers={"Content-Type": "application/json"}
+ )
+
+
+@app.get("/mindscience/deploy/query_results/{task_id}")
+async def query_results(task_id: str):
+ """Retrieves the results of a completed inference task.
+
+ This endpoint returns the results file of a completed inference task
+ if the task is completed, otherwise returns the current status.
+
+ Args:
+ task_id (str): Unique identifier of the task to query.
+
+ Returns:
+ FileResponse or JSONResponse: Either the results file or a response containing the task status and message.
+
+ Raises:
+ HTTPException: If the task ID is not found.
+ """
+ status = tasks_status.get(task_id)
+ if status is None:
+ raise HTTPException(status_code=404, detail=f"Task {task_id} is not found, please check!")
+ if status != TaskStatus.COMPLETED:
+ return JSONResponse(
+ content={"status": status, "message": f"Task {task_id} is not completed."},
+ status_code=404,
+ headers={"Content-Type": "application/json"}
+ )
+ result_path = os.path.join(DeployConfig.results_dir, f"{task_id}_results.h5")
+ return FileResponse(result_path, filename=f"{task_id}_results.h5")
+
+
+@app.get("/mindscience/deploy/health_check")
+async def health_check():
+ """Performs a health check on the deployment service.
+
+ This endpoint checks the readiness status of the model and returns
+ an appropriate HTTP response based on the model's health status.
+
+ Returns:
+ JSONResponse: A response indicating the health status of the service.
+ """
+ health_status = model.is_ready()
+ logger.info(f"health check result is {health_status}.")
+ if health_status == HealthStatus.NOTLOADED:
+ return JSONResponse(
+ content={},
+ status_code=501,
+ headers={"Content-Type": "application/json"}
+ )
+ if health_status == HealthStatus.EXCEPTION:
+ return JSONResponse(
+ content={model.session_name: health_status},
+ status_code=503,
+ headers={"Content-Type": "application/json"}
+ )
+ return JSONResponse(
+ content={model.session_name: health_status},
+ status_code=200,
+ headers={"Content-Type": "application/json"}
+ )
+
+
+if __name__ == "__main__":
+ server = Server(
+ Config(
+ app=app,
+ host=ServerConfig.host,
+ port=ServerConfig.deploy_port,
+ limit_concurrency=ServerConfig.limit_concurrency,
+ timeout_keep_alive=ServerConfig.timeout_keep_alive,
+ backlog=ServerConfig.backlog
+ )
+ )
+
+ def terminate_signal_handler(signum, frame):
+ """Handles termination signals to gracefully shut down the server.
+
+ This function is called when the server receives a termination signal
+ (SIGTERM or SIGINT). It cleans up the model session and sets the server
+ to exit.
+
+ Args:
+ signum (int): The signal number received.
+ frame: The current stack frame (unused).
+ """
+ global model
+ logger.info(f"Catch signal: {signum}, starting terminate server...")
+ try:
+ model.del_session()
+ except Exception as e:
+ logger.exception(f"Model clear failed, please check! ERROR: {e}")
+ del model
+ server.should_exit = True
+
+ signal.signal(signal.SIGTERM, terminate_signal_handler)
+ signal.signal(signal.SIGINT, terminate_signal_handler)
+
+ logger.info("Starting deploy server...")
+ server.run()
diff --git a/MindEnergy/applications/deploy/docs/deploy_arch.png b/MindEnergy/applications/deploy/docs/deploy_arch.png
new file mode 100755
index 0000000000000000000000000000000000000000..9394afa58298f0b91184b26a8a84fc7cc37e6fa8
Binary files /dev/null and b/MindEnergy/applications/deploy/docs/deploy_arch.png differ
diff --git a/MindEnergy/applications/deploy/docs/deploy_arch_en.png b/MindEnergy/applications/deploy/docs/deploy_arch_en.png
new file mode 100755
index 0000000000000000000000000000000000000000..e8a040868ba88c9fe67b292c372851a5e3b6e30f
Binary files /dev/null and b/MindEnergy/applications/deploy/docs/deploy_arch_en.png differ
diff --git a/MindEnergy/applications/deploy/monitor.py b/MindEnergy/applications/deploy/monitor.py
new file mode 100644
index 0000000000000000000000000000000000000000..8939fe3049dcd7afeb1769bf9948c3a62f5d6127
--- /dev/null
+++ b/MindEnergy/applications/deploy/monitor.py
@@ -0,0 +1,157 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+MindScience Monitoring Service.
+
+This module implements a server resource monitoring service based on FastAPI that supports:
+- Real-time monitoring of CPU usage
+- Real-time monitoring of memory usage
+- Real-time monitoring of NPU (Neural Processing Unit) usage
+- Real-time monitoring of NPU memory usage
+- Health check endpoint for server resource usage
+
+The service provides a RESTful API endpoint to retrieve current system resource statistics,
+which can be used for system health monitoring and performance analysis.
+"""
+
+import signal
+import asyncio
+from contextlib import asynccontextmanager
+
+import psutil
+from loguru import logger
+from fastapi import FastAPI
+from uvicorn import Server
+from uvicorn.config import Config
+
+from src.utils import Utilities
+from src.config import configure_logging, ServerConfig
+
+# pylint: disable=unused-argument, redefined-outer-name
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """Manages the application lifespan by initializing and cleaning up resources.
+
+ This function is used as an async context manager for the FastAPI application
+ lifespan. It handles logging configuration and performs startup and shutdown
+ logging.
+
+ Args:
+ app (FastAPI): The FastAPI application instance.
+ """
+ configure_logging("monitor")
+ logger.info("Starting monitor service.")
+ yield
+ logger.info("Shutting down monitor service.")
+
+app = FastAPI(lifespan=lifespan)
+
+@app.get("/mindscience/monitor/resource_usage")
+async def get_server_status():
+ """Retrieves the current server resource usage statistics.
+
+ This endpoint collects and returns CPU, memory, and NPU usage statistics
+ from the server. It uses psutil for CPU and memory metrics and a custom
+ utility function for NPU metrics.
+
+ Returns:
+ dict: A dictionary containing the status, message, and resource usage data
+ including CPU usage rate, memory usage rate, NPU usage rate, and NPU memory usage rate.
+ """
+ try:
+ results = await asyncio.gather(
+ asyncio.to_thread(psutil.cpu_percent, interval=None),
+ asyncio.to_thread(lambda: psutil.virtual_memory().percent),
+ Utilities.get_npu_usage(),
+ return_exceptions=True
+ )
+
+        # Unpack the gathered results
+ cpu_usage, memory_usage, npu_stats = results
+
+ if isinstance(cpu_usage, Exception):
+ logger.error(f"Failed to get CPU usage: {cpu_usage}")
+ cpu_usage = 0.0
+
+ if isinstance(memory_usage, Exception):
+ logger.error(f"Failed to get memory usage: {memory_usage}")
+ memory_usage = 0.0
+
+ if isinstance(npu_stats, Exception):
+ logger.warning(f"Failed to get NPU usage info: {npu_stats}")
+ npu_usage_rate = "0.00"
+ npu_memory_usage_rate = "0.00"
+ elif npu_stats:
+ total_memory = sum(stat["memory_total_mb"] for stat in npu_stats)
+ used_memory = sum(stat["memory_used_mb"] for stat in npu_stats)
+ avg_utilization = sum(stat["utilization_percent"] for stat in npu_stats) / len(npu_stats)
+ avg_memory_usage = (used_memory / total_memory * 100) if total_memory > 0 else 0
+ npu_usage_rate = f"{avg_utilization:.2f}"
+ npu_memory_usage_rate = f"{avg_memory_usage:.2f}"
+ else:
+ npu_usage_rate = "0.00"
+ npu_memory_usage_rate = "0.00"
+
+ return {
+ "status": 200,
+ "msg": "success",
+ "data": {
+ "cpuUsageRate": f"{cpu_usage:.2f}",
+ "memoryUsageRate": f"{memory_usage:.2f}",
+ "npuUsageRate": npu_usage_rate,
+ "npuMemoryUsageRate": npu_memory_usage_rate
+ }
+ }
+ except Exception as e:
+ logger.error(f"An unexpected error occurred in get_server_status: {e}")
+ return {
+ "status": 500,
+ "msg": "failure",
+ "data": {}
+ }
+
+if __name__ == "__main__":
+ server = Server(
+ Config(
+ app=app,
+ host=ServerConfig.host,
+ port=ServerConfig.monitor_port,
+ limit_concurrency=ServerConfig.limit_concurrency,
+ timeout_keep_alive=ServerConfig.timeout_keep_alive,
+ backlog=ServerConfig.backlog
+ )
+ )
+
+ def terminate_signal_handler(signum, frame):
+ """Handles termination signals to gracefully shut down the server.
+
+ This function is called when the server receives a termination signal
+ (SIGTERM or SIGINT). It sets the server to exit.
+
+ Args:
+ signum (int): The signal number received.
+ frame: The current stack frame (unused).
+ """
+ logger.info(f"Catch signal: {signum}, starting terminate server...")
+ server.should_exit = True
+
+ signal.signal(signal.SIGTERM, terminate_signal_handler)
+ signal.signal(signal.SIGINT, terminate_signal_handler)
+
+
+ logger.info("Starting monitor server...")
+ server.run()
diff --git a/MindEnergy/applications/deploy/requirements.txt b/MindEnergy/applications/deploy/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..efb672f8223e690b0ae022a96605b86b87789078
--- /dev/null
+++ b/MindEnergy/applications/deploy/requirements.txt
@@ -0,0 +1,8 @@
+fastapi
+uvicorn
+python-multipart
+h5py
+loguru
+aiofiles
+psutil
+numpy
diff --git a/MindEnergy/applications/deploy/src/__init__.py b/MindEnergy/applications/deploy/src/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..c1d6e2e61bc2e8689b21fa5f96feb9371325f852
--- /dev/null
+++ b/MindEnergy/applications/deploy/src/__init__.py
@@ -0,0 +1,31 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+MindScience deployment service source package.
+
+This package contains modules for deploying and monitoring machine learning models
+using FastAPI services. The modules include:
+
+- config: Configuration classes and functions for deployment settings
+- enums: Enumerations for status values and message types
+- inference: Classes for performing inference using MindSpore Lite
+- schemas: Data models for request/response validation
+- session: Session management for model inference
+- utils: Utility functions for file handling, model collection, and system monitoring
+
+This package provides the core functionality for model deployment, inference execution,
+and system monitoring in the MindScience platform.
+"""
diff --git a/MindEnergy/applications/deploy/src/config.py b/MindEnergy/applications/deploy/src/config.py
new file mode 100644
index 0000000000000000000000000000000000000000..5704c76044fb13952831bb4645e8eaff8eadcc32
--- /dev/null
+++ b/MindEnergy/applications/deploy/src/config.py
@@ -0,0 +1,119 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+Configuration module for MindScience deployment service.
+
+This module defines configuration classes and functions used throughout the
+MindScience deployment and monitoring services. It includes:
+
+- ModelConfig: Specifies model input and output column names
+- DeployConfig: Contains deployment-related settings such as device numbers, request limits, and file paths
+- ServerConfig: Defines server settings including host, ports, and connection parameters
+- configure_logging: Function to set up logging for different services
+
+The configurations use dataclasses with frozen=True to ensure immutability
+and thread-safe access to configuration values.
+"""
+
+import sys
+from typing import Tuple
+from dataclasses import dataclass
+
+from loguru import logger
+
+@dataclass(frozen=True)
+class ModelConfig:
+ """Configuration for model input and output specifications.
+
+ Attributes:
+ input_columns: Tuple of column names used as model inputs.
+ output_columns: Tuple of column names produced as model outputs.
+ """
+    input_columns: Tuple[str, ...] = ("x", "edge_index", "edge_attr")
+
+    output_columns: Tuple[str, ...] = ("output",)
+
+
+@dataclass(frozen=True)
+class DeployConfig:
+ """Configuration for deployment settings.
+
+ Attributes:
+ max_device_num: Maximum number of devices allowed for deployment.
+ deploy_device_num: Number of devices to be used for deployment.
+ max_request_num: Maximum number of concurrent requests allowed.
+ models_dir: Directory path for storing model files.
+ datasets_dir: Directory path for storing dataset files.
+ results_dir: Directory path for storing result files.
+ dummy_model_path: File path for the dummy model used in testing.
+ chunk_size: Size of data chunks for processing in bytes.
+ """
+ max_device_num: int = 8
+
+ deploy_device_num: int = 8
+
+ max_request_num: int = 100
+
+ models_dir: str = "models"
+
+ datasets_dir: str = "datasets"
+
+ results_dir: str = "results"
+
+ dummy_model_path: str = "dummy_model.mindir"
+
+ chunk_size: int = 8 * 1024 * 1024
+
+
+@dataclass(frozen=True)
+class ServerConfig:
+ """Configuration for server settings.
+
+ Attributes:
+ host: Host address for the server.
+ deploy_port: Port number for deployment service.
+ monitor_port: Port number for monitoring service.
+ limit_concurrency: Maximum number of concurrent connections allowed.
+ timeout_keep_alive: Timeout duration for keep-alive connections in seconds.
+ backlog: Maximum number of pending connections in the queue.
+ """
+ host: str = "127.0.0.1"
+
+ deploy_port: int = 8001
+
+ monitor_port: int = 8002
+
+ limit_concurrency: int = 1000
+
+ timeout_keep_alive: int = 30
+
+ backlog: int = 2048
+
+
+def configure_logging(service: str):
+ """Configures logging settings for the specified service.
+
+ Args:
+ service: Name of the service to configure logging for.
+ """
+ logger.add(
+ f"logs/{service}.log",
+ rotation="100 MB",
+ retention="10 days",
+ format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
+ enqueue=True
+ )
+ logger.add(sys.stderr, level="DEBUG")
diff --git a/MindEnergy/applications/deploy/src/enums.py b/MindEnergy/applications/deploy/src/enums.py
new file mode 100644
index 0000000000000000000000000000000000000000..687760479f8c54d92553e1f95835af1d6cd53c4f
--- /dev/null
+++ b/MindEnergy/applications/deploy/src/enums.py
@@ -0,0 +1,61 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+Enumeration module for MindScience deployment service.
+
+This module defines various enumerations used throughout the
+MindScience deployment system, including:
+
+- ProcessMessage: Message types for inter-process communication
+- HealthStatus: Health status indicators for services
+- ModelStatus: Status indicators for model loading and execution
+- TaskStatus: Execution status indicators for inference tasks
+
+These enums provide type-safe constants for status values and message types
+used in the system's processes and communication.
+"""
+
+from enum import Enum, IntEnum
+
+class ProcessMessage(IntEnum):
+ """Enumeration for process message types in the system."""
+ ERROR = 0
+ PREDICT = 1
+ REPLY = 2
+ CHECK = 3
+ BUILD_FINISH = 4
+ EXIT = 5
+
+
+class HealthStatus(str, Enum):
+ """Enumeration for health status of services."""
+ READY = "ready"
+ NOTLOADED = "not_loaded"
+ EXCEPTION = "exception"
+
+
+class ModelStatus(str, Enum):
+ """Enumeration for model loading or execution status."""
+ SUCCESS = "success"
+ FAILURE = "failure"
+
+
+class TaskStatus(str, Enum):
+ """Enumeration for task execution status."""
+ PENDING = "pending"
+ PROCESSING = "processing"
+ COMPLETED = "completed"
+ ERROR = "error"
diff --git a/MindEnergy/applications/deploy/src/inference.py b/MindEnergy/applications/deploy/src/inference.py
new file mode 100644
index 0000000000000000000000000000000000000000..3dd5b12a095315f03350a9dceb17c1ccb20b246f
--- /dev/null
+++ b/MindEnergy/applications/deploy/src/inference.py
@@ -0,0 +1,419 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+Inference module for MindScience deployment service.
+
+This module provides classes and functions for performing inference using
+MindSpore Lite across multiple devices (e.g. NPU devices) in parallel.
+It includes:
+
+- InferenceModel: A wrapper for MindSpore Lite models with input/output handling
+- MultiprocessInference: A class for managing parallel inference across multiple devices
+- infer_process_func: A function that runs in separate processes to handle inference tasks
+
+The module implements a multiprocessing approach where each device runs in its own
+process with communication handled via pipes. It provides methods for model building,
+inference execution, health checks, and resource cleanup.
+"""
+
+import os
+from typing import List
+from multiprocessing import Process, Pipe
+
+import numpy as np
+from loguru import logger
+
+import mindspore_lite as mslite
+
+from .config import DeployConfig
+from .enums import ProcessMessage
+
+def infer_process_func(pipe_child_end, model_path, device_id):
+ """Function that runs in a separate process to handle inference tasks.
+
+ This function initializes an InferenceModel, handles communication through
+ a pipe, and processes different types of messages including initialization,
+ inference requests, health checks, and exit signals. It runs in a continuous
+ loop until it receives an exit message.
+
+ Args:
+ pipe_child_end: The child end of a multiprocessing.Pipe used for
+ communication with the parent process.
+ model_path (str): Path to the model file to load for inference.
+ device_id (int): ID of the device to run inference on.
+ """
+ try:
+ infer_model = InferenceModel(model_path, device_id)
+ if DeployConfig.dummy_model_path.endswith(".mindir") and os.path.exists(DeployConfig.dummy_model_path):
+ dummy_model = InferenceModel(DeployConfig.dummy_model_path, device_id)
+ else:
+ dummy_model = infer_model
+ pipe_child_end.send((ProcessMessage.BUILD_FINISH, infer_model.batch_size))
+ except Exception as e:
+ logger.exception(f"Model initialization failed on device {device_id}: {e}")
+ pipe_child_end.send((ProcessMessage.ERROR, str(e)))
+ pipe_child_end.close()
+ return
+
+ try:
+ while True:
+ msg_type, msg = pipe_child_end.recv()
+ if msg_type == ProcessMessage.EXIT:
+ logger.info(f"Device {device_id} received exit signal")
+ break
+ if msg_type == ProcessMessage.CHECK:
+ try:
+ _ = dummy_model.dummy_infer()
+ pipe_child_end.send((ProcessMessage.REPLY, ""))
+ except Exception as e:
+ err_msg = f"Dummy inference failed on device {device_id}: {e}"
+ logger.exception(err_msg)
+ pipe_child_end.send((ProcessMessage.ERROR, err_msg))
+ raise RuntimeError(err_msg) from e
+ elif msg_type == ProcessMessage.PREDICT:
+ try:
+ inputs = msg
+ result = infer_model.infer(inputs)
+ pipe_child_end.send((ProcessMessage.REPLY, result))
+ except Exception as e:
+ err_msg = f"Inference failed on device {device_id}: {e}"
+ logger.exception(err_msg)
+ pipe_child_end.send((ProcessMessage.ERROR, err_msg))
+ raise RuntimeError(err_msg) from e
+ else:
+ err_msg = f"Unexpected message type {msg_type} on device {device_id}"
+ logger.exception(err_msg)
+ pipe_child_end.send((ProcessMessage.ERROR, err_msg))
+ raise RuntimeError(err_msg)
+ except Exception as e:
+ logger.exception(f"Runtime error on device {device_id}: {e}")
+ pipe_child_end.send((ProcessMessage.ERROR, str(e)))
+ finally:
+ pipe_child_end.close()
+
+
+class MultiprocessInference:
+ """A class for performing inference using multiple processes across different devices.
+
+ This class manages multiple inference processes that run in parallel across specified
+ devices (e.g. Ascend NPU devices). It provides methods to build the model, run
+ inference, perform health checks, and clean up resources.
+
+ Attributes:
+ model_path (str): Path to the model file to be used for inference.
+ device_num (int): Number of devices to use for parallel inference.
+ batch_size (int): Batch size of the loaded model, initialized to -1.
+ process_pool (list): List of Process objects for the inference processes.
+ parent_pipes (list): List of parent ends of Pipe objects for communication
+ with the inference processes.
+ """
+
+ def __init__(self, model_path: str, device_num: int):
+ """Initializes the MultiprocessInference instance.
+
+ Args:
+ model_path (str): Path to the model file to be used for inference.
+ device_num (int): Number of devices to use for parallel inference.
+ """
+ self.model_path = model_path
+ self.device_num = device_num
+ self.batch_size = -1
+
+ self.process_pool = []
+ self.parent_pipes = []
+
+ def build_model(self):
+ """Builds and initializes the model on multiple devices.
+
+ This method creates a process for each device, initializes the model in each process,
+ and establishes communication pipes between the main process and the worker processes.
+ It also verifies successful initialization of each device and ensures batch size
+ consistency across devices.
+
+ Raises:
+ RuntimeError: If model has already been loaded or if initialization fails.
+ """
+ if self.process_pool:
+ self._cleanup_resources()
+ raise RuntimeError("The model has been loaded. \
+ Please uninstall this model first and then reload another model!")
+
+ for device_id in range(self.device_num):
+ parent_conn, child_conn = Pipe(duplex=True)
+ self.parent_pipes.append(parent_conn)
+
+ process = Process(
+ target=infer_process_func,
+ args=(child_conn, self.model_path, device_id),
+ daemon=True,
+ )
+ self.process_pool.append(process)
+ process.start()
+
+ child_conn.close()
+
+ for device_id in range(self.device_num):
+ try:
+ msg_type, msg = self.parent_pipes[device_id].recv()
+ if msg_type == ProcessMessage.ERROR:
+ raise RuntimeError(f"Device {device_id} initialization failed: {msg}")
+ if msg_type != ProcessMessage.BUILD_FINISH:
+ raise RuntimeError(f"Unexpected message type {msg_type} during initialization")
+
+ if self.batch_size not in (-1, msg):
+ raise RuntimeError("Batch size in different models are inconsistent.")
+ self.batch_size = msg
+ logger.info(f"Device {device_id} initialized successfully")
+ except Exception as e:
+ self._cleanup_resources()
+ logger.critical("Failed to initialize inference processes")
+ raise RuntimeError(f"Unexpected error {e} during initialization") from e
+
+ def infer(self, inputs):
+ """Performs inference on the provided input data.
+
+ This method distributes the input data across available devices and collects
+ the results. Each input item is sent to a separate device for parallel processing.
+
+ Args:
+ inputs: List of input data for inference. Each item will be sent to a separate device.
+
+ Returns:
+ list: List of inference results from each device.
+
+ Raises:
+ ValueError: If the number of inputs exceeds the number of available devices.
+ RuntimeError: If sending prediction command to a device fails or if inference fails.
+ """
+ if self.device_num < len(inputs):
+ raise ValueError(f"Inputs length {len(inputs)} should be less than or equal to \
+ parallel number {self.device_num}, inference abort!")
+
+ for device_id, input_data in enumerate(inputs):
+ try:
+ self.parent_pipes[device_id].send((ProcessMessage.PREDICT, input_data))
+ except (BrokenPipeError, EOFError, OSError) as e:
+ self._cleanup_resources()
+ raise RuntimeError(f"Failed to send PREDICT to device {device_id}: {e}") from e
+ except Exception as e:
+ self._cleanup_resources()
+ raise RuntimeError(f"Unexpected error when sending PREDICT to device {device_id}: {e}") from e
+
+ results = []
+ for device_id in range(len(inputs)):
+ try:
+ msg_type, msg = self.parent_pipes[device_id].recv()
+ if msg_type == ProcessMessage.ERROR:
+ self._cleanup_resources()
+ raise RuntimeError(f"Device {device_id} inference failed: {msg}, cleanup all resource!")
+ results.append(msg)
+ except EOFError as e:
+ self._cleanup_resources()
+ raise RuntimeError(f"Device {device_id} connection closed unexpectedly.") from e
+
+ return results
+
+ def finalize(self):
+ """Finalizes the inference processes and cleans up resources.
+
+ This method sends an EXIT signal to all worker processes, waits for them to
+ finish, and then cleans up all resources including the processes and pipes.
+ """
+ for idx, pipe in enumerate(self.parent_pipes):
+ try:
+ pipe.send((ProcessMessage.EXIT, ""))
+ except (BrokenPipeError, EOFError, OSError) as e:
+ logger.exception(f"Failed to send EXIT to device {idx}: {e}")
+ except Exception as e:
+ logger.exception(f"Unexpected error when sending EXIT to device {idx}: {e}")
+
+ for idx, process in enumerate(self.process_pool):
+ if process.is_alive():
+ try:
+ process.join(timeout=5)
+ except (OSError, RuntimeError) as e:
+ logger.exception(f"Error while joining process {idx} during finalize: {e}")
+ except Exception as e:
+ logger.exception(f"Unexpected error while joining process {idx} during finalize: {e}")
+
+ self._cleanup_resources()
+
+ def _cleanup_resources(self) -> None:
+ """Cleans up all process and pipe resources.
+
+ This private method terminates all running processes and closes all pipes.
+ It tracks any failures during cleanup and raises a RuntimeError if cleanup
+ fails for any of the resources.
+
+ Raises:
+ RuntimeError: If any process or pipe fails to be cleaned up properly.
+ """
+ failure_processes = []
+ for idx, process in enumerate(self.process_pool):
+ if process.is_alive():
+ logger.warning(f"Terminating process {idx}...")
+ try:
+ process.terminate()
+ process.join(timeout=2)
+ except (OSError, RuntimeError) as e:
+ logger.exception(f"Failed to terminate process {idx}: {e}")
+ failure_processes.append(idx)
+ except Exception as e:
+ logger.exception(f"Unexpected error while terminating process {idx} during cleanup resources: {e}")
+ failure_processes.append(idx)
+
+ failure_pipes = []
+ for idx, pipe in enumerate(self.parent_pipes):
+ try:
+ pipe.close()
+ except (BrokenPipeError, EOFError, OSError) as e:
+ logger.exception(f"Failed to close {idx} parent pipe cleanly: {e}")
+ failure_pipes.append(idx)
+ except Exception as e:
+ logger.exception(f"Unexpected error while closing parent pipe {idx}: {e}")
+ failure_pipes.append(idx)
+
+ self.process_pool = []
+ self.parent_pipes = []
+
+ if failure_processes or failure_pipes:
+ raise RuntimeError(f"Failed to cleanup subprocesses: {failure_processes}, parent pipes: {failure_pipes}!")
+
+ def is_ready(self):
+ """Checks if all inference processes are ready and healthy.
+
+ This method verifies that all processes are alive and responsive by sending
+ a CHECK message to each and waiting for a response. If any process is not
+ responsive or returns an error, it raises a RuntimeError and cleans up resources.
+
+ Raises:
+ RuntimeError: If model is not loaded, if any process is not alive,
+ or if health check fails on any device.
+ """
+ if not self.process_pool:
+ raise RuntimeError("Model not loaded, please check!")
+
+ for idx, process in enumerate(self.process_pool):
+ if not process.is_alive():
+ self._cleanup_resources()
+ raise RuntimeError(f"Process {idx} is not alive, please check")
+
+ for idx, pipe in enumerate(self.parent_pipes):
+ try:
+ pipe.send((ProcessMessage.CHECK, ""))
+ except (BrokenPipeError, EOFError, OSError) as e:
+ self._cleanup_resources()
+ raise RuntimeError(f"Failed to send CHECK to device {idx}: {e}") from e
+ except Exception as e:
+ self._cleanup_resources()
+ raise RuntimeError(f"Unexpected error when sending CHECK to device {idx}: {e}") from e
+
+ for idx, pipe in enumerate(self.parent_pipes):
+ try:
+ msg_type, msg = pipe.recv()
+ if msg_type == ProcessMessage.ERROR:
+ self._cleanup_resources()
+ raise RuntimeError(f"Device {idx} health check failed: {msg}, cleanup all resource!")
+ except EOFError as e:
+ self._cleanup_resources()
+ raise RuntimeError(f"Device {idx} connection closed unexpectedly during health check.") from e
+
+
+class InferenceModel:
+ """A class for performing inference using MindSpore Lite.
+
+ This class initializes a MindSpore Lite model on a specific device and provides
+ methods for running inference, dummy inference, and accessing model properties.
+
+ Attributes:
+ model: MindSpore Lite Model instance.
+ input_shape_list (list): List of input shapes for the model.
+ model_batch_size (int): Batch size of the model, extracted from the first input shape.
+ """
+
+ def __init__(self, model_path: str, device_id: int):
+ """Initializes the InferenceModel instance.
+
+ Creates a MindSpore Lite context for the specified device, loads the model
+ from the given path, and extracts input shapes and batch size information.
+
+ Args:
+ model_path (str): Path to the MindIR model file.
+ device_id (int): ID of the device to run inference on (e.g. Ascend NPU ID).
+
+ Raises:
+ RuntimeError: If the loaded model has no input.
+ """
+ context = mslite.Context()
+ context.target = ["ascend"]
+ context.ascend.device_id = device_id
+
+ self.model = mslite.Model()
+ self.model.build_from_file(model_path, mslite.ModelType.MINDIR, context)
+ self.input_shape_list = [item.shape for item in self.model.get_inputs()]
+ if not self.input_shape_list:
+ raise RuntimeError("The loaded model has no input!")
+ self.model_batch_size = self.input_shape_list[0][0]
+
+ def infer(self, batch_inputs: List[np.ndarray]):
+ """Performs inference on the provided batch of input data.
+
+ This method takes a list of numpy arrays as input, assigns them to the model's
+ input tensors, runs the prediction, and returns the output as a list of numpy arrays.
+
+ Args:
+ batch_inputs (List[np.ndarray]): List of input data as numpy arrays. The number
+ of inputs should match the number of model inputs.
+
+ Returns:
+ list: List of inference results as numpy arrays.
+
+ Raises:
+ ValueError: If the number of inputs doesn't match the number of model inputs.
+ """
+ inputs = self.model.get_inputs()
+ if len(batch_inputs) != len(inputs):
+ raise ValueError(f"The number of model inputs {len(inputs)} should be the same as \
+ the number of inputs {len(batch_inputs)}")
+
+ for i, input_ in enumerate(inputs):
+ input_.set_data_from_numpy(batch_inputs[i])
+
+ batch_out = self.model.predict(inputs)
+ return [item.get_data_to_numpy() for item in batch_out]
+
+ def dummy_infer(self):
+ """Performs a dummy inference without input data.
+
+ This method runs the model prediction without providing specific input data,
+ which is typically used for model warm-up or health checks.
+
+ Returns:
+ list: Model outputs from the prediction.
+ """
+ inputs = self.model.get_inputs()
+ outputs = self.model.predict(inputs)
+ return outputs
+
+ @property
+ def batch_size(self):
+ """int: The batch size of the model, extracted from the first input shape."""
+ return self.model_batch_size
+
+ @property
+ def input_shapes(self):
+ """list: List of input shapes for the model."""
+ return self.input_shape_list
diff --git a/MindEnergy/applications/deploy/src/schemas.py b/MindEnergy/applications/deploy/src/schemas.py
new file mode 100644
index 0000000000000000000000000000000000000000..3bc2a0e0ed286ef7b8ec3c7514a8057210828f28
--- /dev/null
+++ b/MindEnergy/applications/deploy/src/schemas.py
@@ -0,0 +1,42 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+Schemas module for MindScience deployment service.
+
+This module defines Pydantic data models (schemas) used for request/response
+validation and data serialization in the MindScience deployment service.
+Currently includes:
+
+- ModelInfo: A schema for representing model status and associated messages
+ returned by the deployment service APIs
+
+These schemas ensure type safety and proper validation of data exchanged
+between clients and the deployment service.
+"""
+
+from pydantic import BaseModel
+
+from .enums import ModelStatus
+
+class ModelInfo(BaseModel):
+ """Model information containing status and message.
+
+ Attributes:
+ status: The status of the model, defaults to ModelStatus.SUCCESS.
+ message: The message associated with the model status, defaults to empty string.
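+
+    Example (illustrative only; the message text is a placeholder):
+
+        info = ModelInfo(message="model loaded")  # status defaults to ModelStatus.SUCCESS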
+ """
+ status: str = ModelStatus.SUCCESS
+ message: str = ""
diff --git a/MindEnergy/applications/deploy/src/session.py b/MindEnergy/applications/deploy/src/session.py
new file mode 100644
index 0000000000000000000000000000000000000000..db7316e1776078b6464d57ba425a445790897751
--- /dev/null
+++ b/MindEnergy/applications/deploy/src/session.py
@@ -0,0 +1,219 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+Session management module for MindScience deployment service.
+
+This module provides the SessionManager class for managing model inference sessions.
+It includes functionality for:
+
+- Loading and initializing models for inference
+- Managing multiple inference sessions across different devices
+- Performing batch inference operations
+- Checking model health status
+- Unloading models and cleaning up resources
+
+The SessionManager handles the lifecycle of model sessions, from initialization
+to finalization, ensuring proper resource management and inference execution.
+"""
+
+import numpy as np
+from loguru import logger
+
+from .utils import Utilities
+from .enums import HealthStatus
+from .config import DeployConfig
+from .inference import MultiprocessInference
+
+class SessionManager:
+ """Manages model sessions for inference tasks.
+
+ This class handles the initialization, deletion, and inference operations
+ for machine learning models, including loading models, managing sessions,
+ and performing batch inference.
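+
+    Example (illustrative sketch; the model name, device count, and inputs are placeholders):
+
+        manager = SessionManager()
+        manager.init_session("your_model", device_num=1)
+        # batch_inputs: list of numpy arrays, e.g. loaded via Utilities.load_h5_file(...)
+        results = manager.batch_infer(batch_inputs, task_type=0)
+        manager.del_session()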
+ """
+
+ def __init__(self):
+ """Initializes the SessionManager with default values."""
+ self.current_model = ""
+ self.device_num = -1
+ self.infer_sessions = None
+ self._model_dict = {}
+
+ def init_session(self, model_name, device_num):
+ """Initializes a model session for inference.
+
+ Collects model files, validates device number and model name,
+ creates inference sessions, and builds the model for each session.
+
+ Args:
+ model_name (str): Name of the model to load.
+ device_num (int): Number of devices to use for inference.
+
+ Raises:
+ ValueError: If device number exceeds maximum, model is not found,
+ or model is already loaded.
+ RuntimeError: If session initialization fails.
+ """
+ self._model_dict = Utilities.collect_mindir_models(DeployConfig.models_dir)
+
+ if device_num > DeployConfig.max_device_num:
+ raise ValueError(f"Device num {device_num} is over the actual device num {DeployConfig.max_device_num}, \
+ please check!")
+ if model_name not in self.model_dict:
+ raise ValueError(f"model {model_name} is not in {self.model_dict.keys()}, please check!")
+ if model_name == self.current_model:
+ raise ValueError(f"model {model_name} is already loaded, please check!")
+
+ model_paths = self.model_dict.get(model_name)
+ sessions = [MultiprocessInference(model_path, device_num) for model_path in model_paths]
+ try:
+ for i, session in enumerate(sessions):
+ session.build_model()
+ logger.info(f"{model_paths[i]} is loaded successfully, {device_num} sessions are inited")
+ except Exception as e:
+ raise RuntimeError(f"Init session failed, please check! ERROR: {e}") from e
+
+ self.current_model = model_name
+ self.device_num = device_num
+ self.infer_sessions = sessions
+
+ def del_session(self):
+ """Deletes the current model session.
+
+ Cleans up the current model, device number, and model dictionary.
+ Finalizes all inference sessions if they exist.
+
+ Raises:
+ RuntimeError: If deleting the session fails.
+ """
+ self.current_model = ""
+ self.device_num = -1
+ self._model_dict = {}
+
+ if self.infer_sessions is None:
+ logger.warning("Session not inited, please check!")
+ else:
+ try:
+ for session in self.infer_sessions:
+ session.finalize()
+ self.infer_sessions = None
+ logger.info("Session is deleted successfully")
+ except Exception as e:
+ raise RuntimeError(f"Delete session failed, please check! ERROR: {e}") from e
+
+ def batch_infer(self, batch_inputs, task_type):
+ """Performs batch inference on the given inputs.
+
+ Processes batch inputs by extending them to match model batch size,
+ and performs inference using the specified task type session.
+
+ Args:
+ batch_inputs (list): List of input arrays for batch inference.
+ task_type (int): Index of the inference session to use.
+
+ Returns:
+ list: List of concatenated results from the inference.
+
+        Raises:
+            ValueError: If inputs are empty or the task type is invalid.
+            RuntimeError: If the model has not been loaded.
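+
+        Example (illustrative sketch on a SessionManager instance `manager`; assumes a model
+            is already loaded and its single input expects float32 data of feature width 16,
+            so all shapes here are made up):
+
+            import numpy as np
+            batch_inputs = [np.random.rand(32, 16).astype(np.float32)]
+            outputs = manager.batch_infer(batch_inputs, task_type=0)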
+ """
+ if not batch_inputs:
+ raise ValueError("Input is empty, please check!")
+ if self.infer_sessions is None:
+ raise RuntimeError("Model has not been loaded, please check!")
+ if task_type >= len(self.infer_sessions):
+ raise ValueError(f"Only task_type {list(range(len(self.infer_sessions)))} is supported, \
+ but given {task_type}")
+
+ session = self.infer_sessions[task_type]
+ data_size = batch_inputs[0].shape[0]
+ model_batch_size = session.batch_size
+
+ batch_inputs = [Utilities.extend_input(item, model_batch_size) for item in batch_inputs]
+ total_size = batch_inputs[0].shape[0]
+
+ input_datas = []
+ result_list = []
+ for i in range(0, total_size, model_batch_size):
+ input_datas.append([item[i: i + model_batch_size] for item in batch_inputs])
+ if len(input_datas) > 0 and len(input_datas) % self.device_num == 0:
+ result_list = self._infer(session, input_datas, result_list)
+ input_datas = []
+ if input_datas:
+ result_list = self._infer(session, input_datas, result_list)
+
+ model_out_num = len(result_list[0])
+ final_ret = []
+ for i in range(model_out_num):
+ final_ret.append(np.concatenate([result[i] for result in result_list], axis=0)[:data_size])
+ return final_ret
+
+ def _infer(self, session, input_datas, result_list):
+ """Performs inference on input data using the given session.
+
+ Internal helper method that executes the actual inference and
+ extends the result list with predicted results.
+
+ Args:
+ session (MultiprocessInference): Inference session to use.
+ input_datas (list): List of input data to be inferred.
+ result_list (list): List to extend with inference results.
+
+ Returns:
+ list: Updated result list with new inference results.
+
+ Raises:
+ RuntimeError: If the model prediction fails.
+ """
+ try:
+ predict_result = session.infer(input_datas)
+ result_list.extend(predict_result)
+ except (ValueError, RuntimeError) as e:
+ raise RuntimeError(f"{self.current_model} predict failed, please check! ERROR: {e}") from e
+ return result_list
+
+ def is_ready(self):
+ """Checks if the model sessions are ready for inference.
+
+ Verifies if inference sessions are initialized and operational.
+
+ Returns:
+ HealthStatus: Status indicating if model is NOTLOADED, READY,
+ or in EXCEPTION state.
+ """
+ if self.infer_sessions is None:
+ logger.info("Model has not been loaded, please check!")
+ return HealthStatus.NOTLOADED
+ try:
+ for session in self.infer_sessions:
+ session.is_ready()
+ logger.info(f"Model: {self.current_model} is ready!")
+ return HealthStatus.READY
+ except Exception:
+ logger.info(f"Model: {self.current_model} is unavailable!")
+ return HealthStatus.EXCEPTION
+
+ @property
+ def session_name(self):
+ """str: Name of the currently loaded model."""
+ return self.current_model
+
+ @property
+ def model_dict(self):
+ """dict: Dictionary containing model information."""
+ return self._model_dict
diff --git a/MindEnergy/applications/deploy/src/utils.py b/MindEnergy/applications/deploy/src/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..6bb225e181d8d725ebe263bba1d26e18316b8593
--- /dev/null
+++ b/MindEnergy/applications/deploy/src/utils.py
@@ -0,0 +1,369 @@
+# Copyright 2025 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""
+Utilities module for MindScience deployment service.
+
+This module provides a collection of utility functions and classes for various
+tasks in the MindScience deployment service, including:
+
+- File handling operations (upload, extraction, HDF5 read/write)
+- Model file collection and management
+- Input data extension for batch processing
+- Task status counting
+- NPU (Neural Processing Unit) resource monitoring
+- Asynchronous operations support
+
+The Utilities class contains static methods that are commonly used across
+different components of the deployment service.
+"""
+
+import os
+import re
+import zipfile
+import tarfile
+import asyncio
+import traceback
+import subprocess
+from typing import Dict, List
+
+import h5py
+import aiofiles
+import numpy as np
+from loguru import logger
+from fastapi import UploadFile, HTTPException
+
+from .enums import TaskStatus
+from .config import DeployConfig, ModelConfig
+
+class Utilities:
+ """
+ Utilities helper class that encapsulates deployment-related utility functions.
+
+ This class includes file handling, model collection, HDF5 file read/write,
+ asynchronous file upload saving, and methods to collect NPU usage information.
+ """
+ @staticmethod
+ def collect_mindir_models(root_dir: str) -> Dict[str, List[str]]:
+ """Collect *.mindir model files from each subdirectory under the specified root directory.
+
+ Args:
+ root_dir (str): Root directory path to search. Each immediate subdirectory
+ is treated as a distinct model name.
+
+ Returns:
+ Dict[str, List[str]]: Mapping of model directory name to a sorted list of
+ absolute paths of all `.mindir` files found in that model directory.
+
+ Raises:
+ ValueError: If `root_dir` is not a valid directory.
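+
+        Example (illustrative; directory and file names are placeholders):
+
+            # models/
+            # └── your_model/
+            #     ├── your_model_1.mindir
+            #     └── your_model_2.mindir
+            model_dict = Utilities.collect_mindir_models("models")
+            # {"your_model": ["/abs/.../your_model_1.mindir", "/abs/.../your_model_2.mindir"]}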
+ """
+ if not os.path.isdir(root_dir):
+ raise ValueError(f"Invalid directory: {root_dir}")
+
+ model_dict = {}
+ for model_name in os.listdir(root_dir):
+ model_dir = os.path.join(root_dir, model_name)
+ if not os.path.isdir(model_dir):
+ continue
+
+ mindir_files = []
+ for file in os.listdir(model_dir):
+ if file.endswith(".mindir"):
+ abs_path = os.path.abspath(os.path.join(model_dir, file))
+ mindir_files.append(abs_path)
+
+ if mindir_files:
+ model_dict[model_name] = sorted(mindir_files)
+
+ return model_dict
+
+ @staticmethod
+ def extend_input(array: np.ndarray, batch_size: int) -> np.ndarray:
+ """Extend (pad) the input array along the first dimension so that its length
+ is divisible by `batch_size`.
+
+ If the first dimension is already divisible by `batch_size`, the original
+ array is returned. Otherwise, zero-rows are appended to the end of the
+ array to make the first dimension divisible.
+
+ Args:
+ array (np.ndarray): The numpy array to pad. Must have at least one dimension.
+ batch_size (int): The batch size to make the first dimension divisible by.
+
+ Returns:
+ np.ndarray: The padded numpy array with the same dtype as the input.
+
+ Raises:
+ ValueError: If `array` has no shape (empty).
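+
+        Example (illustrative; pads a (5, 3) array with zero rows up to a multiple of 4):
+
+            import numpy as np
+            padded = Utilities.extend_input(np.ones((5, 3), dtype=np.float32), batch_size=4)
+            # padded.shape == (8, 3); rows 5..7 are zeros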
+ """
+ if not array.shape:
+ raise ValueError(f"The array to extend is empty, please check: {array}.")
+
+ if array.shape[0] % batch_size != 0:
+ padding_length = batch_size - array.shape[0] % batch_size
+ padding_contents = np.zeros((padding_length, *array.shape[1:]), dtype=array.dtype)
+ return np.concatenate([array, padding_contents], axis=0)
+ return array
+
+ @staticmethod
+ async def save_upload_file(upload_file: UploadFile, destination: str):
+ """Asynchronously save a FastAPI `UploadFile` to the specified destination
+ path in chunks.
+
+ Args:
+ upload_file (UploadFile): The uploaded file object provided by FastAPI.
+ destination (str): Target file path (including filename) to save the upload.
+
+ Returns:
+ None
+
+ Raises:
+ HTTPException: Raises an HTTPException (status 500) if saving fails.
+ """
+ try:
+ async with aiofiles.open(destination, "wb") as f:
+ while chunk := await upload_file.read(DeployConfig.chunk_size):
+ await f.write(chunk)
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"save upload file failed: {e}") from e
+
+ @staticmethod
+ def extract_file(file_path: str, extract_to: str = None):
+ """Extract supported archive file formats (tar, tar.gz, tgz, tar.bz2, tbz2,
+ tar.xz, txz, zip) to a target directory.
+
+ Args:
+ file_path (str): Path to the archive file to extract.
+ extract_to (str, optional): Target directory to extract into. If None,
+ the archive's containing directory is used.
+
+ Returns:
+ None
+
+ Raises:
+ FileNotFoundError: If `file_path` does not exist.
+ ValueError: If the file format is not a supported archive type.
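+
+        Example (illustrative; both paths are placeholders):
+
+            Utilities.extract_file("/path/to/model_file.zip", extract_to="/path/to/models/your_model")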
+ """
+ if not os.path.exists(file_path):
+ raise FileNotFoundError(f"File not found: {file_path}")
+
+ if extract_to is None:
+ extract_to = os.path.dirname(file_path)
+
+ if file_path.endswith((".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz2", ".tar.xz", ".txz")):
+ with tarfile.open(file_path, 'r:*') as tf:
+ tf.extractall(extract_to)
+ elif file_path.endswith(".zip"):
+ with zipfile.ZipFile(file_path, 'r') as zf:
+ zf.extractall(extract_to)
+ else:
+ raise ValueError(f"Unsupported compressed file format: {file_path}.")
+
+ @staticmethod
+ def count_pending_task(tasks_status: Dict[str, str]) -> int:
+ """Count the number of tasks in a mapping that are in the PENDING state.
+
+ Args:
+ tasks_status (Dict[str, str]): Mapping from task ID to task status
+ (expected to be members of `TaskStatus`).
+
+ Returns:
+ int: Number of tasks whose status equals `TaskStatus.PENDING`.
+ """
+ pending_number = 0
+ for value in tasks_status.values():
+ if value == TaskStatus.PENDING:
+ pending_number += 1
+ return pending_number
+
+ @staticmethod
+ def load_h5_file(file_path: str) -> List[np.ndarray]:
+ """Load input columns specified in `ModelConfig.input_columns` from an HDF5
+ file and return them as a list of numpy arrays.
+
+ Args:
+ file_path (str): HDF5 file path to read from.
+
+ Returns:
+ List[np.ndarray]: List of numpy arrays for each input column defined in
+ `ModelConfig.input_columns`, preserving the same order.
+
+ Raises:
+ ValueError: If any key from `ModelConfig.input_columns` is missing in the file.
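+
+        Example (illustrative; assumes the file contains every key listed in
+            `ModelConfig.input_columns`):
+
+            batch_inputs = Utilities.load_h5_file("/path/to/dataset.h5")
+            # one numpy array per configured input column, in the configured order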
+ """
+ batch_inputs = []
+ with h5py.File(file_path, "r") as f:
+ for key in ModelConfig.input_columns:
+ if key not in f.keys():
+ raise ValueError(f"Key {key} not in dataset, please check!")
+ batch_inputs.append(np.array(f[key], dtype=f[key].dtype))
+ return batch_inputs
+
+ @staticmethod
+ def save_h5_file(items: List[np.ndarray], file_path: str):
+ """Write prediction outputs to an HDF5 file.
+
+ Output column names are taken from `ModelConfig.output_columns`. If the
+ number of `items` does not match the number of configured output column
+ names, the function will rename columns using the default format
+ `output_0`, `output_1`, ... and log a warning.
+
+ Args:
+ items (List[np.ndarray]): List of numpy arrays to write as outputs.
+ file_path (str): HDF5 output file path.
+
+ Returns:
+ None
+
+ Notes:
+ If the number of configured output columns does not match the number
+ of provided items, the method assigns default names and logs a
+ warning.
+ """
+ output_columns = ModelConfig.output_columns
+ if len(items) != len(output_columns):
+ logger.warning("Number of outputs in config is inconsistent with the number of the actual outputs, \
+ rename the output column.")
+ output_columns = tuple(f"output_{i}" for i in range(len(items)))
+
+ with h5py.File(file_path, "w") as f:
+ for key, value in zip(output_columns, items):
+ f.create_dataset(key, data=value)
+
+ @staticmethod
+ async def get_npu_usage():
+ """Asynchronously execute `npu-smi info` to obtain NPU utilization and memory
+ information, parse the output, and return a structured list of stats.
+
+ Each returned item is a dictionary with the following keys:
+ - id (int): NPU device ID
+ - utilization_percent (int): NPU utilization percentage
+ - memory_used_mb (int): Memory used in MB
+ - memory_total_mb (int): Total memory in MB
+
+ Returns:
+ List[Dict[str, int]]: List of dictionary entries containing NPU stats.
+
+ Raises:
+ FileNotFoundError: If the `npu-smi` command is not available.
+ RuntimeError: If executing `npu-smi info` fails for other reasons.
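+
+        Example (illustrative; must be awaited inside a coroutine, and the numbers are made up):
+
+            stats = await Utilities.get_npu_usage()
+            # [{"id": 0, "utilization_percent": 12, "memory_used_mb": 3050, "memory_total_mb": 32768}]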
+ """
+ try:
+ command = ["npu-smi", "info"]
+ process = await asyncio.create_subprocess_exec(
+ *command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE
+ )
+ output_bytes, stderr_bytes = await process.communicate()
+
+ if process.returncode != 0:
+ if b"command not found" in stderr_bytes.lower() or \
+ b"no such file or directory" in stderr_bytes.lower():
+ logger.error("ERROR: 'npu-smi' command not found!")
+ logger.error("Please ensure that the Ascend CANN toolkit is correctly installed and \
+ its bin directory has been added to the system's PATH environment variable.")
+ raise FileNotFoundError("'npu-smi' command not found.")
+ raise subprocess.CalledProcessError(
+ process.returncode, command, output=output_bytes, stderr=stderr_bytes
+ )
+        except FileNotFoundError:
+            # Re-raise so the documented FileNotFoundError for a missing `npu-smi` binary propagates as-is.
+            raise
+        except Exception as e:
+            logger.error(f"Execute 'npu-smi info' failed, ERROR: {e}")
+            raise RuntimeError(f"Execute 'npu-smi info' failed, ERROR: {e}") from e
+
+ logger.info("=== npu-smi info original output ===")
+ logger.info(output_bytes.decode("utf-8", errors="replace"))
+ logger.info("===========================")
+
+ output = output_bytes.decode("utf-8", errors="ignore")
+
+ npu_stats = []
+ lines = output.strip().splitlines()
+
+ if not lines:
+ logger.warning("'npu-smi info' did not return any output.")
+ return npu_stats
+
+ for i, line in enumerate(lines):
+ match_first_line = re.match(r"^\s*\|\s*(\d+)\s+[a-zA-Z0-9]+\s*\|", line)
+
+ if not match_first_line:
+ logger.warning(f"Can not match NPU ID in line: {line}.")
+ continue
+
+ try:
+ npu_id_str = match_first_line.group(1)
+ if not npu_id_str:
+ logger.warning(f"NPU ID is empty in line: {line}.")
+ continue
+
+ npu_id = int(npu_id_str)
+ logger.info(f"Found NPU ID: {npu_id}.")
+
+ if (i + 1) >= len(lines):
+ logger.warning("Not enough rows to parse NPU utilization information.")
+ continue
+
+ second_line = lines[i + 1]
+ logger.info(f"Process NPU {npu_id} data line: {second_line}.")
+
+ parts = second_line.split("|")
+ if len(parts) < 4:
+ logger.warning(f"The data row format is incorrect and \
+ the number of columns is insufficient: {second_line}.")
+ continue
+
+ data_string = parts[-2].strip()
+ logger.info(f"Extracted data: '{data_string}'.")
+
+ tokens = re.findall(r"\d+", data_string)
+ logger.info(f"Extracted tokens: {tokens}.")
+
+ if len(tokens) < 5:
+ logger.warning(f"Insufficient tokens: {tokens}.")
+ continue
+
+ try:
+ utilization_percent = int(tokens[0])
+ mem_used = int(tokens[3])
+ mem_total = int(tokens[4])
+
+ info = {
+ "id": npu_id,
+ "utilization_percent": utilization_percent,
+ "memory_used_mb": mem_used,
+ "memory_total_mb": mem_total,
+ }
+ logger.info(f"Successfully parse {npu_id} info: {info}.")
+ npu_stats.append(info)
+
+ except (ValueError, IndexError) as e:
+ logger.error(f"Error parsing NPU {npu_id} data: {e}.")
+ continue
+
+ except Exception as e:
+ logger.exception(f"Unexpected error when parsing NPU data: {e}.")
+ traceback.print_exc()
+ continue
+
+ if not npu_stats:
+ logger.warning(f"Can not parse any info from 'npu-smi info' output. original output: \n{output}")
+ if "No NPU device found" in output:
+ logger.info("'No NPU device found' - Perhaps there are no available NPU devices.")
+ else:
+ logger.info(f"Successfully parse {len(npu_stats)} NPU info.")
+
+ return npu_stats