diff --git a/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py b/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py index fe04c53a3ba36ee2c5853c33ebbc30fbe4764066..721990b3f7fe908b9eb85b7b1222f0bc02d50ac4 100644 --- a/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py +++ b/ais_bench/benchmark/calculators/stable_perf_metric_calculator.py @@ -1,5 +1,4 @@ import csv -import heapq from tqdm import tqdm import collections import math @@ -41,45 +40,47 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): for stage_name, _ in self.stage_dict.items(): self._process_result(perf_details.get("requests"), stage_name) - def _get_requests_id(self, perf_details): + time_point_concurrency = [0] * 2 * len(perf_details["requests"]["id"]) request_time_sections = [] for id in range(len(perf_details["requests"]["id"])): request_time_sections.append({ "id": id, - "start_time": perf_details["requests"]["start_time"][id], - "end_time": perf_details["requests"]["end_time"][id], + "attr": "start", + "time": perf_details["requests"]["start_time"][id], }) - - sorted_time_sections = sorted(request_time_sections, key=lambda x: x["start_time"]) + request_time_sections.append({ + "id": id, + "attr": "end", + "time": perf_details["requests"]["end_time"][id], + }) + sorted_time_sections = sorted(request_time_sections, key=lambda x: x["time"]) id_lists = [] - working_reqs = {} - self.logger.info("Calculating stable stage ...") + self.logger.info("Start calculating stable stage ...") + requested = 0 for i, section in enumerate(tqdm(sorted_time_sections)): - poped_ids = [] - for k in list(working_reqs.keys()): - if working_reqs[k][1] < section["start_time"]: - poped_ids.append(k) - working_reqs.pop(k, None) - working_reqs[section["id"]] = [section["start_time"], section["end_time"]] - if len(working_reqs) == self.max_concurrency: + if section["attr"] == "start": + time_point_concurrency[i] = time_point_concurrency[i - 1] + 1 + requested += 1 + else: + time_point_concurrency[i] = time_point_concurrency[i - 1] - 1 + if section["attr"] == "start" and time_point_concurrency[i] == self.max_concurrency: id_lists.append(section["id"]) - if len(id_lists) == 1: - self.stage_section[0] = min([perf_details["requests"]["end_time"][id] for id in list(working_reqs.keys())]) # total start time - elif len(working_reqs) >= int(self.max_concurrency * (1 - WAVE_OFFSET)) and len(id_lists) > 0: + if len(id_lists) == 2: + self.stage_section[0] = section["time"] # total start time + elif section["attr"] == "start" and time_point_concurrency[i] >= int(self.max_concurrency * (1 - WAVE_OFFSET)) and len(id_lists) > 2: id_lists.append(section["id"]) - else: - if len(id_lists) > 0: # start to leave stable - self.stage_section[1] = min([perf_details["requests"]["end_time"][id] for id in poped_ids]) - break - + elif requested == len(perf_details["requests"]["id"]) and section["attr"] == "end": + self.stage_section[1] = section["time"] + break + elif len(id_lists) > 1 and section["attr"] == "end" and time_point_concurrency[i] < int(self.max_concurrency * (1 - WAVE_OFFSET)): + self.stage_section[1] = section["time"] + break if len(id_lists) > 0: id_lists.pop(0) # ignore first request that reached max concurrency if len(id_lists) == 0: raise RuntimeError("Can not find a stable stage!") - - if self.stage_section[1] == 0: - self.stage_section[1] = min([perf_details["requests"]["end_time"][id] for id in list(working_reqs.keys())]) # total end time + self.logger.info("Finish calculating stable stage.") return id_lists def _get_legal_stats_list(self, stats_list): @@ -301,9 +302,9 @@ class StablePerfMetricCalculator(BasePerfMetricCalculator): if self.common_metrics["Failed Requests"][stage_name] > 0: self.logger.warning("Some requests failed, please check the ERROR log from responses!") self.common_metrics["Success Requests"][stage_name] = self.success_count[stage_name] - self.common_metrics["Concurrency"][stage_name] = round( + self.common_metrics["Concurrency"][stage_name] = min(round( sum(self.result[stage_name]["E2EL"]) / self.infer_time[stage_name] / 1000, 4 - ) + ), self.max_concurrency) self.common_metrics["Max Concurrency"][stage_name] = self.max_concurrency try: diff --git a/doc/users_guide/stable_stage.md b/doc/users_guide/stable_stage.md index 3136471219eb63949025f9ff7f2285f06b3ff318..6aca8adc69890b7042082635a4ec7fce5260f977 100644 --- a/doc/users_guide/stable_stage.md +++ b/doc/users_guide/stable_stage.md @@ -24,7 +24,7 @@ graph LR; - **流量爬坡阶段:** 与推理服务建立连接的客户端数量在不断增加,服务同时处理的请求数也同步增加。 - **实际稳态阶段:** 推理服务同时在处理的请求数量达到最大并发数。 -- **计算稳态阶段:** 推理服务同时在处理的请求数量首次达到最大并发数后的第一条请求发送的时间点(t2)至推理服务同时在处理的请求数量最后处于最大并发数(t4)的阶段。工具将开始时间处于这个阶段的所有请求都视为稳定阶段的请求。
性能指标中的Benchmark Duration指的就是这个阶段的时延。
**注意**,由于Benchmark Duration会用于计算吞吐率,计算出的吞吐率会存在误差,误差是由于t0至t2之间未算入稳态的请求和t4至t5中被纳入稳态阶段的请求占用的计算资源差异导致的。只有当整个测试过程的最大的单请求时延E2EL(End-to-End-Latency)小于Benchmark Duration的1/3时,计算出的吞吐数据置信度才足够。 +- **计算稳态阶段:** 推理服务同时在处理的请求数量首次达到最大并发数后的第一条请求发送的时间点(t2)至推理服务同时在处理的请求数量最后处于最大并发数(t4)的阶段。工具将开始时间处于这个阶段的所有请求都视为稳定阶段的请求。
性能指标中的Benchmark Duration指的就是这个阶段的时延。
**注意**,由于Benchmark Duration会用于计算吞吐率,计算出的吞吐率会存在误差,误差是由于t0至t2之间未算入稳态的请求和t4至t5中被纳入稳态阶段的请求占用的计算资源差异导致的。同时,此差异可能会导致性能指标中的Concurrency的计算结果超过Max Concurrency,但是Concurrency依然显示为Max Concurrency的取值。只有当整个测试过程的最大的单请求时延E2EL(End-to-End-Latency)小于Benchmark Duration的1/3时,计算出的吞吐数据置信度才足够。 - **发送请求阶段:** 此阶段工具在不断给推理服务发送请求,此阶段后工具会等待所有请求返回。 - **流量退出阶段:** 推理服务同时处理的请求数量在不断降低,直到最终所有请求都返回。