From d1ab2e68dc39fbb2d3205aecb1386f38b381d771 Mon Sep 17 00:00:00 2001 From: yh_silence Date: Thu, 10 Jul 2025 15:57:15 +0800 Subject: [PATCH 01/10] issue template update --- .gitee/ISSUE_TEMPLATE/a_guide.yml | 16 ---------------- .gitee/ISSUE_TEMPLATE/bug.yml | 14 +++++++++----- .gitee/ISSUE_TEMPLATE/config.yml | 3 --- 3 files changed, 9 insertions(+), 24 deletions(-) delete mode 100644 .gitee/ISSUE_TEMPLATE/a_guide.yml diff --git a/.gitee/ISSUE_TEMPLATE/a_guide.yml b/.gitee/ISSUE_TEMPLATE/a_guide.yml deleted file mode 100644 index 534078c..0000000 --- a/.gitee/ISSUE_TEMPLATE/a_guide.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: 👉 遇到问题先看这里 -description: 这里也许能快速解决您的问题 -body: - - type: markdown - attributes: - value: | - ## 🌟 第一次使用工具遇到问题? - 按[🚀 快速入门](https://gitee.com/aisbench/benchmark#-%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8)走一遍能解决90%的工具基本使用问题! - - ## 🧭 尝试检索FAQ查看共性问题解决方法 - 检索[📑 FAQ](https://gitee.com/aisbench/benchmark/wikis/FAQ/AISBench%20FAQ%E4%B8%BB%E9%A1%B5),目前FAQ可以解决0%的共性问题 - - ## ❓ 搜索历史issue,查看冷门的同类问题 - 在[🔖 Issue](https://gitee.com/aisbench/benchmark/issues)中搜索历史类似问题 - - 💡提示:若仍然未解决问题,可[🔙 返回选择其他 Issue 模板](https://gitee.com/aisbench/benchmark/issues/new/choose)提交 \ No newline at end of file diff --git a/.gitee/ISSUE_TEMPLATE/bug.yml b/.gitee/ISSUE_TEMPLATE/bug.yml index b49eeaa..dc1a893 100644 --- a/.gitee/ISSUE_TEMPLATE/bug.yml +++ b/.gitee/ISSUE_TEMPLATE/bug.yml @@ -7,11 +7,15 @@ body: - type: markdown attributes: value: | - ### ⚠️请再次确认已查阅 - [🚀 快速入门](https://gitee.com/aisbench/benchmark#-%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8) - [📑 FAQ](https://gitee.com/aisbench/benchmark/wikis/FAQ/AISBench%20FAQ%E4%B8%BB%E9%A1%B5) - [🔖 Issue](https://gitee.com/aisbench/benchmark/issues) - 并发现无法解决问题。 + ## 👉 遇到问题先看这里 + ### 🌟 第一次使用工具遇到问题? + 按[🚀 快速入门](https://gitee.com/aisbench/benchmark#-%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8)走一遍能解决90%的工具基本使用问题! + + ### 🧭 尝试检索FAQ查看共性问题解决方法 + 检索[📑 FAQ](https://gitee.com/aisbench/benchmark/wikis/FAQ/AISBench%20FAQ%E4%B8%BB%E9%A1%B5),目前FAQ可以解决0%的共性问题 + + ### ❓ 搜索历史issue,查看冷门的同类问题 + 在[🔖 Issue](https://gitee.com/aisbench/benchmark/issues)中搜索历史类似问题 - type: input id: os_and_version attributes: diff --git a/.gitee/ISSUE_TEMPLATE/config.yml b/.gitee/ISSUE_TEMPLATE/config.yml index a207852..a43b055 100644 --- a/.gitee/ISSUE_TEMPLATE/config.yml +++ b/.gitee/ISSUE_TEMPLATE/config.yml @@ -1,8 +1,5 @@ blank_issues_enabled: false # 禁止空白 Issue,强制使用模板 templates: - - name: 👉 遇到问题先看这里 - about: 这里也许能快速解决您的问题 - file: a_guide.yml - name: 🔧 Bug 报告 about: 提交程序中的错误或异常 file: bug.yml -- Gitee From 8f1d97dd9ca34080b8725576471d4db34d1abe3c Mon Sep 17 00:00:00 2001 From: yh_silence Date: Fri, 11 Jul 2025 14:58:53 +0800 Subject: [PATCH 02/10] issue template update: consult.yml add direct --- .gitee/ISSUE_TEMPLATE/consult.yml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.gitee/ISSUE_TEMPLATE/consult.yml b/.gitee/ISSUE_TEMPLATE/consult.yml index cdc997f..b88fccc 100644 --- a/.gitee/ISSUE_TEMPLATE/consult.yml +++ b/.gitee/ISSUE_TEMPLATE/consult.yml @@ -3,6 +3,18 @@ description: 咨询工具使用过程中的问题 title: "[疑问] " labels: ["question", "待回复"] body: + - type: markdown + attributes: + value: | + ## 👉 遇到问题先看这里 + ### 🌟 第一次使用工具遇到问题? + 按[🚀 快速入门](https://gitee.com/aisbench/benchmark#-%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8)走一遍能解决90%的工具基本使用问题! + + ### 🧭 尝试检索FAQ查看共性问题解决方法 + 检索[📑 FAQ](https://gitee.com/aisbench/benchmark/wikis/FAQ/AISBench%20FAQ%E4%B8%BB%E9%A1%B5),目前FAQ可以解决0%的共性问题 + + ### ❓ 搜索历史issue,查看冷门的同类问题 + 在[🔖 Issue](https://gitee.com/aisbench/benchmark/issues)中搜索历史类似问题 - type: textarea id: current-content attributes: -- Gitee From 39c3b28f0ca1f5067f0fde7ffb505574692bc569 Mon Sep 17 00:00:00 2001 From: yh_silence Date: Mon, 14 Jul 2025 10:19:17 +0800 Subject: [PATCH 03/10] calculate speed up --- .../default_perf_metric_calculator.py | 18 ++- .../stable_perf_metric_calculator.py | 79 +++++++----- ais_bench/benchmark/clients/base_client.py | 6 +- .../models/huggingface_above_v4_33.py | 2 +- ais_bench/benchmark/models/performance_api.py | 4 +- .../benchmark/models/vllm_custom_api_chat.py | 2 +- .../icl_inferencer/icl_gen_inferencer.py | 19 +-- .../icl_inferencer/icl_gen_perf_inferencer.py | 30 ++--- .../icl_gen_pressure_inferencer.py | 9 +- ais_bench/benchmark/runners/local_api.py | 2 +- .../benchmark/summarizers/default_perf.py | 26 ++-- ais_bench/benchmark/tasks/openicl_infer.py | 4 +- ais_bench/benchmark/utils/results.py | 15 ++- ais_bench/benchmark/utils/summarize_plot.py | 120 +++++++++--------- 14 files changed, 172 insertions(+), 164 deletions(-) diff --git a/ais_bench/benchmark/calculators/default_perf_metric_calculator.py b/ais_bench/benchmark/calculators/default_perf_metric_calculator.py index 8334457..85853b1 100644 --- a/ais_bench/benchmark/calculators/default_perf_metric_calculator.py +++ b/ais_bench/benchmark/calculators/default_perf_metric_calculator.py @@ -75,7 +75,8 @@ class DefaultPerfMetricCalculator(BasePerfMetricCalculator): else: result["average_decode_latencies"] = result["prefill_latency"] self.logger.info("Converting perf results of stage ...") - self.result[stage_name] = self.convert_result(copy.deepcopy(result)) + self.result[stage_name] = self.convert_result(result) + self.logger.info("Finish Converting!") def get_common_res(self): return {k: v for k, v in self.common_metrics.items() if v is not None} @@ -155,9 +156,13 @@ class DefaultPerfMetricCalculator(BasePerfMetricCalculator): return ans def calculate(self): + self.logger.info("Start calculating metrics ...") self.__calc_metrics() + self.logger.info("Start calculating common metrics ...") self.__calc_common_metrics() + self.logger.info("Start calculating add units ...") self.add_units() + self.logger.info("Finish calculating perf data!") def __calc_metrics(self): """Calculate various statistical metrics for performance analysis.""" @@ -171,17 +176,18 @@ class DefaultPerfMetricCalculator(BasePerfMetricCalculator): value = self.__statistic_prefill_or_decode_batch_size(value) # Compute statistical values + arr = np.array(value) for stat in self.stats_list: if stat == "Average": - stats[stat] = round(np.average(value), 4) + stats[stat] = round(arr.mean(), 4) elif stat == "Min": - stats[stat] = round(float(min(value)), 4) + stats[stat] = round(float(arr.min()), 4) elif stat == "Max": - stats[stat] = round(float(max(value)), 4) + stats[stat] = round(float(arr.max()), 4) elif stat == "Median": - stats[stat] = round(np.percentile(value, 50), 4) + stats[stat] = round(np.percentile(arr, 50), 4) elif is_legal_percentage_str(stat): - stats[stat] = round(np.percentile(value, int(stat[1:])), 4) + stats[stat] = round(np.percentile(arr, int(stat[1:])), 4) # Store the computed metrics if self.metrics.get(metric) is None: diff --git a/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py b/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py index 8336104..c3794dd 100644 --- a/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py +++ b/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py @@ -1,4 +1,5 @@ import csv +import heapq from tqdm import tqdm import collections import math @@ -41,43 +42,45 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): self._process_result(perf_details.get("requests"), stage_name) def _get_requests_id(self, perf_details): - request_time_sections = [] - for id in range(len(perf_details["requests"]["id"])): - request_time_sections.append({ - "id": id, - "start_time": perf_details["requests"]["start_time"][id], - "end_time": perf_details["requests"]["end_time"][id], - }) - + request_time_sections = [ + {"id": id, "start_time": perf_details["requests"]["start_time"][id], "end_time": perf_details["requests"]["end_time"][id]} + for id in range(len(perf_details["requests"]["id"])) + ] sorted_time_sections = sorted(request_time_sections, key=lambda x: x["start_time"]) + + active_heap = [] # 最小堆存储(end_time, id) id_lists = [] - working_reqs = {} self.logger.info("Calculating stable stage ...") - for i, section in enumerate(tqdm(sorted_time_sections)): - poped_ids = [] - for k in list(working_reqs.keys()): - if working_reqs[k][1] < section["start_time"]: - poped_ids.append(k) - working_reqs.pop(k, None) - working_reqs[section["id"]] = [section["start_time"], section["end_time"]] - if len(working_reqs) == self.max_concurrency: + + for section in tqdm(sorted_time_sections): + # 1. 清理过期请求并记录最小结束时间 + poped_min_end = None + while active_heap and active_heap[0][0] < section["start_time"]: + end_time, req_id = heapq.heappop(active_heap) + poped_min_end = end_time if poped_min_end is None else min(poped_min_end, end_time) + + # 2. 添加当前请求 + heapq.heappush(active_heap, (section["end_time"], section["id"])) + current_active = len(active_heap) + + # 3. 判断稳定阶段 + if current_active == self.max_concurrency: id_lists.append(section["id"]) if len(id_lists) == 1: - self.stage_section[0] = min([perf_details["requests"]["end_time"][id] for id in list(working_reqs.keys())]) # total start time - elif len(working_reqs) >= int(self.max_concurrency * (1 - WAVE_OFFSET)) and len(id_lists) > 0: + self.stage_section[0] = active_heap[0][0] # 堆顶即最小结束时间 + elif current_active >= int(self.max_concurrency * (1 - WAVE_OFFSET)) and len(id_lists) > 0: id_lists.append(section["id"]) - else: - if len(id_lists) > 0: # start to leave stable - self.stage_section[1] = min([perf_details["requests"]["end_time"][id] for id in poped_ids]) - break - - if len(id_lists) > 0: - id_lists.pop(0) # ignore first request that reached max concurrency - if len(id_lists) == 0: + elif len(id_lists) > 0: # 退出稳定阶段 + self.stage_section[1] = poped_min_end if poped_min_end is not None else active_heap[0][0] + break + + # 4. 后处理 + if id_lists: + id_lists.pop(0) + if not id_lists: raise RuntimeError("Can not find a stable stage!") - if self.stage_section[1] == 0: - self.stage_section[1] = min([perf_details["requests"]["end_time"][id] for id in list(working_reqs.keys())]) # total end time + self.stage_section[1] = active_heap[0][0] if active_heap else sorted_time_sections[-1]["end_time"] return id_lists def _get_legal_stats_list(self, stats_list): @@ -116,7 +119,8 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): else: result["average_decode_latencies"] = result["prefill_latency"] self.logger.info("Converting perf results of stage ...") - self.result[stage_name] = self.convert_result(copy.deepcopy(result)) + self.result[stage_name] = self.convert_result(result) + self.logger.info("Finish Converting!") def get_common_res(self): return {k: v for k, v in self.common_metrics.items() if v is not None} @@ -196,9 +200,13 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): return ans def calculate(self): + self.logger.info("Start calculating metrics ...") self.__calc_metrics() + self.logger.info("Start calculating common metrics ...") self.__calc_common_metrics() + self.logger.info("Start calculating add units ...") self.add_units() + self.logger.info("Finish calculating perf data!") def __calc_metrics(self): """Calculate various statistical metrics for performance analysis.""" @@ -212,17 +220,18 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): value = self.__statistic_prefill_or_decode_batch_size(value) # Compute statistical values + arr = np.array(value) for stat in self.stats_list: if stat == "Average": - stats[stat] = round(np.average(value), 4) + stats[stat] = round(arr.mean(), 4) elif stat == "Min": - stats[stat] = round(float(min(value)), 4) + stats[stat] = round(float(arr.min()), 4) elif stat == "Max": - stats[stat] = round(float(max(value)), 4) + stats[stat] = round(float(arr.max()), 4) elif stat == "Median": - stats[stat] = round(np.percentile(value, 50), 4) + stats[stat] = round(np.percentile(arr, 50), 4) elif is_legal_percentage_str(stat): - stats[stat] = round(np.percentile(value, int(stat[1:])), 4) + stats[stat] = round(np.percentile(arr, int(stat[1:])), 4) # Store the computed metrics if self.metrics.get(metric) is None: diff --git a/ais_bench/benchmark/clients/base_client.py b/ais_bench/benchmark/clients/base_client.py index 2cabfca..5229529 100644 --- a/ais_bench/benchmark/clients/base_client.py +++ b/ais_bench/benchmark/clients/base_client.py @@ -179,7 +179,7 @@ class BaseClient(ABC): raise_error(f"Error processing stream response: {e}", self.lock, self.request_counter) except HTTPError as e: raise_error(f"HTTP error during stream response processing: {e}.", self.lock, self.request_counter) - + self.rev_count() self.update_request_time(inputs, start_time) return "".join(response) @@ -209,9 +209,7 @@ class BaseStreamClient(BaseClient, ABC): cur_time_point = time.perf_counter() response_dict = self.process_stream_line(json_content) if time_name not in response_dict.keys(): - response_dict[time_name] = ( - cur_time_point - last_time_point - ) * 1000 + response_dict[time_name] = round((cur_time_point - last_time_point) * 1000, 4) response_dict["chunk_time_point"] = cur_time_point * 1000 yield response_dict time_name = "decode_time" diff --git a/ais_bench/benchmark/models/huggingface_above_v4_33.py b/ais_bench/benchmark/models/huggingface_above_v4_33.py index 85c2c32..2c9982e 100644 --- a/ais_bench/benchmark/models/huggingface_above_v4_33.py +++ b/ais_bench/benchmark/models/huggingface_above_v4_33.py @@ -201,7 +201,7 @@ class HuggingFacewithChatTemplate(PerformanceModel): for k, v in other_kwargs.items(): if v is not None: self.logger.warning(f'Unused argument {k}={v}') - + def handle_perf_result(self, output_filepath, output_filename): e2e_latency = max(self.timestamps) - min(self.timestamps) return {"Benchmark Duration":{"total":str(round(e2e_latency, 4)) + ' ms'}} diff --git a/ais_bench/benchmark/models/performance_api.py b/ais_bench/benchmark/models/performance_api.py index ba94dda..2b05603 100644 --- a/ais_bench/benchmark/models/performance_api.py +++ b/ais_bench/benchmark/models/performance_api.py @@ -77,7 +77,7 @@ class PerformanceAPIModel(BaseAPIModel): cache_data.num_input_chars = 0 cache_data.input_token_id = token_id cache_data.num_input_tokens = len(token_id) - + def set_result(self, data: MiddleData) -> None: """Update decoding information for a given request.""" if not data.output: @@ -146,10 +146,12 @@ class PerformanceAPIModel(BaseAPIModel): self.result_cache[key].num_generated_tokens = len(tokens) performance_data = [] try: + self.logger.info("Start converting origin perf data ...") performance_data = [ cache_data.convert_to_performance_data() for cache_data in self.result_cache.values() ] + self.logger.info("Finish converting origin perf data") except Exception as e: self.logger.error(f"Error converting performance data: {e}") finally: diff --git a/ais_bench/benchmark/models/vllm_custom_api_chat.py b/ais_bench/benchmark/models/vllm_custom_api_chat.py index f776454..3743c4a 100644 --- a/ais_bench/benchmark/models/vllm_custom_api_chat.py +++ b/ais_bench/benchmark/models/vllm_custom_api_chat.py @@ -176,7 +176,7 @@ class VLLMCustomAPIChat(PerformanceAPIModel): elif item['role'] == 'SYSTEM': msg['role'] = 'system' messages.append(msg) - + generation_kwargs = self.generation_kwargs.copy() generation_kwargs.update({"max_tokens": max_out_len}) generation_kwargs.update({"model": self.model}) diff --git a/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_inferencer.py b/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_inferencer.py index 8fe26d9..36665a2 100644 --- a/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_inferencer.py +++ b/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_inferencer.py @@ -9,10 +9,10 @@ import multiprocessing from pathlib import Path from multiprocessing import RLock, freeze_support from typing import List, Optional, Tuple, Any - import torch import shutil from tqdm import tqdm +import itertools from ais_bench.benchmark.models.base import BaseModel from ais_bench.benchmark.registry import ICL_INFERENCERS @@ -24,6 +24,7 @@ from ..icl_prompt_template import PromptTemplate from ..icl_retriever import BaseRetriever from ..utils.logging import get_logger from .icl_base_inferencer import BaseInferencer, GenInferencerOutputHandler +import concurrent.futures logger = get_logger(__name__) @@ -43,7 +44,7 @@ def submit_single_model(model_cfg, mp_queue, **extra_gen_kwargs): raise AttributeError(f'{model} has no except outputs, please check model config') return model.get_performance_data() - + @ICL_INFERENCERS.register_module() class GenInferencer(BaseInferencer): """Generation Inferencer class to directly evaluate by generation. @@ -115,7 +116,7 @@ class GenInferencer(BaseInferencer): logger.warning(f"Inputs data number is {len(inputs)}, result will be empty") return results max_concurrency = extra_gen_kwargs.get("batch_size", 1) - + # Maximum MAX_CONCURRENCY_PER_PROCESS concurrency per process, number of processes less than number of cores workers_num = min( multiprocessing.cpu_count(), (max_concurrency - 1) // DEFAULT_MAX_CONCURRENCY_PER_PROCESS + 1 @@ -141,7 +142,7 @@ class GenInferencer(BaseInferencer): data_buckets = [] real_data_nums = [] bucket_index = 0 - data_index = 0 + data_index = 0 while data_index Tuple[List, List]: @@ -244,7 +245,7 @@ class GenInferencer(BaseInferencer): if hasattr(self.model, "set_performance"): extra_kwargs['do_performance'] = self.model.do_performance return extra_kwargs - + def inference(self, retriever: BaseRetriever, ice_template: Optional[PromptTemplate] = None, diff --git a/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_perf_inferencer.py b/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_perf_inferencer.py index f8bf395..c14a467 100644 --- a/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_perf_inferencer.py +++ b/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_perf_inferencer.py @@ -22,7 +22,7 @@ from ..icl_prompt_template import PromptTemplate from ..icl_retriever import BaseRetriever from ..utils.logging import get_logger from .icl_base_inferencer import GenInferencerOutputHandler -from ais_bench.benchmark.utils.results import dump_results_dict +from ais_bench.benchmark.utils.results import dump_results_dict, fast_dump_results_dict from .icl_gen_inferencer import GenInferencer logger = get_logger(__name__) @@ -125,29 +125,15 @@ class GenPerfInferencer(GenInferencer): parsed_entries = self.model.parse_template(entry, mode='gen') results = self.inference_with_multi_process( self.model, self.model_cfg, parsed_entries, golds, **extra_gen_kwargs) - results.sort(key=lambda x: x['id']) + logger.info("Start extracting pref datas ...") preds = self.extract_preds(results) + logger.info("Finish extracting pref datas!") task_params = {"max_concurrency": self.batch_size} num_return_sequences = getattr(self.model, "generation_kwargs", {}).get( "num_return_sequences", 1 ) - for prediction in batched(results, num_return_sequences): - if num_return_sequences == 1: - prediction = prediction[0] - if not prediction.get('is_success'): - pred = "" - else: - pred = prediction.get('output') - data_id = prediction.get('id') - if data_id >= len(golds) or data_id < 0: - raise IndexError(f"No gold of output id {data_id}") - output_handler.save_results(parsed_entries[data_id], - pred, - data_id, - gold=golds[data_id]) - end_time_stamp = time.perf_counter() if self.is_main_process: @@ -157,11 +143,12 @@ class GenPerfInferencer(GenInferencer): "requests": preds, } logger.info("Dumping detail perf data ...") - dump_results_dict( + dump_start = time.perf_counter() + fast_dump_results_dict( perf_details, - osp.join(output_filepath, output_filename + "_details.json"), - False + osp.join(output_filepath, output_filename + "_details.json") ) + logger.info(f"Dump detail perf data cost: {time.perf_counter() - dump_start}(s)") if self.dump_timer and self.is_main_process: timer_filepath = osp.join(output_filepath, "timer", "time.jsonl") @@ -192,6 +179,9 @@ class GenPerfInferencer(GenInferencer): } preds["is_success"] = [pred.get("is_success", False) for pred in results] preds["is_empty"] = [pred.get("is_empty", False) for pred in results] + del preds["chunk_time_point_list"] + del preds["input_data"] + del preds["output"] return preds diff --git a/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_pressure_inferencer.py b/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_pressure_inferencer.py index efd239e..fd32052 100644 --- a/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_pressure_inferencer.py +++ b/ais_bench/benchmark/openicl/icl_inferencer/icl_gen_pressure_inferencer.py @@ -20,7 +20,7 @@ from ais_bench.benchmark.utils.build import build_model_from_cfg from ais_bench.benchmark.global_consts import WORKERS_NUM from ..utils.logging import get_logger from .icl_base_inferencer import GenInferencerOutputHandler -from ais_bench.benchmark.utils.results import dump_results_dict +from ais_bench.benchmark.utils.results import dump_results_dict, fast_dump_results_dict from .icl_gen_perf_inferencer import GenPerfInferencer from .icl_gen_inferencer import DEFAULT_MAX_CONCURRENCY_PER_PROCESS @@ -167,7 +167,6 @@ class GenPressureInferencer(GenPerfInferencer): parsed_entries = self.model.parse_template(entry, mode='gen') results = self.pressure_infer_with_multiprocess( self.model, self.model_cfg, parsed_entries, golds, **extra_gen_kwargs) - results.sort(key=lambda x: x['id']) preds = self.extract_preds(results) preds['id'] = [i for i in range(len(preds['request_id']))] task_params = {"max_concurrency": self.batch_size} @@ -180,11 +179,13 @@ class GenPressureInferencer(GenPerfInferencer): "task": task_params, "requests": preds, } - dump_results_dict( + logger.info("Dumping detail perf data ...") + dump_start = time.perf_counter() + fast_dump_results_dict( perf_details, osp.join(output_filepath, output_filename + "_details.json"), - False ) + logger.info(f"Dump detail perf data cost: {time.perf_counter() - dump_start}(s)") if self.dump_timer and self.is_main_process: timer_filepath = osp.join(output_filepath, "timer", "time.jsonl") diff --git a/ais_bench/benchmark/runners/local_api.py b/ais_bench/benchmark/runners/local_api.py index c779ae7..903b014 100644 --- a/ais_bench/benchmark/runners/local_api.py +++ b/ais_bench/benchmark/runners/local_api.py @@ -17,7 +17,7 @@ from ais_bench.benchmark.registry import RUNNERS, TASKS from ais_bench.benchmark.tasks import OpenICLInferTask, OpenICLPerfTask, OpenICLInferMergedTask from ais_bench.benchmark.tasks.base import BaseTask from ais_bench.benchmark.utils import (build_dataset_from_cfg, build_synthetic_dataset_from_cfg, - build_model_from_cfg, get_infer_output_path, + build_model_from_cfg, get_infer_output_path, get_logger, task_abbr_from_cfg) from .base import BaseRunner diff --git a/ais_bench/benchmark/summarizers/default_perf.py b/ais_bench/benchmark/summarizers/default_perf.py index 2305ca1..73bc6de 100644 --- a/ais_bench/benchmark/summarizers/default_perf.py +++ b/ais_bench/benchmark/summarizers/default_perf.py @@ -5,6 +5,7 @@ import getpass import math import csv import json +import orjson import os.path as osp from datetime import datetime from typing import Any, Dict, List, Optional @@ -75,19 +76,18 @@ class DefaultPerfSummarizer: perf_details_file = osp.join(self.work_dir, "performances", model, f"{dataset}_details.json") if not osp.exists(perf_details_file): continue - with open(perf_details_file, 'r', encoding='utf-8') as file: - self.logger.info(f"Loading detail perf data of {model=} {dataset=} ...") - details_data = json.load(file) - plot_file_path = osp.join(self.work_dir, "performances", model, f"{dataset}_plot.html") - has_plot = plot_sorted_request_timelines( - details_data["requests"]["start_time"], - details_data["requests"]["prefill_latency"], - details_data["requests"]["end_time"], - details_data["requests"]["decode_token_latencies"], - output_file=plot_file_path, unit="s" - ) - if has_plot: - self.logger.info(f"The {dataset}_plot has been saved in {plot_file_path}") + self.logger.info(f"Loading detail perf data of {model=} {dataset=} ...") + details_data = orjson.loads(open(perf_details_file, "rb").read()) + plot_file_path = osp.join(self.work_dir, "performances", model, f"{dataset}_plot.html") + has_plot = plot_sorted_request_timelines( + details_data["requests"]["start_time"], + details_data["requests"]["prefill_latency"], + details_data["requests"]["end_time"], + details_data["requests"]["decode_token_latencies"], + output_file=plot_file_path, unit="s" + ) + if has_plot: + self.logger.info(f"The {dataset}_plot has been saved in {plot_file_path}") calculators_per_model[dataset] = build_perf_metric_calculator_from_cfg(calculator_conf) try: calculators_per_model[dataset]._init_datas(details_data) diff --git a/ais_bench/benchmark/tasks/openicl_infer.py b/ais_bench/benchmark/tasks/openicl_infer.py index dd22d37..89e4abc 100644 --- a/ais_bench/benchmark/tasks/openicl_infer.py +++ b/ais_bench/benchmark/tasks/openicl_infer.py @@ -106,7 +106,7 @@ class OpenICLInferTask(BaseTask): self._set_default_value(inferencer_cfg, 'batch_size', self.batch_size) inferencer_cfg['max_seq_len'] = self.model_cfg.get('max_seq_len') self.inferencer = ICL_INFERENCERS.build(inferencer_cfg) - + def _inference(self): self.logger.info( f'Start inferencing {task_abbr_from_cfg(self.sub_cfg)}') @@ -127,7 +127,7 @@ class OpenICLInferTask(BaseTask): # set inferencer's default value according to model's config' self.build_inference() - + self.inferencer.update_model_cfg(self.model_cfg) out_path = get_infer_output_path( diff --git a/ais_bench/benchmark/utils/results.py b/ais_bench/benchmark/utils/results.py index 9f70a98..2bc40a7 100644 --- a/ais_bench/benchmark/utils/results.py +++ b/ais_bench/benchmark/utils/results.py @@ -1,5 +1,6 @@ import csv import json +import orjson import collections import math from typing import Optional, Dict, Any @@ -17,6 +18,10 @@ def dump_results_dict(results_dict, filename, formatted = True): else: json.dump(results_dict, json_file, ensure_ascii=False) +def fast_dump_results_dict(results_dict, filename): + with open(filename, 'wb') as f: + f.write(orjson.dumps(results_dict)) + @dataclass class MiddleData: @@ -65,9 +70,7 @@ class MiddleData: "output": self.output, "output_token_id": self.output_token_id, "prefill_latency": self.prefill_latency, - "prefill_throughput": len(self.input_token_id) - / self.prefill_latency - * 1000 if self.prefill_latency > 0 else 0, + "prefill_throughput": round(len(self.input_token_id) / self.prefill_latency * 1000, 4) if self.prefill_latency > 0 else 0, "decode_token_latencies": self.decode_cost[:], "last_decode_latency": self.decode_cost[-1] if self.decode_cost else 0.0, "decode_max_token_latency": ( @@ -76,13 +79,11 @@ class MiddleData: "seq_latency": self.req_latency, "input_tokens_len": self.num_input_tokens, "generate_tokens_len": self.num_generated_tokens, - "generate_tokens_speed": self.num_generated_tokens - / self.req_latency - * 1000 if self.req_latency > 0 else 0, + "generate_tokens_speed": round(self.num_generated_tokens / self.req_latency * 1000, 4) if self.req_latency > 0 else 0, "input_characters_len": len(self.input_data), "generate_characters_len": self.num_generated_chars, "characters_per_token": ( - self.num_generated_chars / self.num_generated_tokens + round(self.num_generated_chars / self.num_generated_tokens, 4) if self.num_generated_tokens else 0.0 ), diff --git a/ais_bench/benchmark/utils/summarize_plot.py b/ais_bench/benchmark/utils/summarize_plot.py index 28f18a7..9a38411 100644 --- a/ais_bench/benchmark/utils/summarize_plot.py +++ b/ais_bench/benchmark/utils/summarize_plot.py @@ -29,9 +29,9 @@ TIMELINE_POINTS_PER_REQUEST = 3 # 每个请求在时间线图中占3个点( # ================== 辅助函数 ================== def validate_input_data( - start_time_list: List[float], + start_time_list: List[float], prefill_latency_list: List[float], - end_time_list: List[float], + end_time_list: List[float], decode_token_latencies_list: List[List[float]], ) -> bool: """验证输入数据是否合法""" @@ -40,15 +40,15 @@ def validate_input_data( if n_requests == 0: logger.warning("No requests to plot!") return False - - if (n_requests != len(prefill_latency_list) or + + if (n_requests != len(prefill_latency_list) or n_requests != len(end_time_list) or n_requests != len(decode_token_latencies_list)): logger.warning("Input list lengths mismatch! Details: ") logger.warning(f"start_list:{n_requests}, prefill_latency_list:{len(prefill_latency_list)}") logger.warning(f"end_list:{len(end_time_list)}, decode_token_latencies_list:{len(decode_token_latencies_list)}") return False - + return True def is_non_streaming_scenario( @@ -59,9 +59,9 @@ def is_non_streaming_scenario( return all(p == 0.0 for p in prefill_latency_list) def preprocess_data( - start_time_list: List[float], + start_time_list: List[float], prefill_latency_list: List[float], - end_time_list: List[float], + end_time_list: List[float], decode_token_latencies_list: List[List[float]], ) -> Tuple[Optional[np.ndarray], np.ndarray, np.ndarray, bool]: """ @@ -71,13 +71,13 @@ def preprocess_data( start = np.asarray(start_time_list, dtype=np.float64) prefill = np.asarray(prefill_latency_list, dtype=np.float64) / 1000 # prefill数据单位为ms,而其他数据均为s end = np.asarray(end_time_list, dtype=np.float64) - + # 检测是否是非流式场景 is_non_streaming = is_non_streaming_scenario(prefill_latency_list, decode_token_latencies_list) - + # 计算首token时间 first_token_times = (start + prefill) if not is_non_streaming else None - + # 对每条请求是否含有非首token时延判断请求索引对应的end_time是否需要更新, # 因为end_time_list因为打点位置会有误差,需用first_token_time_list的值修正 # 仅在非流式场景修正结束时间 @@ -87,15 +87,15 @@ def preprocess_data( end[no_decode_indices] = first_token_times[no_decode_indices] get_logger().debug(f"Adjusted {len(no_decode_indices)} requests with no decode tokens") del no_decode_indices - + # 计算全局最小时间 global_x_min = np.min(start) if len(start) > 0 else 0.0 - + # 计算相对时间 adjusted_starts = start - global_x_min adjusted_first_tokens = (first_token_times - global_x_min) if not is_non_streaming else None adjusted_ends = end - global_x_min - + return adjusted_first_tokens, adjusted_starts, adjusted_ends, is_non_streaming def generate_timeline_traces( @@ -108,29 +108,29 @@ def generate_timeline_traces( n_requests = len(adjusted_starts) if n_requests == 0: return [] - + # 预分配内存 red_x = np.full(TIMELINE_POINTS_PER_REQUEST * n_requests, np.nan, dtype=np.float32) red_y = np.full_like(red_x, np.nan) blue_x = np.full_like(red_x, np.nan) blue_y = np.full_like(red_x, np.nan) hover_text = np.full(TIMELINE_POINTS_PER_REQUEST * n_requests, None, dtype=object) - sorted_indices = np.argsort(adjusted_starts) - + sorted_indices = np.argsort(adjusted_starts) + for sorted_pos, orig_idx in enumerate(sorted_indices): # 获取当前请求的关键时间点 start_t = adjusted_starts[orig_idx] first_token_t = adjusted_first_tokens[orig_idx] end_t = adjusted_ends[orig_idx] - + # 计算数组中的位置 arr_idx = sorted_pos * 3 - + # 红线段(TTFT):从开始到第一个token red_x[arr_idx] = start_t red_x[arr_idx + 1] = first_token_t red_y[arr_idx:arr_idx + 2] = sorted_pos + 1 - + blue_content_data = "NaN" # 蓝线段(Decode):从第一个token到结束 @@ -140,11 +140,11 @@ def generate_timeline_traces( blue_y[arr_idx:arr_idx + 2] = sorted_pos + 1 decode_time = end_t - first_token_t blue_content_data = f"{first_token_t:.2f}→{end_t:.2f}={decode_time:.2f}" - + # 悬停文本,触发点在红线段起点 ttft = first_token_t - start_t e2e = end_t - start_t - + red_content = f"TTFT({unit}): {start_t:.2f}→{first_token_t:.2f}={ttft:.2f}
" blue_content = f"Decode({unit}): {blue_content_data}
" e2e_content = f"E2E({unit}): {start_t:.2f}→{end_t:.2f}={e2e:.2f}" @@ -155,12 +155,12 @@ def generate_timeline_traces( n_points = len(red_x) chunk_size = min(n_points, MAX_POINTS_PER_TRACE) n_chunks = (n_points + chunk_size - 1) // chunk_size - + for i in range(n_chunks): start_idx = i * chunk_size end_idx = min((i + 1) * chunk_size, n_points) chunk = slice(start_idx, end_idx) - + # 红线段 if np.any(~np.isnan(red_x[chunk])): traces.append(go.Scattergl( @@ -173,7 +173,7 @@ def generate_timeline_traces( showlegend=False, connectgaps=False )) - + # 蓝线段 if np.any(~np.isnan(blue_x[chunk])): traces.append(go.Scattergl( @@ -200,22 +200,22 @@ def generate_concurrency_traces( if not np.any(valid_mask): get_logger().warning("No valid requests for concurrency plot!") return [] - + valid_starts = adjusted_starts[valid_mask] valid_ends = adjusted_ends[valid_mask] n_events = len(valid_starts) * 2 - + # 生成事件数组 events = np.empty((n_events, 2), dtype=np.float32) events[:len(valid_starts), 0] = valid_starts events[:len(valid_starts), 1] = 1 # 开始事件 events[len(valid_starts):, 0] = valid_ends events[len(valid_starts):, 1] = -1 # 结束事件 - + # 稳定排序(时间相同则开始事件优先) sort_indices = np.lexsort((events[:, 1], events[:, 0])) events = events[sort_indices] - + # 计算并发数 unique_times, inverse_indices = np.unique(events[:, 0], return_inverse=True) delta_per_time = np.bincount(inverse_indices, weights=events[:, 1]) @@ -223,28 +223,28 @@ def generate_concurrency_traces( conc_times = unique_times conc_counts = cumulative - + # 创建悬停文本 conc_hover_text = [ - f"Time: {t:.4f}{unit}
Concurrency: {c:.0f}" + f"Time: {t:.4f}{unit}
Concurrency: {c:.0f}" for t, c in zip(conc_times, conc_counts) ] - + # 分块渲染 traces = [] n_points = len(conc_times) chunk_size = min(n_points, MAX_POINTS_PER_TRACE) n_chunks = (n_points + chunk_size - 1) // chunk_size - + for i in range(n_chunks): start_idx = i * chunk_size end_idx = min((i + 1) * chunk_size, n_points) - + if i > 0: start_idx = max(0, start_idx - 1) # 确保连续 - + chunk = slice(start_idx, end_idx) - + traces.append(go.Scattergl( x=conc_times[chunk], y=conc_counts[chunk], @@ -257,7 +257,7 @@ def generate_concurrency_traces( showlegend=False, connectgaps=True )) - + # 清理大数组释放内存 del events, sort_indices, unique_times, inverse_indices, delta_per_time, cumulative del conc_times, conc_counts, conc_hover_text @@ -280,14 +280,14 @@ def create_plot_layout( title=f"Relative Time ({unit})", range=[0, max_time], ) - + yaxis_config = dict( **AXIS_CONFIG, rangemode='nonnegative', tickmode='auto', nticks=10, ) - + if has_timeline: # 双图模式 return dict( @@ -299,7 +299,7 @@ def create_plot_layout( ), yaxis1=dict( **yaxis_config, - title="Request Index", + title="Request Index", ), xaxis2=dict( **xaxis_config, @@ -334,24 +334,24 @@ def create_plot_layout( # ================== 对文件外使用的主函数 ================== def plot_sorted_request_timelines( - start_time_list: List[float], + start_time_list: List[float], prefill_latency_list: List[float], - end_time_list: List[float], + end_time_list: List[float], decode_token_latencies_list: List[List[float]], - output_file: str = "timeline.html", + output_file: str = "timeline.html", unit: str = "s" ) -> None: """绘制请求时间线和并发图表""" logger = get_logger() start_timestamp = time.perf_counter() - + # ===== 1. 数据验证和预处理 ===== logger.info("Starting request timeline processing...") - + # 验证输入数据 if not validate_input_data(start_time_list, prefill_latency_list, end_time_list, decode_token_latencies_list): return False - + # 数据预处理 preprocess_start = time.perf_counter() adjusted_first_token_times, adjusted_starts, adjusted_ends, is_non_streaming = preprocess_data( @@ -360,13 +360,13 @@ def plot_sorted_request_timelines( if is_non_streaming: logger.warning("[Non-streaming scenario] The plot will only show the request concurrency chart!") - + n_requests = len(start_time_list) has_timeline = not is_non_streaming and adjusted_first_token_times is not None and n_requests > 0 max_time = np.max(adjusted_ends) if n_requests > 0 else 1.0 - + logger.info(f"Data preprocessing completed in {time.perf_counter() - preprocess_start:.4f}s") - + # ===== 2. 生成时间线图轨迹(仅流式场景下) ===== timeline_traces = [] if has_timeline: @@ -376,26 +376,26 @@ def plot_sorted_request_timelines( adjusted_starts, adjusted_ends, adjusted_first_token_times, unit ) logger.info(f"Generated timeline trace chunks in {time.perf_counter() - timeline_start:.4f}s") - + # ===== 3. 生成并发图轨迹 ===== logger.info("Generating concurrency traces...") concurrency_start = time.perf_counter() concurrency_traces = generate_concurrency_traces(adjusted_starts, adjusted_ends, unit) - + logger.info(f"Generated concurrency trace chunks in {time.perf_counter() - concurrency_start:.4f}s") - + # ===== 4. 创建图表 ===== logger.info("Creating figure layout...") figure_start = time.perf_counter() - + # 创建布局配置 layout = create_plot_layout(max_time, unit, has_timeline) - + # 创建图表对象 if has_timeline: fig = make_subplots( - rows=2, - cols=1, + rows=2, + cols=1, vertical_spacing=0.1, shared_xaxes=True ) @@ -407,16 +407,16 @@ def plot_sorted_request_timelines( fig = go.Figure() for trace in concurrency_traces: fig.add_trace(trace) - + # 应用布局配置 fig.update_layout(layout) - + logger.info(f"Figure layout created in {time.perf_counter() - figure_start:.4f}s") - + # ===== 5. 输出HTML ===== logger.info(f"Writing to {output_file}...") write_start = time.perf_counter() - + fig.write_html( output_file, include_plotlyjs='cdn', @@ -424,7 +424,7 @@ def plot_sorted_request_timelines( auto_open=False, full_html=True, ) - + logger.info(f"HTML written in {time.perf_counter() - write_start:.4f}s") total_time = time.perf_counter() - start_timestamp logger.info(f"Completed! Total execution time: {total_time:.4f}s") -- Gitee From a486cb92c7e3e96093c06a5ce813167b72149bd0 Mon Sep 17 00:00:00 2001 From: yh_silence Date: Mon, 14 Jul 2025 10:30:12 +0800 Subject: [PATCH 04/10] add requirement orjson --- requirements/runtime.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements/runtime.txt b/requirements/runtime.txt index 5bc15c1..3d7af85 100644 --- a/requirements/runtime.txt +++ b/requirements/runtime.txt @@ -19,6 +19,7 @@ nltk>=3.7 numpy>=1.23.4,<2.0.0 openai opencv-python-headless +orjson pandas<2.0.0 plotly prettytable -- Gitee From f84171505db3bee95fe698503cad4b7ec5fdb189 Mon Sep 17 00:00:00 2001 From: yh_silence Date: Mon, 14 Jul 2025 16:46:49 +0800 Subject: [PATCH 05/10] abbr legal char contain digit --- ais_bench/benchmark/utils/build.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ais_bench/benchmark/utils/build.py b/ais_bench/benchmark/utils/build.py index d3f99a3..1de42b3 100644 --- a/ais_bench/benchmark/utils/build.py +++ b/ais_bench/benchmark/utils/build.py @@ -17,7 +17,7 @@ def validate_model_cfg(model_cfg: dict) -> dict: validators = { "attr": lambda v: (v in ("local", "service"), "attr must be 'local' or 'service'"), - "abbr": lambda v: (isinstance(v, str) and re.fullmatch(r'[A-Za-z\-]+', v), + "abbr": lambda v: (isinstance(v, str) and re.fullmatch(r'[A-Za-z0-9\-]+', v), "abbr must contain only letters and hyphens (e.g., 'vllm-api-general-chat')"), "path": lambda v: (not v or (isinstance(v, str) and os.path.exists(v)), f"path is not accessible or does not exist: {v}"), @@ -56,7 +56,7 @@ def validate_model_cfg(model_cfg: dict) -> dict: check(valid, key, msg) return errors - + def build_dataset_from_cfg(dataset_cfg: ConfigDict): dataset_cfg = copy.deepcopy(dataset_cfg) -- Gitee From a23311a9d3ac05cbf2676ffad5d394d0b4e2ab27 Mon Sep 17 00:00:00 2001 From: yh_silence Date: Mon, 14 Jul 2025 17:06:34 +0800 Subject: [PATCH 06/10] abbr legal char contain digit --- ais_bench/benchmark/utils/build.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ais_bench/benchmark/utils/build.py b/ais_bench/benchmark/utils/build.py index 1de42b3..4ecf214 100644 --- a/ais_bench/benchmark/utils/build.py +++ b/ais_bench/benchmark/utils/build.py @@ -17,7 +17,7 @@ def validate_model_cfg(model_cfg: dict) -> dict: validators = { "attr": lambda v: (v in ("local", "service"), "attr must be 'local' or 'service'"), - "abbr": lambda v: (isinstance(v, str) and re.fullmatch(r'[A-Za-z0-9\-]+', v), + "abbr": lambda v: (isinstance(v, str) and re.fullmatch(r'[_A-Za-z0-9\-]+', v), "abbr must contain only letters and hyphens (e.g., 'vllm-api-general-chat')"), "path": lambda v: (not v or (isinstance(v, str) and os.path.exists(v)), f"path is not accessible or does not exist: {v}"), -- Gitee From 3e9a922e84dba111571f29f9ba45ec519cc30f78 Mon Sep 17 00:00:00 2001 From: yh_silence Date: Tue, 15 Jul 2025 10:10:08 +0800 Subject: [PATCH 07/10] rm unused model backend --- ais_bench/benchmark/models/__init__.py | 1 - ais_bench/benchmark/models/mindie_llm_api.py | 321 ------------------- 2 files changed, 322 deletions(-) delete mode 100644 ais_bench/benchmark/models/mindie_llm_api.py diff --git a/ais_bench/benchmark/models/__init__.py b/ais_bench/benchmark/models/__init__.py index 1bc06d5..19ec781 100644 --- a/ais_bench/benchmark/models/__init__.py +++ b/ais_bench/benchmark/models/__init__.py @@ -3,7 +3,6 @@ from ais_bench.benchmark.models.base_api import APITemplateParser, BaseAPIModel from ais_bench.benchmark.models.vllm_custom_api import VLLMCustomAPI, VLLMCustomAPIOld, VLLMCustomAPIStream # noqa: F401 from ais_bench.benchmark.models.vllm_custom_api_chat import VLLMCustomAPIChat, VLLMCustomAPIChatStream # noqa: F401 from ais_bench.benchmark.models.mindie_stream_api import MindieStreamApi -from ais_bench.benchmark.models.mindie_llm_api import MindieLLMModel from ais_bench.benchmark.models.huggingface import HuggingFace, HuggingFaceCausalLM from ais_bench.benchmark.models.huggingface_above_v4_33 import HuggingFaceBaseModel, HuggingFacewithChatTemplate from ais_bench.benchmark.models.tgi_api import TGICustomAPI, TGICustomAPIStream diff --git a/ais_bench/benchmark/models/mindie_llm_api.py b/ais_bench/benchmark/models/mindie_llm_api.py deleted file mode 100644 index 0e496b6..0000000 --- a/ais_bench/benchmark/models/mindie_llm_api.py +++ /dev/null @@ -1,321 +0,0 @@ -import os -import sys -import csv -import json - -from typing import Dict, List, Optional, Union - -import numpy as np -import torch -import transformers - -from ais_bench.benchmark.models.base import BaseModel -from ais_bench.benchmark.models.performance import PerformanceModel -from ais_bench.benchmark.models.base_api import APITemplateParser -from ais_bench.benchmark.registry import MODELS -from ais_bench.benchmark.utils.logging import get_logger -from ais_bench.benchmark.utils.prompt import PromptList - - -DTYPE_MAP = {"bf16": torch.bfloat16, "fp16": torch.float16} - - -@MODELS.register_module() -class MindieLLMModel(PerformanceModel): - """ - Model wrapper around MindIE-LLM models. - """ - - def __init__(self, - environ_kwargs: Optional[Dict] = None, - **kwargs): - super().__init__(path=kwargs.get('weight_dir'), - max_seq_len=kwargs.get('output_length'), - tokenizer_only=False, - meta_template=None) - for key, value in environ_kwargs.items(): - os.environ[key] = value - - self.rank = int(os.getenv("RANK", "0")) - self.local_rank = int(os.getenv("LOCAL_RANK", "0")) - self.world_size = kwargs.get('world_size') - self.block_size = kwargs.get('block_size') - - self.model_name = kwargs.get('model_name') - self.data_type = kwargs.get('data_type') # fp16 / bf16 - self.weight_dir = kwargs.get('weight_dir') - self.max_position_embedding = kwargs.get('max_position_embedding') - self.is_chat_model = kwargs.get('is_chat_model') - self.prefill_batch_size = kwargs.get('prefill_batch_size') - self.kw_args = kwargs.get('kw_args') - self.dp = kwargs.get('dp') - self.tp = kwargs.get('tp') - self.sp = kwargs.get('sp') - self.moe_tp = kwargs.get('moe_tp') - self.pp = kwargs.get('pp') - self.microbatch_size = kwargs.get('microbatch_size') - self.moe_ep = kwargs.get('moe_ep') - self.trust_remote_code = kwargs.get('trust_remote_code') - self.ignore_eos = kwargs.get('ignore_eos') - self.input_length = kwargs.get('input_length') - self.output_length = kwargs.get('output_length') - self.decode_batch_size = kwargs.get('decode_batch_size') - self.input_token_len = kwargs.get('input_token_len', None) - self.logger = get_logger() - self.pa_runner = None - self.rank_table_file = kwargs.get('rank_table_file') - if self.rank_table_file: - os.environ['RANKTABLEFILE'] = self.rank_table_file - try: - os.environ['WORLD_SIZE'] = str(self.world_size) - except Exception as e: - raise TypeError("world_size invalid") from e - - self.batch_latencies = [] - self.pa_runner_perf_file_path = None - - self.get_model_or_runner(self.input_length, self.output_length) - self.check_pa_runner() - self.warm_up() - - def set_performance(self): - self.do_performance = True - os.environ["ATB_LLM_BENCHMARK_ENABLE"] = "1" - cur_dir = os.path.dirname(os.path.abspath(__file__)) - self.pa_runner_perf_file_path = os.path.join(cur_dir, "../../../benchmark.csv") - os.environ["ATB_LLM_BENCHMARK_FILEPATH"] = self.pa_runner_perf_file_path - self.ignore_eos = True # out len equal to max_out_len - self.detail_perf_datas = [] - - def check_pa_runner(self): - if self.pa_runner == None: - raise RuntimeError("Model loading failed") - - def warm_up(self): - self.pa_runner.warm_up() - - def merge_perf_datas(self): - ms = " ms" - unit_token = " token/s" - total_req = len(self.detail_perf_datas) - e2el = sum(self.batch_latencies) - if total_req <= 0 or e2el <= 0: - self.logger.warning("No performance data to merge, please check") - return {} - common_metric_units_map = { - "Benchmark Duration": ms, - "Total Requests": None, - "Request Throughput": " req/s", - "Total Input Tokens": None, - "Prefill Token Throughput": "", - "Input Token Throughput": unit_token, - "Total Output Tokens": None, - "Output Token Throughput": unit_token, - "Total Token Throughput": unit_token, - } - perf_key = "total" - merge_res = { - "Benchmark Duration": {perf_key: e2el * 1000}, - "Total Requests": {perf_key: total_req}, - "Request Throughput": {perf_key: total_req / e2el}, - "Total Input Tokens": { - perf_key: sum(data["seq_len_in"] for data in self.detail_perf_datas) - }, - "Prefill Token Throughput": { - perf_key: sum(data["seq_len_in"] for data in self.detail_perf_datas) - / sum(data["first_token_time"] for data in self.detail_perf_datas) - }, - "Input Token Throughput": { - perf_key: sum(data["seq_len_in"] for data in self.detail_perf_datas) / e2el - }, - "Total Output Tokens": { - perf_key: sum(data["seq_len_out"] for data in self.detail_perf_datas) - }, - "Output Token Throughput": { - perf_key: sum(data["seq_len_out"] for data in self.detail_perf_datas) / e2el - }, - "Total Token Throughput": { - perf_key: ( - sum( - data["seq_len_in"] + data["seq_len_out"] - for data in self.detail_perf_datas - ) - / e2el - ) - }, - } - for key,value in merge_res.items(): - value[perf_key] = str(round(value[perf_key], 4)) - if common_metric_units_map[key]: - value[perf_key] += common_metric_units_map[key] - return merge_res - - def handle_perf_result(self, output_filepath, output_filename): - e2e_latency = sum(self.batch_latencies) - if self.pa_runner_perf_file_path is not None and self.input_token_len is not None and self.rank == 0: # get pa runner special performance data - if not os.path.exists(output_filepath): - os.makedirs(output_filepath, mode=0o750) - json_path = os.path.join(output_filepath, f"pa_runner_special_perf_data_{output_filename}.json") - with open(json_path, "w") as file: - json.dump(self.detail_perf_datas, file, ensure_ascii=False, indent=4) - - self.logger.info(f"PARUNNER special performance datas saved in {json_path}") - return self.merge_perf_datas() - return {"Benchmark Duration":{"total":str(round(e2e_latency, 4)) + ' ms'}} - - def get_model_or_runner(self, input_length, output_length, warmup_bs=0): - - try: - ATB_SPEED_HOME_PATH = os.environ.get("ATB_SPEED_HOME_PATH") - if ATB_SPEED_HOME_PATH not in sys.path: - sys.path.insert(0, os.path.join(ATB_SPEED_HOME_PATH, "../..")) - sys.path.insert(0, ATB_SPEED_HOME_PATH) - from atb_llm.utils.env import ENV - from examples.run_pa import PARunner - except Exception: - raise RuntimeError("Failed to import necessary packages") - - rank = "rank" - world_size = "world_size" - local_rank = "local_rank" - model_path = "model_path" - max_position_embeddings = "max_position_embeddings" - max_input_length = "max_input_length" - max_output_length = "max_output_length" - trust_remote_code = "trust_remote_code" - - - prefill_batch_size = self.decode_batch_size if self.prefill_batch_size == 0 else self.prefill_batch_size - - input_dict = { - rank: self.rank, - local_rank: self.local_rank, - world_size: self.world_size, - 'max_prefill_tokens': -1, - 'block_size': self.block_size, - model_path: self.weight_dir, - max_position_embeddings: (self.max_position_embedding - if self.max_position_embedding != -1 - else input_length + output_length), - 'max_prefill_batch_size': prefill_batch_size, - 'max_batch_size': warmup_bs if warmup_bs != 0 else self.decode_batch_size, - max_input_length: input_length, - max_output_length: output_length, - 'kw_args': self.kw_args, - 'dp': self.dp, - 'tp': self.tp, - 'sp': self.sp, - 'moe_tp': self.moe_tp, - 'pp': self.pp, - 'microbatch_size': self.microbatch_size, - 'moe_ep': self.moe_ep, - trust_remote_code: self.trust_remote_code - } - if self.model_name == "qwen2_72b" or self.model_name == "qwen2_7b": - input_dict[max_position_embeddings] = None - self.pa_runner = PARunner(**input_dict) - model_dtype = self.pa_runner.model.dtype - self.tokenizer = self.pa_runner.model.tokenizer - user_dtype = DTYPE_MAP.get(self.data_type, None) - if user_dtype != model_dtype: - self.logger.error( - "Inconsistent dtype: Input dtype: %s, model weight dtype: %s. please check", - user_dtype, model_dtype) - raise RuntimeError( - f"Inconsistent dtype: Input dtype: {user_dtype}, " + - f"model weight dtype: {model_dtype}. please check") - - self.logger.info('%d pa_runner: %s', self.rank, self.pa_runner) - - - def generate(self, - inputs: List[str], - max_out_len: int, - **kwargs) -> List[str]: - """Generate results given a list of inputs. - - Args: - inputs (List[str]): A list of strings. - max_out_len (int): The maximum length of the output. - - Returns: - List[str]: A list of generated strings. - """ - if self.do_performance and self.input_token_len is not None: # enable token_input - inputs = self._trans_to_input_ids(inputs) - inputs = [self._padding_input_ids(input_ids) for input_ids in inputs] - - generate_texts, _, e2e_latency_per_bs = self.pa_runner.infer(inputs, - len(inputs), - max_out_len, - self.ignore_eos, - self.is_chat_model) - - if hasattr(self, "do_performance") and self.do_performance: - self.batch_latencies.append(e2e_latency_per_bs) - if self.pa_runner_perf_file_path is not None and self.input_token_len is not None and self.rank == 0: # get pa runner special performance data - with open(self.pa_runner_perf_file_path, mode='r', encoding='utf-8') as file: - csv_reader = csv.reader(file) - next(csv_reader) - second_row = next(csv_reader) - first_token_time = float(second_row[4]) / 1000 - non_first_token_time = float(second_row[5]) / 1000 - try: - non_first_token_throughput = len(inputs) / non_first_token_time - except ZeroDivisionError: - non_first_token_throughput = 0 - e2e_throughput = len(inputs) * max_out_len / e2e_latency_per_bs - - self.logger.info( - "seq_len_in: %d, seq_len_out: %d, total_time(s): %f," - "first_token_time(ms): %f," - "non_first_token_time(ms): %f," - "non_first_token_throughput(1/s): %f," - "e2e_time(s): %f, e2e_throughput(tokens/s): %f", - self.input_token_len, max_out_len, e2e_latency_per_bs, - first_token_time * 1000, - non_first_token_time * 1000, - non_first_token_throughput, - e2e_latency_per_bs, e2e_throughput - ) - - self.detail_perf_datas.append( - dict( - batch_size = len(inputs), - seq_len_in = self.input_token_len, - seq_len_out = max_out_len, - total_time = e2e_latency_per_bs, - first_token_time = first_token_time * 1000, - non_first_token_time = non_first_token_time * 1000, - e2e_time = e2e_latency_per_bs, - e2e_throughput = e2e_throughput - ) - ) - return None - else: - return generate_texts - - def _trans_to_input_ids(self, inputs: List[str]): - input_ids_list = [] - for input in inputs: - input_ids_list.append(self.tokenizer.encode(input, add_special_tokens=False)) - return input_ids_list - - def _padding_input_ids(self, input_ids: list): - if len(input_ids) == 0: - raise RuntimeError("Input for model infer is empty, please check") - while (len(input_ids) < self.input_token_len): - input_ids = input_ids * 2 - return input_ids[:self.input_token_len] - - def get_token_len(self, prompt: str) -> int: - """Get lengths of the tokenized strings. - - Args: - prompt (str): Input string. - - Returns: - int: Length of the input tokens - """ - return len(self.tokenizer.encode(prompt)) - -- Gitee From 4875632126272e828b5081ce6fe4bffe0fff1abb Mon Sep 17 00:00:00 2001 From: yh_silence Date: Tue, 15 Jul 2025 17:22:57 +0800 Subject: [PATCH 08/10] human-eval requirement --- requirements/extra.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/extra.txt b/requirements/extra.txt index ba16086..d9dec69 100644 --- a/requirements/extra.txt +++ b/requirements/extra.txt @@ -1,5 +1,5 @@ # Humaneval, Humaneval X --e git+https://github.com/openai/human-eval.git#egg=human-eval +human-eval # math500 math-verify==0.5.2 -- Gitee From 6cf50c630c52be1090a1166d733f24bdc0234fa5 Mon Sep 17 00:00:00 2001 From: yh_silence Date: Wed, 23 Jul 2025 10:56:25 +0800 Subject: [PATCH 09/10] stable stage calculate fix --- .../stable_perf_metric_calculator.py | 52 +++++++++---------- doc/users_guide/stable_stage.md | 2 +- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py b/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py index fe04c53..8dbd09f 100644 --- a/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py +++ b/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py @@ -1,5 +1,4 @@ import csv -import heapq from tqdm import tqdm import collections import math @@ -41,45 +40,42 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): for stage_name, _ in self.stage_dict.items(): self._process_result(perf_details.get("requests"), stage_name) - def _get_requests_id(self, perf_details): + time_point_concurrency = [0] * 2 * len(perf_details["requests"]["id"]) request_time_sections = [] for id in range(len(perf_details["requests"]["id"])): request_time_sections.append({ "id": id, - "start_time": perf_details["requests"]["start_time"][id], - "end_time": perf_details["requests"]["end_time"][id], + "attr": "start", + "time": perf_details["requests"]["start_time"][id], }) - - sorted_time_sections = sorted(request_time_sections, key=lambda x: x["start_time"]) + request_time_sections.append({ + "id": id, + "attr": "end", + "time": perf_details["requests"]["end_time"][id], + }) + sorted_time_sections = sorted(request_time_sections, key=lambda x: x["time"]) id_lists = [] - working_reqs = {} - self.logger.info("Calculating stable stage ...") + self.logger.info("Start calculating stable stage ...") for i, section in enumerate(tqdm(sorted_time_sections)): - poped_ids = [] - for k in list(working_reqs.keys()): - if working_reqs[k][1] < section["start_time"]: - poped_ids.append(k) - working_reqs.pop(k, None) - working_reqs[section["id"]] = [section["start_time"], section["end_time"]] - if len(working_reqs) == self.max_concurrency: + if section["attr"] == "start": + time_point_concurrency[i] = time_point_concurrency[i - 1] + 1 + else: + time_point_concurrency[i] = time_point_concurrency[i - 1] - 1 + if section["attr"] == "start" and time_point_concurrency[i] == self.max_concurrency: id_lists.append(section["id"]) - if len(id_lists) == 1: - self.stage_section[0] = min([perf_details["requests"]["end_time"][id] for id in list(working_reqs.keys())]) # total start time - elif len(working_reqs) >= int(self.max_concurrency * (1 - WAVE_OFFSET)) and len(id_lists) > 0: + if len(id_lists) == 2: + self.stage_section[0] = section["time"] # total start time + elif section["attr"] == "start" and time_point_concurrency[i] >= int(self.max_concurrency * (1 - WAVE_OFFSET)) and len(id_lists) > 2: id_lists.append(section["id"]) - else: - if len(id_lists) > 0: # start to leave stable - self.stage_section[1] = min([perf_details["requests"]["end_time"][id] for id in poped_ids]) - break - + elif len(id_lists) > 1 and section["attr"] == "end" and time_point_concurrency[i] < int(self.max_concurrency * (1 - WAVE_OFFSET)): + self.stage_section[1] = section["time"] + break if len(id_lists) > 0: id_lists.pop(0) # ignore first request that reached max concurrency if len(id_lists) == 0: raise RuntimeError("Can not find a stable stage!") - - if self.stage_section[1] == 0: - self.stage_section[1] = min([perf_details["requests"]["end_time"][id] for id in list(working_reqs.keys())]) # total end time + self.logger.info("Finish calculating stable stage.") return id_lists def _get_legal_stats_list(self, stats_list): @@ -301,9 +297,9 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): if self.common_metrics["Failed Requests"][stage_name] > 0: self.logger.warning("Some requests failed, please check the ERROR log from responses!") self.common_metrics["Success Requests"][stage_name] = self.success_count[stage_name] - self.common_metrics["Concurrency"][stage_name] = round( + self.common_metrics["Concurrency"][stage_name] = min(round( sum(self.result[stage_name]["E2EL"]) / self.infer_time[stage_name] / 1000, 4 - ) + ), self.max_concurrency) self.common_metrics["Max Concurrency"][stage_name] = self.max_concurrency try: diff --git a/doc/users_guide/stable_stage.md b/doc/users_guide/stable_stage.md index 3136471..6aca8ad 100644 --- a/doc/users_guide/stable_stage.md +++ b/doc/users_guide/stable_stage.md @@ -24,7 +24,7 @@ graph LR; - **流量爬坡阶段:** 与推理服务建立连接的客户端数量在不断增加,服务同时处理的请求数也同步增加。 - **实际稳态阶段:** 推理服务同时在处理的请求数量达到最大并发数。 -- **计算稳态阶段:** 推理服务同时在处理的请求数量首次达到最大并发数后的第一条请求发送的时间点(t2)至推理服务同时在处理的请求数量最后处于最大并发数(t4)的阶段。工具将开始时间处于这个阶段的所有请求都视为稳定阶段的请求。
性能指标中的Benchmark Duration指的就是这个阶段的时延。
**注意**,由于Benchmark Duration会用于计算吞吐率,计算出的吞吐率会存在误差,误差是由于t0至t2之间未算入稳态的请求和t4至t5中被纳入稳态阶段的请求占用的计算资源差异导致的。只有当整个测试过程的最大的单请求时延E2EL(End-to-End-Latency)小于Benchmark Duration的1/3时,计算出的吞吐数据置信度才足够。 +- **计算稳态阶段:** 推理服务同时在处理的请求数量首次达到最大并发数后的第一条请求发送的时间点(t2)至推理服务同时在处理的请求数量最后处于最大并发数(t4)的阶段。工具将开始时间处于这个阶段的所有请求都视为稳定阶段的请求。
性能指标中的Benchmark Duration指的就是这个阶段的时延。
**注意**,由于Benchmark Duration会用于计算吞吐率,计算出的吞吐率会存在误差,误差是由于t0至t2之间未算入稳态的请求和t4至t5中被纳入稳态阶段的请求占用的计算资源差异导致的。同时,此差异可能会导致性能指标中的Concurrency的计算结果超过Max Concurrency,但是Concurrency依然显示为Max Concurrency的取值。只有当整个测试过程的最大的单请求时延E2EL(End-to-End-Latency)小于Benchmark Duration的1/3时,计算出的吞吐数据置信度才足够。 - **发送请求阶段:** 此阶段工具在不断给推理服务发送请求,此阶段后工具会等待所有请求返回。 - **流量退出阶段:** 推理服务同时处理的请求数量在不断降低,直到最终所有请求都返回。 -- Gitee From 36a2672fbd47513497da662794aea478b37c1619 Mon Sep 17 00:00:00 2001 From: yh_silence Date: Wed, 23 Jul 2025 15:21:59 +0800 Subject: [PATCH 10/10] concurrency fix --- .../benchmark/calculators/stable_perf_metric_calculator.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py b/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py index 8dbd09f..721990b 100644 --- a/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py +++ b/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py @@ -57,9 +57,11 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): sorted_time_sections = sorted(request_time_sections, key=lambda x: x["time"]) id_lists = [] self.logger.info("Start calculating stable stage ...") + requested = 0 for i, section in enumerate(tqdm(sorted_time_sections)): if section["attr"] == "start": time_point_concurrency[i] = time_point_concurrency[i - 1] + 1 + requested += 1 else: time_point_concurrency[i] = time_point_concurrency[i - 1] - 1 if section["attr"] == "start" and time_point_concurrency[i] == self.max_concurrency: @@ -68,6 +70,9 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): self.stage_section[0] = section["time"] # total start time elif section["attr"] == "start" and time_point_concurrency[i] >= int(self.max_concurrency * (1 - WAVE_OFFSET)) and len(id_lists) > 2: id_lists.append(section["id"]) + elif requested == len(perf_details["requests"]["id"]) and section["attr"] == "end": + self.stage_section[1] = section["time"] + break elif len(id_lists) > 1 and section["attr"] == "end" and time_point_concurrency[i] < int(self.max_concurrency * (1 - WAVE_OFFSET)): self.stage_section[1] = section["time"] break -- Gitee