diff --git a/config/collector.conf b/config/collector.conf
index 56b0ed175cd9db45467adb999039e85fbfde60d9..8913530de067ca9a6d6c8ec4facfcafd4b530b5a 100644
--- a/config/collector.conf
+++ b/config/collector.conf
@@ -5,6 +5,9 @@ modules=io
 period_time=1
 max_save=10
 disk=default
+nvme_ssd_threshold=1000
+sata_ssd_threshold=1000
+sata_hdd_threshold=1000
 
 [log]
 level=info
\ No newline at end of file
diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini
index 53ac486086ca31947cc49f2748612a0dca801641..ca926ace550bafc2967876eb3a0ea9350b62ec1b 100644
--- a/config/plugins/ai_block_io.ini
+++ b/config/plugins/ai_block_io.ini
@@ -14,7 +14,8 @@ algorithm_type=boxplot
 boxplot_parameter=1.5
 win_type=not_continuous
 win_size=30
-win_threshold=6
+win_threshold_latency=6
+win_threshold_iodump=3
 
 [latency_sata_ssd]
 read_avg_lim=10000
diff --git a/config/plugins/avg_block_io.ini b/config/plugins/avg_block_io.ini
index 3b4ee330664233bbd9e6340a9cec327855ba9687..52c2ab99284319e62708067b7a14a5a6848050fe 100644
--- a/config/plugins/avg_block_io.ini
+++ b/config/plugins/avg_block_io.ini
@@ -9,7 +9,8 @@ period_time=1
 
 [algorithm]
 win_size=30
-win_threshold=6
+win_threshold_latency=6
+win_threshold_iodump=3
 
 [latency_nvme_ssd]
 read_avg_lim=10000
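The three new `[io]` keys give each disk type its own io_dump threshold, and the `win_threshold` option is split into separate latency and iodump thresholds. A minimal standalone sketch of how a consumer could read the new collector keys with the same validation rule used later in `collect_config.py` (positive integer, else the 1000 default); the path and helper name here are illustrative:

```python
# Illustrative sketch, not the shipped implementation: read the new [io]
# threshold keys with safe fallbacks (default mirrors CONF_IO_THRESHOLD_DEFAULT).
import configparser

DEFAULT_THRESHOLD = 1000

def read_io_thresholds(path="/etc/sysSentry/collector.conf"):
    parser = configparser.ConfigParser()
    parser.read(path)
    thresholds = {}
    for key in ("nvme_ssd_threshold", "sata_ssd_threshold", "sata_hdd_threshold"):
        raw = parser.get("io", key, fallback=None)
        # accept only positive integers, otherwise fall back to the default
        if raw is not None and raw.isdigit() and int(raw) >= 1:
            thresholds[key] = int(raw)
        else:
            thresholds[key] = DEFAULT_THRESHOLD
    return thresholds

if __name__ == "__main__":
    print(read_io_thresholds())
```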
diff --git a/src/libsentry/python/pySentryCollector/collect_plugin.py b/src/libsentry/python/pySentryCollector/collect_plugin.py
index 3395f89d170190a3a43d878e84e836269665211f..e1befe7cb8c44ce2930c052a5e307d0f1c7e30c4 100644
--- a/src/libsentry/python/pySentryCollector/collect_plugin.py
+++ b/src/libsentry/python/pySentryCollector/collect_plugin.py
@@ -52,6 +52,7 @@ LIMIT_MAX_SAVE_LEN = 300
 class ClientProtocol():
     IS_IOCOLLECT_VALID = 0
     GET_IO_DATA = 1
+    GET_IODUMP_DATA = 2
     PRO_END = 3
 
 class ResultMessage():
@@ -234,14 +235,8 @@ def inter_is_iocollect_valid(period, disk_list=None, stage=None):
         result['message'] = result_message
     return result
 
-def get_io_data(period, disk_list, stage, iotype):
-    result = inter_get_io_data(period, disk_list, stage, iotype)
-    error_code = result['ret']
-    if error_code != ResultMessage.RESULT_SUCCEED:
-        result['message'] = Result_Messages[error_code]
-    return result
 
-def inter_get_io_data(period, disk_list, stage, iotype):
+def inter_get_io_common(period, disk_list, stage, iotype, protocol):
     result = {}
     result['ret'] = ResultMessage.RESULT_UNKNOWN
     result['message'] = ""
@@ -269,21 +264,21 @@ def inter_get_io_data(period, disk_list, stage, iotype):
         return result
 
     req_msg_struct = {
-        'disk_list': json.dumps(disk_list),
-        'period': period,
-        'stage': json.dumps(stage),
-        'iotype' : json.dumps(iotype)
-    }
+        'disk_list': json.dumps(disk_list),
+        'period': period,
+        'stage': json.dumps(stage),
+        'iotype': json.dumps(iotype)
+    }
     request_message = json.dumps(req_msg_struct)
-    result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, ClientProtocol.GET_IO_DATA)
+    result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, protocol)
     if not result_message:
         logging.error("collect_plugin: client_send_and_recv failed")
         return result
     try:
         json.loads(result_message)
     except json.JSONDecodeError:
-        logging.error("get_io_data: json decode error")
+        logging.error("get_io_common: json decode error")
         result['ret'] = ResultMessage.RESULT_PARSE_FAILED
         return result
 
@@ -291,6 +286,23 @@ def inter_get_io_data(period, disk_list, stage, iotype):
     result['message'] = result_message
     return result
 
+
+def get_io_data(period, disk_list, stage, iotype):
+    result = inter_get_io_common(period, disk_list, stage, iotype, ClientProtocol.GET_IO_DATA)
+    error_code = result['ret']
+    if error_code != ResultMessage.RESULT_SUCCEED:
+        result['message'] = Result_Messages[error_code]
+    return result
+
+
+def get_iodump_data(period, disk_list, stage, iotype):
+    result = inter_get_io_common(period, disk_list, stage, iotype, ClientProtocol.GET_IODUMP_DATA)
+    error_code = result['ret']
+    if error_code != ResultMessage.RESULT_SUCCEED:
+        result['message'] = Result_Messages[error_code]
+    return result
+
+
 def get_disk_type(disk):
     result = {}
     result['ret'] = ResultMessage.RESULT_UNKNOWN
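Both public wrappers now share `inter_get_io_common` and differ only in the protocol id sent to the collector. A hedged usage sketch (disk and stage values are illustrative):

```python
# Usage sketch, assuming a running sentryCollector: the two wrappers return the
# same {'ret': ..., 'message': ...} dict shape, where ret == 0 is RESULT_SUCCEED.
from sentryCollector.collect_plugin import get_io_data, get_iodump_data

period = 1
disks = ["sda"]
stages = ["bio", "rq_driver"]
iotypes = ["read", "write"]

io_result = get_io_data(period, disks, stages, iotypes)
dump_result = get_iodump_data(period, disks, stages, iotypes)

for name, result in (("io", io_result), ("iodump", dump_result)):
    if result["ret"] == 0:          # ResultMessage.RESULT_SUCCEED
        print(name, "payload:", result["message"])
    else:
        print(name, "failed:", result["message"])
```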
diff --git a/src/sentryPlugins/ai_block_io/ai_block_io.py b/src/sentryPlugins/ai_block_io/ai_block_io.py
index f7b822630b177357593e263b260b799435d531b1..2973d5240cf71de787dab511bb0c92520ddef227 100644
--- a/src/sentryPlugins/ai_block_io/ai_block_io.py
+++ b/src/sentryPlugins/ai_block_io/ai_block_io.py
@@ -14,19 +14,21 @@ import signal
 import logging
 from collections import defaultdict
 
-from .detector import Detector, DiskDetector
+from .detector import Detector, DiskDetector, DataDetector
 from .threshold import ThresholdFactory, ThresholdType
-from .sliding_window import SlidingWindowFactory
+from .sliding_window import SlidingWindowFactory, DataWindow
 from .utils import get_data_queue_size_and_update_size
 from .config_parser import ConfigParser
 from .data_access import (
     get_io_data_from_collect_plug,
+    get_iodump_data_from_collect_plug,
     check_collect_valid,
     get_disk_type,
     check_disk_is_available
 )
 from .io_data import MetricName
 from .alarm_report import Xalarm, Report
+from .extra_logger import extra_slow_log
 
 CONFIG_FILE = "/etc/sysSentry/plugins/ai_block_io.ini"
 
@@ -93,6 +95,8 @@ class SlowIODetection:
             for iotype in iotypes:
                 self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "latency"))
                 self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "io_dump"))
+                self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "iops"))
+                self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "iodump_data"))
 
         if not self._detector_name_list:
             Report.report_pass("the disks to detection is empty, ai_block_io will exit.")
@@ -109,7 +113,7 @@ class SlowIODetection:
             train_data_duration, train_update_duration, slow_io_detection_frequency
         )
         sliding_window_type = self._config_parser.sliding_window_type
-        window_size, window_threshold = (
+        window_size, window_threshold_latency, window_threshold_iodump = (
             self._config_parser.get_window_size_and_window_minimum_threshold()
         )
@@ -141,7 +145,7 @@ class SlowIODetection:
                     sliding_window = SlidingWindowFactory().get_sliding_window(
                         sliding_window_type,
                         queue_length=window_size,
-                        threshold=window_threshold,
+                        threshold=window_threshold_latency,
                         abs_threshold=tot_lim,
                         avg_lim=avg_lim
                     )
@@ -159,12 +163,27 @@ class SlowIODetection:
                     sliding_window = SlidingWindowFactory().get_sliding_window(
                         sliding_window_type,
                         queue_length=window_size,
-                        threshold=window_threshold
+                        threshold=window_threshold_iodump
                     )
                     detector = Detector(metric_name, threshold, sliding_window)
                     threshold.set_threshold(abs_threshold)
                     disk_detector.add_detector(detector)
 
+                elif metric_name.metric_name == 'iops':
+                    threshold = ThresholdFactory().get_threshold(ThresholdType.AbsoluteThreshold)
+                    sliding_window = SlidingWindowFactory().get_sliding_window(
+                        sliding_window_type,
+                        queue_length=window_size,
+                        threshold=window_threshold_latency
+                    )
+                    detector = Detector(metric_name, threshold, sliding_window)
+                    disk_detector.add_detector(detector)
+
+                elif metric_name.metric_name == 'iodump_data':
+                    data_window = DataWindow(window_size)
+                    data_detector = DataDetector(metric_name, data_window)
+                    disk_detector.add_data_detector(data_detector)
+
             logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]")
             self._disk_detectors[disk] = disk_detector
 
@@ -176,6 +195,9 @@ class SlowIODetection:
             io_data_dict_with_disk_name = get_io_data_from_collect_plug(
                 self._config_parser.period_time, self._disk_list
             )
+            iodump_data_dict_with_disk_name = get_iodump_data_from_collect_plug(
+                self._config_parser.period_time, self._disk_list
+            )
             logging.debug(f"step1. Get io data: {str(io_data_dict_with_disk_name)}")
             if io_data_dict_with_disk_name is None:
                 Report.report_pass(
@@ -187,9 +209,13 @@ class SlowIODetection:
             logging.debug("step2. Start to detection slow io event.")
             slow_io_event_list = []
             for disk, disk_detector in self._disk_detectors.items():
+                disk_detector.push_data_to_data_detectors(iodump_data_dict_with_disk_name)
                 result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name)
                 if result[0]:
+                    # fetch the detailed iodump records when an alarm is raised
+                    result[6]["iodump_data"] = disk_detector.get_data_detector_list_window()
                     slow_io_event_list.append(result)
+
             logging.debug("step2. End to detection slow io event.")
 
             # Step3: report slow IO events
@@ -204,17 +230,17 @@ class SlowIODetection:
                     "alarm_type": slow_io_event[5],
                     "details": slow_io_event[6]
                 }
-                Xalarm.major(alarm_content)
-                tmp_alarm_content = alarm_content.copy()
-                del tmp_alarm_content["details"]
-                logging.warning("[SLOW IO] " + str(tmp_alarm_content))
-                logging.warning(f'[SLOW IO] disk: {str(tmp_alarm_content.get("driver_name"))}, '
-                                f'stage: {str(tmp_alarm_content.get("block_stack"))}, '
-                                f'iotype: {str(tmp_alarm_content.get("io_type"))}, '
-                                f'type: {str(tmp_alarm_content.get("alarm_type"))}, '
-                                f'reason: {str(tmp_alarm_content.get("reason"))}')
+                logging.warning(f'[SLOW IO] disk: {str(alarm_content.get("driver_name"))}, '
+                                f'stage: {str(alarm_content.get("block_stack"))}, '
+                                f'iotype: {str(alarm_content.get("io_type"))}, '
+                                f'type: {str(alarm_content.get("alarm_type"))}, '
+                                f'reason: {str(alarm_content.get("reason"))}')
                 logging.warning(f"latency: " + str(alarm_content.get("details").get("latency")))
                 logging.warning(f"iodump: " + str(alarm_content.get("details").get("iodump")))
+                logging.warning(f"iops: " + str(alarm_content.get("details").get("iops")))
+                extra_slow_log(alarm_content)
+                del alarm_content["details"]["iodump_data"]  # iodump_data can be too large in extreme cases and make the send fail, so it is only logged, not sent to the alarm module
+                Xalarm.major(alarm_content)
 
             # Step4: wait before the next detection round
             logging.debug("step4. Wait to start next slow io event detection loop.")
Wait to start next slow io event detection loop.") diff --git a/src/sentryPlugins/ai_block_io/config_parser.py b/src/sentryPlugins/ai_block_io/config_parser.py index 612fe9f914c26a7709c7ca52cf837e6c37f496bf..b457e14438a9409effe40837929fc085629b5355 100644 --- a/src/sentryPlugins/ai_block_io/config_parser.py +++ b/src/sentryPlugins/ai_block_io/config_parser.py @@ -17,9 +17,11 @@ from .alarm_report import Report from .threshold import ThresholdType from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_log_level from .data_access import check_detect_frequency_is_valid +from .extra_logger import init_extra_logger LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s" +AI_EXTRA_LOG_PATH = "/var/log/sysSentry/ai_block_io_extra.log" ALL_STAGE_LIST = [ "throtl", @@ -52,6 +54,7 @@ def init_log_format(log_level: str): logging.warning( "the log_level: %s you set is invalid, use default value: info.", log_level ) + init_extra_logger(AI_EXTRA_LOG_PATH, get_log_level(log_level.lower()), LOG_FORMAT) class ConfigParser: @@ -71,7 +74,8 @@ class ConfigParser: "n_sigma_parameter": 3.0, "win_type": get_sliding_window_type_enum("not_continuous"), "win_size": 30, - "win_threshold": 6, + "win_threshold_latency": 6, + "win_threshold_iodump": 3, }, "latency_sata_ssd": { "read_avg_lim": 10000, @@ -423,11 +427,11 @@ class ConfigParser: ) def _read_window_minimum_threshold(self, items_sliding_window: dict): - default_window_minimum_threshold = self.DEFAULT_CONF["algorithm"]["win_threshold"] - self._conf["algorithm"]["win_threshold"] = ( + default_window_minimum_threshold = self.DEFAULT_CONF["algorithm"]["win_threshold_latency"] + self._conf["algorithm"]["win_threshold_latency"] = ( self._get_config_value( items_sliding_window, - "win_threshold", + "win_threshold_latency", int, default_window_minimum_threshold, gt=0, @@ -495,6 +499,7 @@ class ConfigParser: self._read_sliding_window_type(items_algorithm) self._read_window_size(items_algorithm) self._read_window_minimum_threshold(items_algorithm) + self._read_window_threshold_iodump(items_algorithm) if con.has_section("latency_sata_ssd"): items_latency_sata_ssd = dict(con.items("latency_sata_ssd")) @@ -702,7 +707,8 @@ class ConfigParser: def get_window_size_and_window_minimum_threshold(self): return ( self._conf["algorithm"]["win_size"], - self._conf["algorithm"]["win_threshold"], + self._conf["algorithm"]["win_threshold_latency"], + self._conf["algorithm"]["win_threshold_iodump"], ) @property @@ -731,7 +737,7 @@ class ConfigParser: @property def window_minimum_threshold(self): - return self._conf["algorithm"]["win_threshold"] + return self._conf["algorithm"]["win_threshold_latency"] @property def absolute_threshold(self): @@ -767,4 +773,17 @@ class ConfigParser: @property def write_iodump_lim(self): - return self._conf["iodump"]["write_iodump_lim"] \ No newline at end of file + return self._conf["iodump"]["write_iodump_lim"] + + def _read_window_threshold_iodump(self, items_sliding_window: dict): + default_window_threshold_iodump = self.DEFAULT_CONF["algorithm"]["win_threshold_iodump"] + self._conf["algorithm"]["win_threshold_iodump"] = ( + self._get_config_value( + items_sliding_window, + "win_threshold_iodump", + int, + default_window_threshold_iodump, + gt=0, + le=self._conf["algorithm"]["win_size"], + ) + ) \ No newline at end of file diff --git a/src/sentryPlugins/ai_block_io/data_access.py b/src/sentryPlugins/ai_block_io/data_access.py index 
diff --git a/src/sentryPlugins/ai_block_io/data_access.py b/src/sentryPlugins/ai_block_io/data_access.py
index 2f2d607499f9bc9b2fd8919eb0b3f8bca8e8c9f0..f1c2bc2592ad7946fcec0fe30b0180cfdcc9af78 100644
--- a/src/sentryPlugins/ai_block_io/data_access.py
+++ b/src/sentryPlugins/ai_block_io/data_access.py
@@ -15,12 +15,13 @@ import logging
 
 from sentryCollector.collect_plugin import (
     Result_Messages,
     get_io_data,
+    get_iodump_data,
     is_iocollect_valid,
     get_disk_type
 )
 
-from .io_data import IOStageData, IOData
+from .io_data import IOStageData, IOData, IOStageDumpData, IODumpData
 
 COLLECT_STAGES = [
     "throtl",
@@ -33,6 +34,7 @@ COLLECT_STAGES = [
     "rq_driver",
     "bio",
     "iocost",
+    "deadline",
 ]
 
@@ -125,3 +127,46 @@ def get_io_data_from_collect_plug(period, disk_list):
         return ret
     logging.warning(f'get io data failed with message: {data_raw["message"]}')
     return None
+
+
+def _get_raw_iodump_data(period, disk_list):
+    return get_iodump_data(
+        period,
+        disk_list,
+        COLLECT_STAGES,
+        ["read", "write", "flush", "discard"],
+    )
+
+
+def _get_iodump_stage_data(data):
+    io_stage_data = IOStageDumpData()
+    for data_type in ("read", "write", "flush", "discard"):
+        if data_type in data:
+            getattr(io_stage_data, data_type).iodump_data = data[data_type]
+    return io_stage_data
+
+
+def get_iodump_data_from_collect_plug(period, disk_list):
+    data_raw = _get_raw_iodump_data(period, disk_list)
+    if data_raw["ret"] == 0:
+        ret = {}
+        try:
+            data = json.loads(data_raw["message"])
+        except json.decoder.JSONDecodeError as e:
+            logging.warning(f"get iodump data failed, {e}")
+            return None
+
+        for disk in data:
+            disk_data = data[disk]
+            disk_ret = IODumpData()
+            for k, v in disk_data.items():
+                try:
+                    getattr(disk_ret, k)
+                    setattr(disk_ret, k, _get_iodump_stage_data(v))
+                except AttributeError:
+                    logging.debug(f"no attr {k}")
+                    continue
+            ret[disk] = disk_ret
+        return ret
+    logging.warning(f'get iodump data failed with message: {data_raw["message"]}')
+    return None
\ No newline at end of file
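The `getattr`/`setattr` probe above is what makes unknown stage keys harmless: a kernel that reports extra stages cannot break parsing. A self-contained toy version of that guard (class and field names are illustrative, not the real dataclasses):

```python
# Toy sketch of the skip-unknown-attribute pattern used in
# get_iodump_data_from_collect_plug; "mystery_stage" is a made-up key.
from dataclasses import dataclass, field

@dataclass
class Stage:
    read: list = field(default_factory=list)

@dataclass
class Disk:
    bio: Stage = field(default_factory=Stage)

payload = {"bio": {"read": ["entry-1"]}, "mystery_stage": {"read": []}}
disk = Disk()
for k, v in payload.items():
    try:
        getattr(disk, k)            # probe: does the dataclass know this stage?
        setattr(disk, k, Stage(read=v.get("read", [])))
    except AttributeError:
        continue                    # "mystery_stage" is silently dropped
print(disk)
```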
diff --git a/src/sentryPlugins/ai_block_io/detector.py b/src/sentryPlugins/ai_block_io/detector.py
index 2688cb1867fb8e4e7ee56babde92c5894e674b82..6c0a03fbd9d2c08c7c059b61c12bb42dd77ac66d 100644
--- a/src/sentryPlugins/ai_block_io/detector.py
+++ b/src/sentryPlugins/ai_block_io/detector.py
@@ -13,8 +13,8 @@ from datetime import datetime
 
 from .io_data import MetricName
 from .threshold import Threshold
-from .sliding_window import SlidingWindow
-from .utils import get_metric_value_from_io_data_dict_by_metric_name
+from .sliding_window import SlidingWindow, DataWindow
+from .utils import get_metric_value_from_io_data_dict_by_metric_name, get_metric_value_from_iodump_data_dict
 
 
 class Detector:
@@ -74,6 +74,35 @@ class Detector:
                 f' sliding_window_type: {self._slidingWindow}')
 
 
+class DataDetector:
+
+    def __init__(self, metric_name: MetricName, data_window: DataWindow):
+        self._metric_name = metric_name
+        self._data_window = data_window
+
+    def __repr__(self):
+        return (f'disk_name: {self._metric_name.disk_name}, stage_name: {self._metric_name.stage_name},'
+                f' io_type_name: {self._metric_name.io_access_type_name},'
+                f' metric_name: {self._metric_name.metric_name}')
+
+    @property
+    def metric_name(self):
+        return self._metric_name
+
+    def get_data_window_data(self):
+        return self._data_window.get_data()
+
+    def push_data(self, iodump_data_dict_with_disk_name: dict):
+        logging.debug(f'enter DataDetector: {self}')
+        metric_value = get_metric_value_from_iodump_data_dict(iodump_data_dict_with_disk_name, self._metric_name)
+        if metric_value is None:
+            logging.debug('not found metric value, so return None.')
+            return False
+        logging.debug(f'input metric value: {str(metric_value)}')
+        self._data_window.push(metric_value)
+        return True
+
+
 def set_to_str(parameter: set):
     ret = ""
     parameter = list(parameter)
@@ -91,19 +120,43 @@ class DiskDetector:
 
     def __init__(self, disk_name: str):
         self._disk_name = disk_name
         self._detector_list = []
+        self._data_detector_list = []
+
+    def __repr__(self):
+        msg = f'disk: {self._disk_name}, '
+        for detector in self._detector_list:
+            msg += f'\n detector: [{detector}]'
+        for data_detector in self._data_detector_list:
+            msg += f'\n data_detector: [{data_detector}]'
+        return msg
 
     def add_detector(self, detector: Detector):
         self._detector_list.append(detector)
 
+    def add_data_detector(self, data_detector: DataDetector):
+        self._data_detector_list.append(data_detector)
+
     def get_detector_list_window(self):
         latency_wins = {"read": {}, "write": {}}
         iodump_wins = {"read": {}, "write": {}}
+        iops_wins = {"read": {}, "write": {}}
         for detector in self._detector_list:
             if detector.metric_name.metric_name == 'latency':
                 latency_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] = detector.get_sliding_window_data()
             elif detector.metric_name.metric_name == 'io_dump':
                 iodump_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] = detector.get_sliding_window_data()
-        return latency_wins, iodump_wins
+            elif detector.metric_name.metric_name == 'iops':
+                iops_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] =\
+                    detector.get_sliding_window_data()
+        return latency_wins, iodump_wins, iops_wins
+
+    def get_data_detector_list_window(self):
+        iodump_data_wins = {"read": {}, "write": {}}
+        for data_detector in self._data_detector_list:
+            if data_detector.metric_name.metric_name == 'iodump_data':
+                iodump_data_wins[data_detector.metric_name.io_access_type_name][data_detector.metric_name.stage_name] =\
+                    data_detector.get_data_window_data()
+        return iodump_data_wins
 
     def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
         diagnosis_info = {"bio": [], "rq_driver": [], "kernel_stack": []}
@@ -134,8 +187,8 @@ class DiskDetector:
                 io_type.add(metric_name.io_access_type_name)
                 alarm_type.add(metric_name.metric_name)
 
-        latency_wins, iodump_wins = self.get_detector_list_window()
-        details = {"latency": latency_wins, "iodump": iodump_wins}
+        latency_wins, iodump_wins, iops_wins = self.get_detector_list_window()
+        details = {"latency": latency_wins, "iodump": iodump_wins, "iops": iops_wins}
 
         io_press = {"throtl", "wbt", "iocost", "bfq"}
         driver_slow = {"rq_driver"}
@@ -150,8 +203,6 @@ class DiskDetector:
 
         return True, driver_name, reason, set_to_str(block_stack), set_to_str(io_type), set_to_str(alarm_type), details
 
-    def __repr__(self):
-        msg = f'disk: {self._disk_name}, '
-        for detector in self._detector_list:
-            msg += f'\n detector: [{detector}]'
-        return msg
+    def push_data_to_data_detectors(self, iodump_data_dict_with_disk_name: dict):
+        for data_detector in self._data_detector_list:
+            data_detector.push_data(iodump_data_dict_with_disk_name)
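How a `DataDetector` is expected to be wired into a `DiskDetector`: a hypothetical sketch assuming the package is importable; the `MetricName` arguments follow the call pattern used in ai_block_io.py, and the disk type value is made up:

```python
# Hypothetical wiring sketch: a DiskDetector gains one DataDetector for the raw
# iodump records alongside its threshold-based detectors.
from sentryPlugins.ai_block_io.detector import DiskDetector, DataDetector
from sentryPlugins.ai_block_io.sliding_window import DataWindow
from sentryPlugins.ai_block_io.io_data import MetricName

disk_detector = DiskDetector("sda")
metric = MetricName("sda", 0, "bio", "read", "iodump_data")
disk_detector.add_data_detector(DataDetector(metric, DataWindow(30)))

# each detection period: push the latest collector payload into the data windows;
# an empty dict makes push_data return False without touching the window
disk_detector.push_data_to_data_detectors({})
print(disk_detector.get_data_detector_list_window())
# -> {'read': {'bio': []}, 'write': {}}
```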
diff --git a/src/sentryPlugins/ai_block_io/extra_logger.py b/src/sentryPlugins/ai_block_io/extra_logger.py
new file mode 100644
index 0000000000000000000000000000000000000000..cfd19291fe1606d183f33927f68793622938d88e
--- /dev/null
+++ b/src/sentryPlugins/ai_block_io/extra_logger.py
@@ -0,0 +1,186 @@
+# coding: utf-8
+# Copyright (c) 2025 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+#     http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+import logging
+import os
+import re
+
+extra_logger = None
+
+
+# Define stage groups
+STAGE_GROUPS = {
+    'B->Q': ['throtl', 'wbt', 'iocost'],
+    'Q->G': ['gettag'],
+    'G->I': ['plug'],
+    'I->D': ['deadline', 'bfq', 'hctx', 'requeue'],
+    'D->C': ['rq_driver']
+}
+
+
+def init_extra_logger(log_path, log_level, log_format):
+    global extra_logger
+    try:
+        if not os.path.exists(log_path):
+            fd = os.open(log_path, os.O_CREAT | os.O_WRONLY, 0o600)
+            os.close(fd)
+        logger_name = f"extra_logger_{log_path}"
+        logger = logging.getLogger(logger_name)
+        logger.propagate = False
+        logger.setLevel(log_level)
+
+        file_handler = logging.FileHandler(log_path)
+        file_handler.setLevel(log_level)
+
+        formatter = logging.Formatter(log_format)
+        file_handler.setFormatter(formatter)
+
+        logger.addHandler(file_handler)
+        extra_logger = logger
+    except Exception as e:
+        logging.error(f"Failed to create extra logger for {log_path}: {e}")
+        extra_logger = logging.getLogger()  # Fallback to default logger
+
+
+def extra_slow_log(msg):
+    if "latency" in str(msg.get('alarm_type', '')):
+        extra_latency_log(msg)
+    if "io_dump" in str(msg.get('alarm_type', '')):
+        extra_iodump_log(msg)
+
+
+def extra_latency_log(msg):
+    io_types = [iot.strip() for iot in re.split(r',+', msg['io_type'])]
+
+    # Calculate iops average
+    for io_type in io_types:
+        iops_avg = 0
+        iops_data_dict = msg['details']['iops'].get(io_type, {})
+        if 'rq_driver' in iops_data_dict:
+            iops_avg = sum(iops_data_dict['rq_driver']) / len(iops_data_dict['rq_driver'])
+
+        extra_logger.warning(f"[SLOW IO] latency, disk:{msg['driver_name']}, iotype:{io_type}, iops:{int(iops_avg)}")
+
+        # Calculate statistics for each group
+        latency_data_dict = msg['details']['latency'].get(io_type, {})
+        group_stats = {}
+        for group_name, stages in STAGE_GROUPS.items():
+            all_values = []
+            for stage in stages:
+                if stage in latency_data_dict:
+                    all_values.extend(latency_data_dict[stage])
+            if all_values:
+                min_val = min(all_values)
+                max_val = max(all_values)
+                avg_val = sum(all_values) / len(all_values)
+            else:
+                min_val = 0
+                max_val = 0
+                avg_val = 0
+            # Convert to ms
+            min_val_ms = min_val / 1000.0
+            max_val_ms = max_val / 1000.0
+            avg_val_ms = avg_val / 1000.0
+            group_stats[group_name] = {
+                'min': min_val_ms,
+                'max': max_val_ms,
+                'avg': avg_val_ms
+            }
+
+        # Calculate total latency (B->C)
+        total_avg = 0
+        total_min = 0
+        total_max = 0
+        for group_name in STAGE_GROUPS:
+            total_avg += group_stats[group_name]['avg']
+            total_min += group_stats[group_name]['min']
+            total_max += group_stats[group_name]['max']
+        group_stats['B->C'] = {
+            'min': total_min,
+            'max': total_max,
+            'avg': total_avg
+        }
+
+        # Calculate PCT for each group (except B->C)
+        for group_name in STAGE_GROUPS:
+            if total_avg > 0:
+                pct = (group_stats[group_name]['avg'] / total_avg) * 100
+            else:
+                pct = 0
+            group_stats[group_name]['pct'] = pct
+        group_stats['B->C']['pct'] = 100.0
+
+        # Output table
+        stage_order = ['B->Q', 'Q->G', 'G->I', 'I->D', 'D->C', 'B->C']
+        stage_width = 7
+        num_width = 12
+        pct_width = 8
+
+        extra_logger.warning(
+            f"{'Stage':<{stage_width}} "
+            f"{'Min(ms)':>{num_width}} "
+            f"{'Max(ms)':>{num_width}} "
+            f"{'Avg(ms)':>{num_width}} "
+            f"{'PCT':>{pct_width}}"
+        )
+
+        for stage in stage_order:
+            try:
+                s = group_stats[stage]
+                min_str = f"{s['min']:>.3f}"
+                max_str = f"{s['max']:>.3f}"
+                avg_str = f"{s['avg']:>.3f}"
+                pct_str = f"{s['pct']:.2f}%"
+
+                extra_logger.warning(
+                    f"{stage:<{stage_width}} "
+                    f"{min_str:>{num_width}} "
+                    f"{max_str:>{num_width}} "
+                    f"{avg_str:>{num_width}} "
+                    f"{pct_str:>{pct_width}}"
+                )
+            except KeyError:
+                return
+
+
+def extra_iodump_log(msg):
+    io_types = [iot.strip() for iot in re.split(r',+', msg['io_type'])]
+
+    for io_type in io_types:
+        extra_logger.warning(f"[SLOW IO] iodump, disk:{msg['driver_name']}, iotype:{io_type}")
+        iodump_data = msg['details']['iodump_data'].get(io_type, {})
+
+        try:
+            bio_data = iodump_data['bio']
+        except Exception as e:
+            extra_logger.error(f"Failed to parse iodump data: {e}")
+            return
+
+        stack_to_stage = {}
+        for stage, stacks in STAGE_GROUPS.items():
+            for stack in stacks:
+                stack_to_stage[stack] = stage
+
+        last_bio_record = {}
+        for window in bio_data:
+            for entry in window:
+                parts = entry.split(',')
+                task_name, pid, io_stack, bio_ptr, start_ago = parts
+                if io_stack in stack_to_stage:
+                    stage = stack_to_stage[io_stack]
+                    last_bio_record[bio_ptr] = (task_name, pid, io_stack, stage, bio_ptr, start_ago)
+
+        header = f"{'TASK_NAME':<18} {'PID':>8} {'IO_STACK':<12} {'STAGE':<8} {'BIO_PTR':<20} {'START_AGO(ms)':>10}"
+        extra_logger.warning(header)
+
+        for bio_ptr in last_bio_record:
+            task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr]
+            line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}"
+            extra_logger.warning(line)
\ No newline at end of file
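A hypothetical smoke test for the new extra log: initialise the logger at a throwaway path, then feed `extra_slow_log` a trimmed alarm payload (all values made up). Only `rq_driver` carries samples here, so the table attributes the whole latency to D->C:

```python
# Smoke-test sketch with fabricated values; /tmp path is illustrative only.
import logging
from sentryPlugins.ai_block_io.extra_logger import init_extra_logger, extra_slow_log

init_extra_logger("/tmp/ai_block_io_extra.log", logging.INFO,
                  "%(asctime)s - %(levelname)s - %(message)s")
msg = {
    "alarm_type": "latency",
    "driver_name": "sda",
    "io_type": "read",
    "details": {
        "iops": {"read": {"rq_driver": [200.0, 180.0]}},     # window samples
        "latency": {"read": {"rq_driver": [900.0, 1100.0]}}, # microseconds
    },
}
extra_slow_log(msg)   # writes the iops line plus the per-stage latency table
```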
f"{'PCT':>{pct_width}}" + ) + + for stage in stage_order: + try: + s = group_stats[stage] + min_str = f"{s['min']:>.3f}" + max_str = f"{s['max']:>.3f}" + avg_str = f"{s['avg']:>.3f}" + pct_str = f"{s['pct']:.2f}%" + + extra_logger.warning( + f"{stage:<{stage_width}} " + f"{min_str:>{num_width}} " + f"{max_str:>{num_width}} " + f"{avg_str:>{num_width}} " + f"{pct_str:>{pct_width}}" + ) + except KeyError: + return + + +def extra_iodump_log(msg): + io_types = [iot.strip() for iot in re.split(r',+', msg['io_type'])] + + for io_type in io_types: + extra_logger.warning(f"[SLOW IO] iodump, disk:{msg['driver_name']}, iotype:{io_type}") + iodump_data = msg['details']['iodump_data'].get(io_type, {}) + + try: + bio_data = iodump_data['bio'] + except Exception as e: + extra_logger.error(f"Failed to parse iodump data: {e}") + return + + stack_to_stage = {} + for stage, stacks in STAGE_GROUPS.items(): + for stack in stacks: + stack_to_stage[stack] = stage + + last_bio_record = {} + for window in bio_data: + for entry in window: + parts = entry.split(',') + task_name, pid, io_stack, bio_ptr, start_ago = parts + if io_stack in stack_to_stage: + stage = stack_to_stage[io_stack] + last_bio_record[bio_ptr] = (task_name, pid, io_stack, stage, bio_ptr, start_ago) + + header = f"{'TASK_NAME':<18} {'PID':>8} {'IO_STACK':<12} {'STAGE':<8} {'BIO_PTR':<20} {'START_AGO(ms)':>10}" + extra_logger.warning(header) + + for bio_ptr in last_bio_record: + task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr] + line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}" + extra_logger.warning(line) \ No newline at end of file diff --git a/src/sentryPlugins/ai_block_io/io_data.py b/src/sentryPlugins/ai_block_io/io_data.py index 604291157ae0a9d160e7adb10c9360cbcbf68a1e..023e7b1b54f9738d83b0239015b4a8db86e40ae4 100644 --- a/src/sentryPlugins/ai_block_io/io_data.py +++ b/src/sentryPlugins/ai_block_io/io_data.py @@ -11,7 +11,7 @@ from dataclasses import dataclass, field from datetime import datetime -from typing import Optional +from typing import Optional, List @dataclass @@ -42,6 +42,36 @@ class IOData: requeue: IOStageData = field(default_factory=lambda: IOStageData()) rq_driver: IOStageData = field(default_factory=lambda: IOStageData()) bio: IOStageData = field(default_factory=lambda: IOStageData()) + deadline: IOStageData = field(default_factory=lambda: IOStageData()) + time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) + + +@dataclass +class IoDumpListData: + iodump_data: List[str] = field(default_factory=list) + + +@dataclass +class IOStageDumpData: + read: IoDumpListData = field(default_factory=lambda: IoDumpListData()) + write: IoDumpListData = field(default_factory=lambda: IoDumpListData()) + flush: IoDumpListData = field(default_factory=lambda: IoDumpListData()) + discard: IoDumpListData = field(default_factory=lambda: IoDumpListData()) + + +@dataclass +class IODumpData: + throtl: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) + wbt: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) + gettag: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) + iocost: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) + plug: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) + bfq: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) + hctx: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) + requeue: IOStageDumpData = 
diff --git a/src/sentryPlugins/ai_block_io/sliding_window.py b/src/sentryPlugins/ai_block_io/sliding_window.py
index b174d94c29100428d6a39f6a26e5a9e62542f6f3..6881baa0c67c196796e2d3bef23b799b2987aa3f 100644
--- a/src/sentryPlugins/ai_block_io/sliding_window.py
+++ b/src/sentryPlugins/ai_block_io/sliding_window.py
@@ -10,6 +10,7 @@
 # See the Mulan PSL v2 for more details.
 from enum import Enum, unique
+from typing import Any
 import numpy as np
 
@@ -33,10 +34,8 @@ class SlidingWindow:
     def is_abnormal(self, data):
         if self._avg_lim is not None and data < self._avg_lim:
             return False
-        if self._avg_lim is not None and self._ai_threshold is not None:
-            threshold = max(self._avg_lim, self._ai_threshold)
-            if data > threshold:
-                return True
+        if self._ai_threshold is not None and data > self._ai_threshold:
+            return True
         if self._abs_threshold is not None and data > self._abs_threshold:
             return True
         return False
@@ -130,3 +129,20 @@ class SlidingWindowFactory:
             return MedianSlidingWindow(*args, **kwargs)
         else:
             return NotContinuousSlidingWindow(*args, **kwargs)
+
+
+class DataWindow:
+    def __init__(self, window_size: int):
+        self._window_size = window_size
+        self._data_queue = []
+
+    def __repr__(self):
+        return f"[SingleDataWindow, window size: {self._window_size}]"
+
+    def push(self, data: Any):
+        if len(self._data_queue) == self._window_size:
+            self._data_queue.pop(0)
+        self._data_queue.append(data)
+
+    def get_data(self):
+        return self._data_queue
\ No newline at end of file
diff --git a/src/sentryPlugins/ai_block_io/utils.py b/src/sentryPlugins/ai_block_io/utils.py
index 7d2390b9a0a788af367b52c1ed6015a4239cd55a..919cf9bc8d0d74d1c32427cd03fdcb09b5d21786 100644
--- a/src/sentryPlugins/ai_block_io/utils.py
+++ b/src/sentryPlugins/ai_block_io/utils.py
@@ -15,7 +15,7 @@ from dataclasses import asdict
 
 from .threshold import ThresholdType
 from .sliding_window import SlidingWindowType
-from .io_data import MetricName, IOData
+from .io_data import MetricName, IOData, IODumpData
 
 
 def get_threshold_type_enum(algorithm_type: str):
@@ -49,6 +49,17 @@ def get_metric_value_from_io_data_dict_by_metric_name(
         return None
 
 
+def get_metric_value_from_iodump_data_dict(io_dump_data_dict: dict, metric_name: MetricName):
+    try:
+        io_dump_data: IODumpData = io_dump_data_dict[metric_name.disk_name]
+        io_dump_stage_data = asdict(io_dump_data)[metric_name.stage_name]
+        base_data = io_dump_stage_data[metric_name.io_access_type_name]
+        metric_value = base_data[metric_name.metric_name]
+        return metric_value
+    except KeyError:
+        return None
+
+
 def get_data_queue_size_and_update_size(
     training_data_duration: float,
     train_update_duration: float,
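Unlike the threshold-driven sliding windows, `DataWindow` is a plain bounded FIFO: it never votes on abnormality, it only retains the last `window_size` payloads for the extra log. A usage sketch:

```python
# Usage sketch: DataWindow keeps only the newest window_size entries.
from sentryPlugins.ai_block_io.sliding_window import DataWindow

win = DataWindow(3)
for period in range(5):
    win.push([f"record-{period}"])     # one iodump record list per period
print(win.get_data())                  # [['record-2'], ['record-3'], ['record-4']]
```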
diff --git a/src/sentryPlugins/avg_block_io/avg_block_io.py b/src/sentryPlugins/avg_block_io/avg_block_io.py
index 899d517735d488c3f54029d1f675e3a23d32f235..ef19b7b2faba316f5c1497b615c972a3b6342764 100644
--- a/src/sentryPlugins/avg_block_io/avg_block_io.py
+++ b/src/sentryPlugins/avg_block_io/avg_block_io.py
@@ -14,11 +14,13 @@ import configparser
 import time
 
 from .config import read_config_log, read_config_common, read_config_algorithm, read_config_latency, read_config_iodump, read_config_stage
-from .stage_window import IoWindow, IoDumpWindow
-from .module_conn import avg_is_iocollect_valid, avg_get_io_data, report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation
-from .utils import update_avg_and_check_abnormal
+from .stage_window import IoWindow, IoDumpWindow, IopsWindow, IodumpMsgWindow
+from .module_conn import avg_is_iocollect_valid, avg_get_io_data, avg_get_iodump_data, report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation
+from .utils import update_avg_and_check_abnormal, update_avg_iodump_data
+from .extra_logger import init_extra_logger
 
 CONFIG_FILE = "/etc/sysSentry/plugins/avg_block_io.ini"
+AVG_EXTRA_LOG_PATH = "/var/log/sysSentry/avg_block_io_extra.log"
 
 
 def init_io_win(io_dic, config, common_param):
@@ -52,12 +54,23 @@ def init_io_win(io_dic, config, common_param):
                 iodump_lim_value = curr_stage_param.get(iodump_lim_key, common_param.get("iodump", {}).get(iodump_lim_key))
 
                 if avg_lim_value and avg_time_value and tot_lim_value:
-                    io_data[disk_name][stage_name][rw]["latency"] = IoWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold"], abnormal_multiple=avg_time_value, abnormal_multiple_lim=avg_lim_value, abnormal_time=tot_lim_value)
+                    io_data[disk_name][stage_name][rw]["latency"] = \
+                        IoWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold_latency"], \
+                                 abnormal_multiple=avg_time_value, abnormal_multiple_lim=avg_lim_value, \
+                                 abnormal_time=tot_lim_value)
                     logging.debug("Successfully create {}-{}-{}-latency window".format(disk_name, stage_name, rw))
 
                 if iodump_lim_value is not None:
-                    io_data[disk_name][stage_name][rw]["iodump"] = IoDumpWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold"], abnormal_time=iodump_lim_value)
+                    io_data[disk_name][stage_name][rw]["iodump"] =\
+                        IoDumpWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold_iodump"],\
+                                     abnormal_time=iodump_lim_value)
                     logging.debug("Successfully create {}-{}-{}-iodump window".format(disk_name, stage_name, rw))
+
+                io_data[disk_name][stage_name][rw]["iops"] = IopsWindow(window_size=io_dic["win_size"])
+                logging.debug("Successfully create {}-{}-{}-iops window".format(disk_name, stage_name, rw))
+
+                io_data[disk_name][stage_name][rw]["iodump_data"] = IodumpMsgWindow(window_size=io_dic["win_size"])
+                logging.debug("Successfully create {}-{}-{}-iodump_data window".format(disk_name, stage_name, rw))
 
     return io_data, io_avg_value
 
@@ -124,6 +137,9 @@ def main_loop(io_dic, io_data, io_avg_value):
             logging.error(f"{curr_period_data['msg']}")
             continue
 
+        # fetch the detailed iodump records for this period
+        is_success, iodump_data = avg_get_iodump_data(io_dic)
+
         # process the data for this period
         reach_size = False
         for disk_name in disk_list:
@@ -132,6 +148,7 @@ def main_loop(io_dic, io_data, io_avg_value):
                     if disk_name in curr_period_data and stage_name in curr_period_data[disk_name] and rw in curr_period_data[disk_name][stage_name]:
                         io_key = (disk_name, stage_name, rw)
                         reach_size = update_avg_and_check_abnormal(curr_period_data, io_key, win_size, io_avg_value, io_data)
+                        update_avg_iodump_data(iodump_data, is_success, io_key, io_data)
 
         # skip the alarm checks until the window is full
         if not reach_size:
@@ -152,6 +169,7 @@ def main():
     log_level = read_config_log(CONFIG_FILE)
     log_format = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
     logging.basicConfig(level=log_level, format=log_format)
+    init_extra_logger(AVG_EXTRA_LOG_PATH, log_level, log_format)
 
     # initialize config reading
     config = configparser.ConfigParser(comment_prefixes=('#', ';'))
@@ -175,7 +193,7 @@ def main():
 
     # initialize windows -- read config, matching the result returned by is_iocollect_valid
     # step1. parse common config --- algorithm
-    io_dic["win_size"], io_dic["win_threshold"] = read_config_algorithm(config)
+    io_dic["win_size"], io_dic["win_threshold_latency"], io_dic["win_threshold_iodump"] = read_config_algorithm(config)
 
     # step2. parse common config --- latency_xxx
     common_param = read_config_latency(config)
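After `init_io_win`, every `(disk, stage, rw)` bucket always carries `iops` and `iodump_data` windows, while `latency`/`iodump` remain conditional on configured limits. An illustrative shape (placeholder strings stand in for the window objects):

```python
# Illustrative structure only; real values are window instances, not strings.
io_data = {
    "sda": {
        "bio": {
            "read": {
                "latency": "<IoWindow>",             # only if avg/tot limits set
                "iodump": "<IoDumpWindow>",          # only if iodump_lim set
                "iops": "<IopsWindow>",              # always created
                "iodump_data": "<IodumpMsgWindow>",  # always created
            }
        }
    }
}
print(io_data["sda"]["bio"]["read"].keys())
```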
diff --git a/src/sentryPlugins/avg_block_io/config.py b/src/sentryPlugins/avg_block_io/config.py
index c1e8ab10425ce8f19f246aeac277a51945aede80..79bd21a7a6cf70639398b94e96d3ddf4f59cd644 100644
--- a/src/sentryPlugins/avg_block_io/config.py
+++ b/src/sentryPlugins/avg_block_io/config.py
@@ -24,7 +24,8 @@ CONF_COMMON_PER_TIME = 'period_time'
 
 CONF_ALGO = 'algorithm'
 CONF_ALGO_SIZE = 'win_size'
-CONF_ALGO_THRE = 'win_threshold'
+CONF_ALGO_THRE_LATENCY = 'win_threshold_latency'
+CONF_ALGO_THRE_IODUMP = 'win_threshold_iodump'
 
 CONF_LATENCY = 'latency_{}'
 CONF_IODUMP = 'iodump'
@@ -40,7 +41,8 @@ DEFAULT_PARAM = {
         CONF_COMMON_PER_TIME: 1
     }, CONF_ALGO: {
         CONF_ALGO_SIZE: 30,
-        CONF_ALGO_THRE: 6
+        CONF_ALGO_THRE_LATENCY: 6,
+        CONF_ALGO_THRE_IODUMP: 3
     }, 'latency_nvme_ssd': {
         'read_avg_lim': 10000,
         'write_avg_lim': 10000,
@@ -162,16 +164,26 @@ def read_config_algorithm(config):
         logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_SIZE}, use {win_size} as default")
 
     try:
-        win_threshold = int(config.get(CONF_ALGO, CONF_ALGO_THRE))
-        if win_threshold < 1 or win_threshold > 300 or win_threshold > win_size:
-            raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE}")
+        win_threshold_latency = int(config.get(CONF_ALGO, CONF_ALGO_THRE_LATENCY))
+        if win_threshold_latency < 1 or win_threshold_latency > 300 or win_threshold_latency > win_size:
+            raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE_LATENCY}")
     except ValueError:
-        report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE} config")
+        report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE_LATENCY} config")
     except configparser.NoOptionError:
-        win_threshold = DEFAULT_PARAM[CONF_ALGO]['win_threshold']
-        logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_THRE}, use {win_threshold} as default")
+        win_threshold_latency = DEFAULT_PARAM[CONF_ALGO]['win_threshold_latency']
+        logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_THRE_LATENCY}, use {win_threshold_latency} as default")
 
-    return win_size, win_threshold
+    try:
+        win_threshold_iodump = int(config.get(CONF_ALGO, CONF_ALGO_THRE_IODUMP))
+        if win_threshold_iodump < 1 or win_threshold_iodump > 300 or win_threshold_iodump > win_size:
+            raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE_IODUMP}")
+    except ValueError:
+        report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE_IODUMP} config")
+    except configparser.NoOptionError:
+        win_threshold_iodump = DEFAULT_PARAM[CONF_ALGO][CONF_ALGO_THRE_IODUMP]
+        logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_THRE_IODUMP}, use {win_threshold_iodump} as default")
+
+    return win_size, win_threshold_latency, win_threshold_iodump
 
 
 def read_config_latency(config):
diff --git a/src/sentryPlugins/avg_block_io/extra_logger.py b/src/sentryPlugins/avg_block_io/extra_logger.py
new file mode 100644
index 0000000000000000000000000000000000000000..ac86306a3a0009c23b7915c72cfa9244615f9406
--- /dev/null
+++ b/src/sentryPlugins/avg_block_io/extra_logger.py
@@ -0,0 +1,199 @@
+# coding: utf-8
+# Copyright (c) 2025 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+#     http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+import logging
+import os
+import re
+import ast
+
+extra_logger = None
+
+
+# Define stage groups
+STAGE_GROUPS = {
+    'B->Q': ['throtl', 'wbt', 'iocost'],
+    'Q->G': ['gettag'],
+    'G->I': ['plug'],
+    'I->D': ['deadline', 'bfq', 'hctx', 'requeue'],
+    'D->C': ['rq_driver']
+}
+
+PATTERN = re.compile(r'(\w+):\s*\[([0-9.,]+)\]')
+
+
+def init_extra_logger(log_path, log_level, log_format):
+    global extra_logger
+    try:
+        if not os.path.exists(log_path):
+            fd = os.open(log_path, os.O_CREAT | os.O_WRONLY, 0o600)
+            os.close(fd)
+        logger_name = f"extra_logger_{log_path}"
+        logger = logging.getLogger(logger_name)
+        logger.propagate = False
+        logger.setLevel(log_level)
+
+        file_handler = logging.FileHandler(log_path)
+        file_handler.setLevel(log_level)
+
+        formatter = logging.Formatter(log_format)
+        file_handler.setFormatter(formatter)
+
+        logger.addHandler(file_handler)
+        extra_logger = logger
+    except Exception as e:
+        logging.error(f"Failed to create extra logger for {log_path}: {e}")
+        extra_logger = logging.getLogger()  # Fallback to default logger
+
+
+def extra_slow_log(msg):
+    if "latency" in str(msg.get('alarm_type', '')):
+        extra_latency_log(msg)
+    if "iodump" in str(msg.get('alarm_type', '')):
+        extra_iodump_log(msg)
+
+
+def extra_latency_log(msg):
+    # Parse the iops string from msg
+    iops_avg = 0
+    iops_str = msg['details']['iops']
+    iops_matches = re.findall(PATTERN, iops_str)
+    iops_data = {}
+    for match in iops_matches:
+        key = match[0]
+        values = list(map(float, match[1].split(',')))
+        iops_data[key] = values
+    if 'rq_driver' in iops_data and iops_data['rq_driver']:
+        iops_avg = sum(iops_data['rq_driver']) / len(iops_data['rq_driver'])
+
+    extra_logger.warning(f"[SLOW IO] alarm_type: latency, disk: {msg['driver_name']}, "
+                         f"iotype: {msg['io_type']}, iops: {int(iops_avg)}")
+
+    # Parse the latency string from msg
+    latency_str = msg['details']['latency']
+    latency_matches = re.findall(PATTERN, latency_str)
+    latency_data = {}
+    for match in latency_matches:
+        key = match[0]
+        values = list(map(float, match[1].split(',')))
+        latency_data[key] = values
+
+    # Calculate statistics for each group
+    group_stats = {}
+    for group_name, stages in STAGE_GROUPS.items():
+        all_values = []
+        for stage in stages:
+            if stage in latency_data:
+                all_values.extend(latency_data[stage])
+        if all_values:
+            min_val = min(all_values)
+            max_val = max(all_values)
+            avg_val = sum(all_values) / len(all_values)
+        else:
+            min_val = 0
+            max_val = 0
+            avg_val = 0
+        # Convert to ms
+        min_val_ms = min_val / 1000.0
+        max_val_ms = max_val / 1000.0
+        avg_val_ms = avg_val / 1000.0
+        group_stats[group_name] = {
+            'min': min_val_ms,
+            'max': max_val_ms,
+            'avg': avg_val_ms
+        }
+
+    # Calculate total latency (B->C)
+    total_avg = 0
+    total_min = 0
+    total_max = 0
+    for group_name in STAGE_GROUPS:
+        total_avg += group_stats[group_name]['avg']
+        total_min += group_stats[group_name]['min']
+        total_max += group_stats[group_name]['max']
+    group_stats['B->C'] = {
+        'min': total_min,
+        'max': total_max,
+        'avg': total_avg
+    }
+
+    # Calculate PCT for each group (except B->C)
+    for group_name in STAGE_GROUPS:
+        if total_avg > 0:
+            pct = (group_stats[group_name]['avg'] / total_avg) * 100
+        else:
+            pct = 0
+        group_stats[group_name]['pct'] = pct
+    group_stats['B->C']['pct'] = 100.0
+
+    # Output table
+    stage_order = ['B->Q', 'Q->G', 'G->I', 'I->D', 'D->C', 'B->C']
+    stage_width = 7
+    num_width = 12
+    pct_width = 8
+
+    extra_logger.warning(
+        f"{'Stage':<{stage_width}} "
+        f"{'Min(ms)':>{num_width}} "
+        f"{'Max(ms)':>{num_width}} "
+        f"{'Avg(ms)':>{num_width}} "
+        f"{'PCT':>{pct_width}}"
+    )
+
+    for stage in stage_order:
+        try:
+            s = group_stats[stage]
+            min_str = f"{s['min']:>.3f}"
+            max_str = f"{s['max']:>.3f}"
+            avg_str = f"{s['avg']:>.3f}"
+            pct_str = f"{s['pct']:.2f}%"
+
+            extra_logger.warning(
+                f"{stage:<{stage_width}} "
+                f"{min_str:>{num_width}} "
+                f"{max_str:>{num_width}} "
+                f"{avg_str:>{num_width}} "
+                f"{pct_str:>{pct_width}}"
+            )
+        except KeyError:
+            return
+
+
+def extra_iodump_log(msg):
+    extra_logger.warning(f"[SLOW IO] iodump, disk:{msg['driver_name']}, iotype:{msg['io_type']}")
+    iodump_str = msg['details']['iodump_data']
+
+    try:
+        iodump_data = ast.literal_eval(iodump_str)
+        bio_data = iodump_data['bio']
+    except Exception as e:
+        extra_logger.error(f"Failed to parse iodump data: {e}")
+        return
+
+    stack_to_stage = {}
+    for stage, stacks in STAGE_GROUPS.items():
+        for stack in stacks:
+            stack_to_stage[stack] = stage
+
+    last_bio_record = {}
+    for window in bio_data:
+        for entry in window:
+            parts = entry.split(',')
+            task_name, pid, io_stack, bio_ptr, start_ago = parts
+            if io_stack in stack_to_stage:
+                stage = stack_to_stage[io_stack]
+                last_bio_record[bio_ptr] = (task_name, pid, io_stack, stage, bio_ptr, start_ago)
+
+    header = f"{'TASK_NAME':<18} {'PID':>8} {'IO_STACK':<12} {'STAGE':<8} {'BIO_PTR':<20} {'START_AGO(ms)':>10}"
+    extra_logger.warning(header)
+
+    for bio_ptr in last_bio_record:
+        task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr]
+        line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}"
+        extra_logger.warning(line)
\ No newline at end of file
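avg_block_io ships its window data as formatted strings rather than dicts, which is why this variant re-parses them. A self-contained check of `PATTERN` against a sample `details` string:

```python
# Self-contained check of the PATTERN regex: "stage: [v1,v2,...]" pairs are
# recovered back into lists of floats. The sample string is fabricated.
import re

PATTERN = re.compile(r'(\w+):\s*\[([0-9.,]+)\]')
details = "throtl: [1.0,2.0], rq_driver: [900.0,1100.0]"
parsed = {k: list(map(float, v.split(','))) for k, v in re.findall(PATTERN, details)}
print(parsed)   # {'throtl': [1.0, 2.0], 'rq_driver': [900.0, 1100.0]}
```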
diff --git a/src/sentryPlugins/avg_block_io/module_conn.py b/src/sentryPlugins/avg_block_io/module_conn.py
index bc108027b001c0e6c5b4460f2f6254de7c3920df..7bb0c9367fb9ce9bda49f24894705ecc201bd696 100644
--- a/src/sentryPlugins/avg_block_io/module_conn.py
+++ b/src/sentryPlugins/avg_block_io/module_conn.py
@@ -12,10 +12,10 @@ import json
 import logging
 import sys
 
-from .utils import is_abnormal, get_win_data, log_slow_win
-from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, Result_Messages, get_disk_type, Disk_Type
+from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, get_iodump_data, Result_Messages, get_disk_type, Disk_Type
 from syssentry.result import ResultLevel, report_result
 from xalarm.sentry_notify import xalarm_report, MINOR_ALM, ALARM_TYPE_OCCUR
+from .utils import is_abnormal, get_win_data, log_slow_win
 
 TASK_NAME = "avg_block_io"
 
@@ -34,6 +34,14 @@ def avg_get_io_data(io_dic):
     return check_result_validation(res, 'get io data')
 
 
+def avg_get_iodump_data(io_dic):
+    """avg_get_iodump_data from sentryCollector"""
+    logging.debug(f"send to sentryCollector avg_get_iodump_data: period={io_dic['period_time']}, "
+                  f"disk={io_dic['disk_list']}, stage={io_dic['stage_list']}, iotype={io_dic['iotype_list']}")
+    res = get_iodump_data(io_dic["period_time"], io_dic["disk_list"], io_dic["stage_list"], io_dic["iotype_list"])
+    return check_result_validation(res, 'get io dump data')
+
+
 def avg_is_iocollect_valid(io_dic, config_disk, config_stage):
     """is_iocollect_valid from sentryCollector"""
     logging.debug(f"send to sentryCollector is_iocollect_valid: period={io_dic['period_time']}, "
@@ -89,6 +97,7 @@ def process_report_data(disk_name, rw, io_data):
         msg["block_stack"] = f"bio,{stage_name}"
         msg["alarm_type"] = abnormal_list
         log_slow_win(msg, "IO press")
+        del msg["details"]["iodump_data"]  # iodump_data can be too large in extreme cases and make the send fail, so it is only logged, not sent to the alarm module
        xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
         return
 
@@ -99,6 +108,7 @@ def process_report_data(disk_name, rw, io_data):
         msg["block_stack"] = "bio,rq_driver"
         msg["alarm_type"] = abnormal_list
         log_slow_win(msg, "driver slow")
+        del msg["details"]["iodump_data"]  # iodump_data can be too large in extreme cases and make the send fail, so it is only logged, not sent to the alarm module
         xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
         return
 
@@ -112,10 +122,12 @@ def process_report_data(disk_name, rw, io_data):
         msg["block_stack"] = f"bio,{stage_name}"
         msg["alarm_type"] = abnormal_list
         log_slow_win(msg, "kernel slow")
+        del msg["details"]["iodump_data"]  # iodump_data can be too large in extreme cases and make the send fail, so it is only logged, not sent to the alarm module
         xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
         return
 
     log_slow_win(msg, "unknown")
+    del msg["details"]["iodump_data"]  # iodump_data can be too large in extreme cases and make the send fail, so it is only logged, not sent to the alarm module
     xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
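The same strip-then-report pair now precedes all four `xalarm_report` calls. An optional helper sketch (hypothetical, not part of this change) that would keep the rule in one place:

```python
# Hypothetical helper, not in the patch: centralise "log iodump_data, never
# send it" so the four call sites cannot drift apart.
import json
from xalarm.sentry_notify import xalarm_report, MINOR_ALM, ALARM_TYPE_OCCUR

def report_without_iodump_data(msg):
    # iodump_data can be very large in extreme cases and break the send,
    # so it is logged elsewhere and dropped from the alarm payload here
    msg["details"].pop("iodump_data", None)
    xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
```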
diff --git a/src/sentryPlugins/avg_block_io/stage_window.py b/src/sentryPlugins/avg_block_io/stage_window.py
index 587bd497ae71b189cb60c5f2915562edff5f7848..29fa6e152efe94b342bfa3de0e90a2c2a4cac217 100644
--- a/src/sentryPlugins/avg_block_io/stage_window.py
+++ b/src/sentryPlugins/avg_block_io/stage_window.py
@@ -53,3 +53,21 @@ class IoDumpWindow(AbnormalWindowBase):
 
     def is_abnormal_period(self, value, avg_val=0):
         return value > self.abnormal_time
+
+
+class IopsWindow(AbnormalWindowBase):
+    def is_abnormal_period(self, value, avg_val=10):
+        return False
+
+
+class IodumpMsgWindow:
+    def __init__(self, window_size=10):
+        self.window_size = window_size
+        self.window_data = [[] for _ in range(window_size)]
+
+    def append_new_data(self, msg):
+        self.window_data.pop(0)
+        self.window_data.append(msg)
+
+    def window_data_to_string(self):
+        return str(self.window_data)
\ No newline at end of file
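`IodumpMsgWindow` pre-fills with empty lists, so its string form always reflects exactly `window_size` periods even before real data arrives, and `IopsWindow` never triggers an alarm on its own (it only records samples). A usage sketch:

```python
# Usage sketch: the window is a fixed-size FIFO of per-period record lists.
from sentryPlugins.avg_block_io.stage_window import IodumpMsgWindow

win = IodumpMsgWindow(window_size=3)
win.append_new_data(["kworker,123,bio,ffff888000000000,1500"])
print(win.window_data_to_string())
# -> "[[], [], ['kworker,123,bio,ffff888000000000,1500']]"
```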
diff --git a/src/sentryPlugins/avg_block_io/utils.py b/src/sentryPlugins/avg_block_io/utils.py
index d5f8bb4e5056da01cad3fa6fd7787b5519146ee7..d9af7fee6a2ce783b609ccb9d2fbbc6ca21e161c 100644
--- a/src/sentryPlugins/avg_block_io/utils.py
+++ b/src/sentryPlugins/avg_block_io/utils.py
@@ -9,6 +9,7 @@
 # PURPOSE.
 # See the Mulan PSL v2 for more details.
 import logging
+from .extra_logger import extra_slow_log
 
 AVG_VALUE = 0
 AVG_COUNT = 1
@@ -39,6 +40,8 @@ def get_win_data(disk_name, rw, io_data):
     """get latency and iodump win data"""
     latency = ''
     iodump = ''
+    iops = ''
+    iodump_data = ''
     for stage_name in io_data[disk_name]:
         if 'latency' in io_data[disk_name][stage_name][rw]:
             latency_list = io_data[disk_name][stage_name][rw]['latency'].window_data_to_string()
@@ -46,7 +49,15 @@ def get_win_data(disk_name, rw, io_data):
         if 'iodump' in io_data[disk_name][stage_name][rw]:
             iodump_list = io_data[disk_name][stage_name][rw]['iodump'].window_data_to_string()
             iodump += f'{stage_name}: [{iodump_list}], '
-    return {"latency": latency[:-2], "iodump": iodump[:-2]}
+        if 'iops' in io_data[disk_name][stage_name][rw]:
+            iops_list = io_data[disk_name][stage_name][rw]['iops'].window_data_to_string()
+            iops += f'{stage_name}: [{iops_list}], '
+        if 'iodump_data' in io_data[disk_name][stage_name][rw]:
+            iodump_data_list = io_data[disk_name][stage_name][rw]['iodump_data'].window_data_to_string()
+            iodump_data += f'"{stage_name}": {iodump_data_list}, '
+    if iodump_data:
+        iodump_data = '{' + iodump_data[:-2] + '}'
+    return {"latency": latency[:-2], "iodump": iodump[:-2], "iops": iops[:-2], "iodump_data": iodump_data}
 
 
 def is_abnormal(io_key, io_data):
@@ -90,6 +101,8 @@ def update_io_data(period_value, io_data, io_key):
         io_data[io_key[0]][io_key[1]][io_key[2]]["latency"].append_new_data(period_value[0])
     if all_wins and "iodump" in all_wins:
         io_data[io_key[0]][io_key[1]][io_key[2]]["iodump"].append_new_data(period_value[1])
+    if all_wins and "iops" in all_wins:
+        io_data[io_key[0]][io_key[1]][io_key[2]]["iops"].append_new_data(period_value[3])
 
 
 def log_abnormal_period(old_avg, period_value, io_data, io_key):
@@ -111,6 +124,8 @@ def log_slow_win(msg, reason):
                  f"iotype: {msg['io_type']}, type: {msg['alarm_type']}, reason: {reason}")
     logging.info(f"latency: {msg['details']['latency']}")
     logging.info(f"iodump: {msg['details']['iodump']}")
+    logging.info(f"iops: {msg['details']['iops']}")
+    extra_slow_log(msg)
 
 
 def update_avg_and_check_abnormal(data, io_key, win_size, io_avg_value, io_data):
@@ -137,3 +152,15 @@ def update_avg_and_check_abnormal(data, io_key, win_size, io_avg_value, io_data)
         return True
     set_nested_value(io_avg_value, io_key, update_io_avg(old_avg, period_value, win_size))
     return True
+
+
+def update_avg_iodump_data(iodump_data, is_success, io_key, io_data):
+    """update iodump data to io_data"""
+    all_wins = get_nested_value(io_data, io_key)
+    if all_wins and "iodump_data" in all_wins:
+        if not is_success:
+            io_data[io_key[0]][io_key[1]][io_key[2]]["iodump_data"].append_new_data([])
+        else:
+            period_value = get_nested_value(iodump_data, io_key)
+            io_data[io_key[0]][io_key[1]][io_key[2]]["iodump_data"].append_new_data(period_value)
+
diff --git a/src/services/sentryCollector/collect_config.py b/src/services/sentryCollector/collect_config.py
index 7ca9898bf27197f331a0a302742b21fc6a31a9b7..5793fa3beb45e3c6fbf492b20edffd71cea05e32 100644
--- a/src/services/sentryCollector/collect_config.py
+++ b/src/services/sentryCollector/collect_config.py
@@ -31,6 +31,10 @@ CONF_IO_DISK = 'disk'
 CONF_IO_PERIOD_TIME_DEFAULT = 1
 CONF_IO_MAX_SAVE_DEFAULT = 10
 CONF_IO_DISK_DEFAULT = "default"
+CONF_IO_NVME_SSD = "nvme_ssd_threshold"
+CONF_IO_SATA_SSD = "sata_ssd_threshold"
+CONF_IO_SATA_HDD = "sata_hdd_threshold"
+CONF_IO_THRESHOLD_DEFAULT = 1000

 # log
 CONF_LOG = 'log'
@@ -144,5 +148,35 @@ class CollectConfig:
         logging.debug("config get_io_config: %s", result_io_config)
         return result_io_config
 
+    def get_io_threshold(self):
+        result_io_threshold = {}
+        io_map_value = self.load_module_config(CONF_IO)
+        # nvme ssd threshold
+        nvme_ssd_threshold = io_map_value.get(CONF_IO_NVME_SSD)
+        if nvme_ssd_threshold and nvme_ssd_threshold.isdigit() and int(nvme_ssd_threshold) >= 1:
+            result_io_threshold[CONF_IO_NVME_SSD] = int(nvme_ssd_threshold)
+        else:
+            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
+                            CONF_IO, CONF_IO_NVME_SSD, CONF_IO_THRESHOLD_DEFAULT)
+            result_io_threshold[CONF_IO_NVME_SSD] = CONF_IO_THRESHOLD_DEFAULT
+        # sata ssd threshold
+        sata_ssd_threshold = io_map_value.get(CONF_IO_SATA_SSD)
+        if sata_ssd_threshold and sata_ssd_threshold.isdigit() and int(sata_ssd_threshold) >= 1:
+            result_io_threshold[CONF_IO_SATA_SSD] = int(sata_ssd_threshold)
+        else:
+            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
+                            CONF_IO, CONF_IO_SATA_SSD, CONF_IO_THRESHOLD_DEFAULT)
+            result_io_threshold[CONF_IO_SATA_SSD] = CONF_IO_THRESHOLD_DEFAULT
+        # sata hdd threshold
+        sata_hdd_threshold = io_map_value.get(CONF_IO_SATA_HDD)
+        if sata_hdd_threshold and sata_hdd_threshold.isdigit() and int(sata_hdd_threshold) >= 1:
+            result_io_threshold[CONF_IO_SATA_HDD] = int(sata_hdd_threshold)
+        else:
+            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
+                            CONF_IO, CONF_IO_SATA_HDD, CONF_IO_THRESHOLD_DEFAULT)
+            result_io_threshold[CONF_IO_SATA_HDD] = CONF_IO_THRESHOLD_DEFAULT
+        logging.debug("config get_io_threshold: %s", result_io_threshold)
+        return result_io_threshold
+
     def get_common_config(self):
         return {key.lower(): value for key, value in self.config['common'].items()}
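The three threshold branches differ only in the key name. An optional refactor sketch (equivalent behaviour, assuming the module constants defined above) that folds them into one loop:

```python
# Hypothetical equivalent of get_io_threshold, relying on the CONF_IO_* names
# declared in collect_config.py; shown only to illustrate the shared rule.
def get_io_threshold(self):
    result_io_threshold = {}
    io_map_value = self.load_module_config(CONF_IO)
    for key in (CONF_IO_NVME_SSD, CONF_IO_SATA_SSD, CONF_IO_SATA_HDD):
        raw = io_map_value.get(key)
        if raw and raw.isdigit() and int(raw) >= 1:
            result_io_threshold[key] = int(raw)
        else:
            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
                            CONF_IO, key, CONF_IO_THRESHOLD_DEFAULT)
            result_io_threshold[key] = CONF_IO_THRESHOLD_DEFAULT
    logging.debug("config get_io_threshold: %s", result_io_threshold)
    return result_io_threshold
```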
diff --git a/src/services/sentryCollector/collect_io.py b/src/services/sentryCollector/collect_io.py
index 6db28ec9042499f9ecf800737328372f4b4b4da5..612ee690c8dd88b97ca363a18aeff7b38142e8f0 100644
--- a/src/services/sentryCollector/collect_io.py
+++ b/src/services/sentryCollector/collect_io.py
@@ -17,18 +17,25 @@ import time
 import logging
 import threading
 import subprocess
+import re
 from typing import Union
 
 from .collect_config import CollectConfig
+from .collect_config import CONF_IO_NVME_SSD, CONF_IO_SATA_SSD, CONF_IO_SATA_HDD, CONF_IO_THRESHOLD_DEFAULT
+from .collect_plugin import get_disk_type, DiskType
 
 Io_Category = ["read", "write", "flush", "discard"]
 IO_GLOBAL_DATA = {}
 IO_CONFIG_DATA = []
+IO_DUMP_DATA = {}
 
 EBPF_GLOBAL_DATA = []
 EBPF_PROCESS = None
 EBPF_STAGE_LIST = ["wbt", "rq_driver", "bio", "gettag"]
 EBPF_SUPPORT_VERSION = ["6.6.0"]
 
+# iodump data limit
+IO_DUMP_DATA_LIMIT = 10
+
 class IoStatus():
     TOTAL = 0
     FINISH = 1
@@ -50,6 +57,7 @@ class CollectIo():
 
         self.ebpf_base_path = 'ebpf_collector'
 
         self.loop_all = False
+        self.io_threshold_config = module_config.get_io_threshold()
 
         if disk_str == "default":
             self.loop_all = True
@@ -57,10 +65,51 @@ class CollectIo():
             self.disk_list = disk_str.strip().split(',')
 
         self.stop_event = threading.Event()
+        self.iodump_pattern = re.compile(
+            r'(?P<task_name>[^-]+)-(?P<pid>\d+)\s+'
+            r'\w+\s+'
+            r'stage\s+(?P<stage>\w+)\s+'
+            r'(?P<ptr>[0-9a-fA-F]{16})\s+'
+            r'.*started\s+(?P<start_time_ns>\d+)\s+ns\s+ago'
+        )
 
         IO_CONFIG_DATA.append(self.period_time)
         IO_CONFIG_DATA.append(self.max_save)
 
+    def update_io_threshold(self, disk_name, stage_list):
+        disk_type_result = get_disk_type(disk_name)
+        if disk_type_result["ret"] == 0 and disk_type_result["message"] in ('0', '1', '2'):
+            disk_type = int(disk_type_result["message"])
+            if disk_type == DiskType.TYPE_NVME_SSD:
+                config_threshold = str(self.io_threshold_config[CONF_IO_NVME_SSD])
+            elif disk_type == DiskType.TYPE_SATA_SSD:
+                config_threshold = str(self.io_threshold_config[CONF_IO_SATA_SSD])
+            elif disk_type == DiskType.TYPE_SATA_HDD:
+                config_threshold = str(self.io_threshold_config[CONF_IO_SATA_HDD])
+            else:
+                return
+
+            for stage in stage_list:
+                io_threshold_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/{}/threshold'.format(disk_name, stage)
+                try:
+                    with open(io_threshold_file, 'r') as file:
+                        current_threshold = file.read().strip()
+                except FileNotFoundError:
+                    logging.error("The file %s does not exist.", io_threshold_file)
+                    continue
+                except Exception as e:
+                    logging.error("An error occurred while reading: %s", e)
+                    continue
+
+                if current_threshold != config_threshold:
+                    try:
+                        with open(io_threshold_file, 'w') as file:
+                            file.write(config_threshold)
+                        logging.info("update %s io_dump_threshold from %s to %s",
+                                     io_threshold_file, current_threshold, config_threshold)
+                    except Exception as e:
+                        logging.error("An error occurred while writing: %s", e)
+
     def get_blk_io_hierarchy(self, disk_name, stage_list):
         stats_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/stats'.format(disk_name)
         try:
@@ -95,6 +144,8 @@ class CollectIo():
             # read=0, write=1, flush=2, discard=3
             if (len(IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]])) >= self.max_save:
                 IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].pop()
+            if (len(IO_DUMP_DATA[disk_name][stage][Io_Category[index]])) >= self.max_save:
+                IO_DUMP_DATA[disk_name][stage][Io_Category[index]].pop()
 
             curr_lat = self.get_latency_value(curr_stage_value, last_stage_value, index)
             curr_iops = self.get_iops(curr_stage_value, last_stage_value, index)
@@ -102,6 +153,8 @@ class CollectIo():
             curr_io_dump = self.get_io_dump(disk_name, stage, index)
 
             IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops])
+            if curr_io_dump == 0:
+                IO_DUMP_DATA[disk_name][stage][Io_Category[index]].insert(0, [])
 
     def get_iops(self, curr_stage_value, last_stage_value, category):
         try:
@@ -151,11 +204,31 @@ class CollectIo():
     def get_io_dump(self, disk_name, stage, category):
         io_dump_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/{}/io_dump'.format(disk_name, stage)
         count = 0
+        io_dump_msg = []
+        pattern = self.iodump_pattern
+
         try:
             with open(io_dump_file, 'r') as file:
                 for line in file:
-                    count += line.count('.op=' + Io_Category[category].upper())
+                    if line.count('.op=' + Io_Category[category].upper()) > 0:
+                        match = pattern.match(line)
+                        if match:
+                            if count < IO_DUMP_DATA_LIMIT:
+                                parsed = match.groupdict()
+                                values = [
+                                    parsed["task_name"],
+                                    parsed["pid"],
+                                    parsed["stage"],
+                                    parsed["ptr"],
+                                    str(int(parsed["start_time_ns"]) // 1000000)
+                                ]
+                                value_str = ",".join(values)
+                                io_dump_msg.append(value_str)
+                        else:
+                            logging.info(f"io_dump parse err, info : {line.strip()}")
+                        count += 1
             if count > 0:
+                IO_DUMP_DATA[disk_name][stage][Io_Category[category]].insert(0, io_dump_msg)
                 logging.info(f"io_dump info : {disk_name}, {stage}, {Io_Category[category]}, {count}")
         except FileNotFoundError:
             logging.error("The file %s does not exist.", io_dump_file)
@@ -211,6 +284,7 @@
             self.disk_map_stage[disk_name] = stage_list
             self.window_value[disk_name] = {}
             IO_GLOBAL_DATA[disk_name] = {}
+            IO_DUMP_DATA[disk_name] = {}

         return len(IO_GLOBAL_DATA) != 0
@@ -226,13 +300,16 @@
             self.disk_map_stage[disk_name] = EBPF_STAGE_LIST
             self.window_value[disk_name] = {}
             IO_GLOBAL_DATA[disk_name] = {}
+            IO_DUMP_DATA[disk_name] = {}

         for disk_name, stage_list in self.disk_map_stage.items():
             for stage in stage_list:
                 self.window_value[disk_name][stage] = {}
                 IO_GLOBAL_DATA[disk_name][stage] = {}
+                IO_DUMP_DATA[disk_name][stage] = {}
                 for category in Io_Category:
                     IO_GLOBAL_DATA[disk_name][stage][category] = []
+                    IO_DUMP_DATA[disk_name][stage][category] = []
                     self.window_value[disk_name][stage][category] = [[0,0,0], [0,0,0]]

         return major_version in EBPF_SUPPORT_VERSION and os.path.exists('/usr/bin/ebpf_collector') and len(IO_GLOBAL_DATA) != 0
@@ -311,6 +388,8 @@
             return
         if (len(IO_GLOBAL_DATA[disk_name][stage][io_type])) >= self.max_save:
             IO_GLOBAL_DATA[disk_name][stage][io_type].pop()
+        if (len(IO_DUMP_DATA[disk_name][stage][io_type])) >= self.max_save:
+            IO_DUMP_DATA[disk_name][stage][io_type].pop()
         curr_finish_count, curr_latency, curr_io_dump_count = self.window_value[disk_name][stage][io_type][-1]
         prev_finish_count, prev_latency, prev_io_dump_count = self.window_value[disk_name][stage][io_type][-2]
         self.window_value[disk_name][stage][io_type].pop(0)
@@ -322,6 +401,7 @@
         if curr_io_dump > 0:
             logging.info(f"ebpf io_dump info : {disk_name}, {stage}, {io_type}, {curr_io_dump}")
         IO_GLOBAL_DATA[disk_name][stage][io_type].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops])
+        IO_DUMP_DATA[disk_name][stage][io_type].insert(0, [])

         elapsed_time = time.time() - start_time
         sleep_time = self.period_time - elapsed_time
@@ -419,6 +499,7 @@
     def main_loop(self):
         global IO_GLOBAL_DATA
+        global IO_DUMP_DATA
         logging.info("collect io thread start")

         if self.is_kernel_avaliable() and len(self.disk_map_stage) != 0:
@@ -426,8 +507,11 @@
             for stage in stage_list:
                 self.window_value[disk_name][stage] = []
                 IO_GLOBAL_DATA[disk_name][stage] = {}
+                IO_DUMP_DATA[disk_name][stage] = {}
                 for category in Io_Category:
                     IO_GLOBAL_DATA[disk_name][stage][category] = []
+                    IO_DUMP_DATA[disk_name][stage][category] = []
+            self.update_io_threshold(disk_name, stage_list)

         while True:
             start_time = time.time()
diff --git a/src/services/sentryCollector/collect_server.py b/src/services/sentryCollector/collect_server.py
index ad3ac0e3b6ca09b5be3a5fdbdda9e0cad034cc13..b045d4c3c6407cefb41976dd4e46edfb460f0f40 100644
--- a/src/services/sentryCollector/collect_server.py
+++ b/src/services/sentryCollector/collect_server.py
@@ -24,7 +24,7 @@
 import select
 import threading
 import time

-from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA
+from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA, IO_DUMP_DATA
 from .collect_config import CollectConfig

 SENTRY_RUN_DIR = "/var/run/sysSentry"
@@ -48,6 +48,7 @@ RES_MAGIC = "RES"
 class ServerProtocol():
     IS_IOCOLLECT_VALID = 0
     GET_IO_DATA = 1
+    GET_IODUMP_DATA = 2
     PRO_END = 3

 class CollectServer():
@@ -58,78 +59,86 @@

         self.stop_event = threading.Event()

-    def is_iocollect_valid(self, data_struct):
-
+    @staticmethod
+    def get_io_common(data_struct, data_source):
         result_rev = {}
-        self.io_global_data = IO_GLOBAL_DATA

         if len(IO_CONFIG_DATA) == 0:
             logging.error("the collect thread is not started, the data is invalid.")
             return json.dumps(result_rev)
-
         period_time = IO_CONFIG_DATA[0]
         max_save = IO_CONFIG_DATA[1]
-        disk_list = json.loads(data_struct['disk_list'])
         period = int(data_struct['period'])
+        disk_list = json.loads(data_struct['disk_list'])
         stage_list = json.loads(data_struct['stage'])
+        iotype_list = json.loads(data_struct['iotype'])

         if (period < period_time) or (period > period_time * max_save) or (period % period_time):
-            logging.error("is_iocollect_valid: period time is invalid, user period: %d, config period_time: %d", period, period_time)
+            logging.error("get_io_common: period time is invalid, user period: %d, config period_time: %d",
+                          period, period_time)
             return json.dumps(result_rev)

-        for disk_name, stage_info in self.io_global_data.items():
-            if len(disk_list) > 0 and disk_name not in disk_list:
-                continue
-            result_rev[disk_name] = []
-            if len(stage_list) == 0:
-                result_rev[disk_name] = list(stage_info.keys())
+        collect_index = period // period_time - 1
+        logging.debug("user period: %d, config period_time: %d, collect_index: %d", period, period_time, collect_index)
+
+        for disk_name, stage_info in data_source.items():
+            if disk_name not in disk_list:
                 continue
-            for stage_name, stage_data in stage_info.items():
-                if stage_name in stage_list:
-                    result_rev[disk_name].append(stage_name)
+            result_rev[disk_name] = {}
+            for stage_name, iotype_info in stage_info.items():
+                if len(stage_list) > 0 and stage_name not in stage_list:
+                    continue
+                result_rev[disk_name][stage_name] = {}
+                for iotype_name, iotype_data in iotype_info.items():
+                    if iotype_name not in iotype_list:
+                        continue
+                    if len(iotype_data) - 1 < collect_index:
+                        continue
+                    result_rev[disk_name][stage_name][iotype_name] = iotype_data[collect_index]

         return json.dumps(result_rev)

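Note: get_io_common() validates the requested period against the collector settings and maps it to an index into the per-category sample list; with the defaults period_time=1 and max_save=10 from collector.conf, a request for period=5 reads the fifth-newest sample (index 0 is the newest, since samples are inserted at the front):

period_time, max_save = 1, 10   # defaults from collector.conf
period = 5
valid = not (period < period_time or period > period_time * max_save or period % period_time)
collect_index = period // period_time - 1
print(valid, collect_index)     # True 4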
-    def get_io_data(self, data_struct):
+    def is_iocollect_valid(self, data_struct):
+
         result_rev = {}
         self.io_global_data = IO_GLOBAL_DATA

         if len(IO_CONFIG_DATA) == 0:
             logging.error("the collect thread is not started, the data is invalid.")
             return json.dumps(result_rev)
+
         period_time = IO_CONFIG_DATA[0]
         max_save = IO_CONFIG_DATA[1]
-        period = int(data_struct['period'])
         disk_list = json.loads(data_struct['disk_list'])
+        period = int(data_struct['period'])
         stage_list = json.loads(data_struct['stage'])
-        iotype_list = json.loads(data_struct['iotype'])

         if (period < period_time) or (period > period_time * max_save) or (period % period_time):
-            logging.error("get_io_data: period time is invalid, user period: %d, config period_time: %d", period, period_time)
+            logging.error("is_iocollect_valid: period time is invalid, user period: %d, config period_time: %d", period, period_time)
             return json.dumps(result_rev)
-        collect_index = period // period_time - 1
-        logging.debug("user period: %d, config period_time: %d, collect_index: %d", period, period_time, collect_index)
-
         for disk_name, stage_info in self.io_global_data.items():
-            if disk_name not in disk_list:
+            if len(disk_list) > 0 and disk_name not in disk_list:
                 continue
-            result_rev[disk_name] = {}
-            for stage_name, iotype_info in stage_info.items():
-                if len(stage_list) > 0 and stage_name not in stage_list:
-                    continue
-                result_rev[disk_name][stage_name] = {}
-                for iotype_name, iotype_info in iotype_info.items():
-                    if iotype_name not in iotype_list:
-                        continue
-                    if len(iotype_info) - 1 < collect_index:
-                        continue
-                    result_rev[disk_name][stage_name][iotype_name] = iotype_info[collect_index]
+            result_rev[disk_name] = []
+            if len(stage_list) == 0:
+                result_rev[disk_name] = list(stage_info.keys())
+                continue
+            for stage_name, stage_data in stage_info.items():
+                if stage_name in stage_list:
+                    result_rev[disk_name].append(stage_name)

         return json.dumps(result_rev)

+    def get_io_data(self, data_struct):
+        self.io_global_data = IO_GLOBAL_DATA
+        return self.get_io_common(data_struct, self.io_global_data)
+
+    def get_iodump_data(self, data_struct):
+        return self.get_io_common(data_struct, IO_DUMP_DATA)
+
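Note: a hypothetical client-side call through the pySentryCollector wrapper; the import path and the disk/stage/iotype values are illustrative only:

import json
# collect_plugin here refers to src/libsentry/python/pySentryCollector/collect_plugin.py
from collect_plugin import get_iodump_data, ResultMessage

result = get_iodump_data(1, ["sda"], ["rq_driver"], ["read"])
if result['ret'] == ResultMessage.RESULT_SUCCEED:
    data = json.loads(result['message'])   # nested dict produced by get_io_common()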
     def msg_data_process(self, msg_data, protocal_id):
         """message data process"""
         logging.debug("msg_data %s", msg_data)
@@ -144,6 +153,8 @@
             res_msg = self.is_iocollect_valid(data_struct)
         elif protocal_id == ServerProtocol.GET_IO_DATA:
             res_msg = self.get_io_data(data_struct)
+        elif protocal_id == ServerProtocol.GET_IODUMP_DATA:
+            res_msg = self.get_iodump_data(data_struct)

         return res_msg
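Note: a GET_IODUMP_DATA response mirrors the GET_IO_DATA nesting (disk -> stage -> iotype), but each leaf holds the parsed io_dump entries for the selected sample (at most IO_DUMP_DATA_LIMIT strings, comma-joined as task,pid,stage,ptr,age-in-ms) instead of the [latency, io_dump, io_length, iops] record. An illustrative payload with invented values:

# Illustrative GET_IODUMP_DATA payload; every value below is invented.
{
    "sda": {
        "rq_driver": {
            "read": ["kworker,1234,rq_driver,00000000abcd1234,2"]
        }
    }
}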