diff --git a/add-disk-latency-collect.patch b/add-disk-latency-collect.patch new file mode 100644 index 0000000000000000000000000000000000000000..ddae3c00b1babe3e7a88f5e0442866e711f5c4f2 --- /dev/null +++ b/add-disk-latency-collect.patch @@ -0,0 +1,995 @@ +From f9c9d6f9e2d922282f6181888c829914739009d6 Mon Sep 17 00:00:00 2001 +From: hewanhan +Date: Mon, 17 Nov 2025 14:43:14 +0800 +Subject: [PATCH] add disk latency collect. + +--- + .../pySentryCollector/collect_plugin.py | 11 +- + src/sentryPlugins/ai_block_io/ai_block_io.py | 16 +- + src/sentryPlugins/ai_block_io/data_access.py | 46 +++- + src/sentryPlugins/ai_block_io/detector.py | 20 +- + src/sentryPlugins/ai_block_io/extra_logger.py | 31 ++- + src/sentryPlugins/ai_block_io/io_data.py | 27 +++ + src/sentryPlugins/ai_block_io/utils.py | 10 +- + .../avg_block_io/avg_block_io.py | 20 +- + src/sentryPlugins/avg_block_io/config.py | 2 +- + .../avg_block_io/extra_logger.py | 32 ++- + src/sentryPlugins/avg_block_io/module_conn.py | 15 +- + .../avg_block_io/stage_window.py | 2 +- + src/sentryPlugins/avg_block_io/utils.py | 21 +- + src/services/sentryCollector/collect_disk.py | 215 ++++++++++++++++++ + src/services/sentryCollector/collect_io.py | 54 +++++ + .../sentryCollector/collect_server.py | 13 +- + 16 files changed, 498 insertions(+), 37 deletions(-) + create mode 100644 src/services/sentryCollector/collect_disk.py + +diff --git a/src/libsentry/python/pySentryCollector/collect_plugin.py b/src/libsentry/python/pySentryCollector/collect_plugin.py +index e1befe7..7c2d688 100644 +--- a/src/libsentry/python/pySentryCollector/collect_plugin.py ++++ b/src/libsentry/python/pySentryCollector/collect_plugin.py +@@ -53,7 +53,8 @@ class ClientProtocol(): + IS_IOCOLLECT_VALID = 0 + GET_IO_DATA = 1 + GET_IODUMP_DATA = 2 +- PRO_END = 3 ++ GET_DISK_DATA = 3 ++ PRO_END = 4 + + class ResultMessage(): + RESULT_SUCCEED = 0 +@@ -303,6 +304,14 @@ def get_iodump_data(period, disk_list, stage, iotype): + return result + + ++def 
get_disk_data(period, disk_list, stage, iotype): ++ result = inter_get_io_common(period, disk_list, stage, iotype, ClientProtocol.GET_DISK_DATA) ++ error_code = result['ret'] ++ if error_code != ResultMessage.RESULT_SUCCEED: ++ result['message'] = Result_Messages[error_code] ++ return result ++ ++ + def get_disk_type(disk): + result = {} + result['ret'] = ResultMessage.RESULT_UNKNOWN +diff --git a/src/sentryPlugins/ai_block_io/ai_block_io.py b/src/sentryPlugins/ai_block_io/ai_block_io.py +index 2973d52..bda96fa 100644 +--- a/src/sentryPlugins/ai_block_io/ai_block_io.py ++++ b/src/sentryPlugins/ai_block_io/ai_block_io.py +@@ -22,6 +22,7 @@ from .config_parser import ConfigParser + from .data_access import ( + get_io_data_from_collect_plug, + get_iodump_data_from_collect_plug, ++ get_disk_data_from_collect_plug, + check_collect_valid, + get_disk_type, + check_disk_is_available +@@ -97,6 +98,7 @@ class SlowIODetection: + self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "io_dump")) + self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "iops")) + self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "iodump_data")) ++ self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "disk_data")) + + if not self._detector_name_list: + Report.report_pass("the disks to detection is empty, ai_block_io will exit.") +@@ -184,6 +186,11 @@ class SlowIODetection: + data_detector = DataDetector(metric_name, data_window) + disk_detector.add_data_detector(data_detector) + ++ elif metric_name.metric_name == 'disk_data': ++ data_window = DataWindow(window_size) ++ data_detector = DataDetector(metric_name, data_window) ++ disk_detector.add_data_detector(data_detector) ++ + logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]") + self._disk_detectors[disk] = disk_detector + +@@ -198,6 +205,9 @@ class SlowIODetection: + iodump_data_dict_with_disk_name = 
get_iodump_data_from_collect_plug( + self._config_parser.period_time, self._disk_list + ) ++ io_disk_data_dict_with_disk_name = get_disk_data_from_collect_plug( ++ self._config_parser.period_time, self._disk_list ++ ) + logging.debug(f"step1. Get io data: {str(io_data_dict_with_disk_name)}") + if io_data_dict_with_disk_name is None: + Report.report_pass( +@@ -210,10 +220,13 @@ class SlowIODetection: + slow_io_event_list = [] + for disk, disk_detector in self._disk_detectors.items(): + disk_detector.push_data_to_data_detectors(iodump_data_dict_with_disk_name) ++ disk_detector.push_data_to_data_detectors(io_disk_data_dict_with_disk_name) + result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name) + if result[0]: + # 产生告警时获取iodump的详细数据 +- result[6]["iodump_data"] = disk_detector.get_data_detector_list_window() ++ win_data_wins = disk_detector.get_data_detector_list_window() ++ result[6]["iodump_data"] = win_data_wins['iodump_data'] ++ result[6]["disk_data"] = win_data_wins['disk_data'] + slow_io_event_list.append(result) + + logging.debug("step2. 
End to detection slow io event.") +@@ -240,6 +253,7 @@ class SlowIODetection: + logging.warning(f"iops: " + str(alarm_content.get("details").get("iops"))) + extra_slow_log(alarm_content) + del alarm_content["details"]["iodump_data"] # 极端场景下iodump_data可能过大,导致发送失败,所以只在日志中打印,不发送到告警模块 ++ del alarm_content["details"]["disk_data"] + Xalarm.major(alarm_content) + + # Step4:等待检测时间 +diff --git a/src/sentryPlugins/ai_block_io/data_access.py b/src/sentryPlugins/ai_block_io/data_access.py +index f1c2bc2..76578a5 100644 +--- a/src/sentryPlugins/ai_block_io/data_access.py ++++ b/src/sentryPlugins/ai_block_io/data_access.py +@@ -16,12 +16,13 @@ from sentryCollector.collect_plugin import ( + Result_Messages, + get_io_data, + get_iodump_data, ++ get_disk_data, + is_iocollect_valid, + get_disk_type + ) + + +-from .io_data import IOStageData, IOData, IOStageDumpData, IODumpData ++from .io_data import IOStageData, IOData, IOStageDumpData, IODumpData, IOStageDiskData, IODiskData + + COLLECT_STAGES = [ + "throtl", +@@ -169,4 +170,47 @@ def get_iodump_data_from_collect_plug(period, disk_list): + ret[disk] = disk_ret + return ret + logging.warning(f'get iodump data failed with message: {data_raw["message"]}') ++ return None ++ ++ ++def _get_raw_disk_data(period, disk_list): ++ return get_disk_data( ++ period, ++ disk_list, ++ COLLECT_STAGES, ++ ["read", "write"], ++ ) ++ ++ ++def _get_disk_stage_data(data): ++ io_stage_data = IOStageDiskData() ++ for data_type in ("read", "write"): ++ if data_type in data: ++ getattr(io_stage_data, data_type).disk_data = data[data_type] ++ return io_stage_data ++ ++ ++def get_disk_data_from_collect_plug(period, disk_list): ++ data_raw = _get_raw_disk_data(period, disk_list) ++ if data_raw["ret"] == 0: ++ ret = {} ++ try: ++ data = json.loads(data_raw["message"]) ++ except json.decoder.JSONDecodeError as e: ++ logging.warning(f"get disk data failed, {e}") ++ return None ++ ++ for disk in data: ++ disk_data = data[disk] ++ disk_ret = IODiskData() ++ for k, 
v in disk_data.items(): ++ try: ++ getattr(disk_ret, k) ++ setattr(disk_ret, k, _get_disk_stage_data(v)) ++ except AttributeError: ++ logging.debug(f"no attr {k}") ++ continue ++ ret[disk] = disk_ret ++ return ret ++ logging.warning(f'get disk data failed with message: {data_raw["message"]}') + return None +\ No newline at end of file +diff --git a/src/sentryPlugins/ai_block_io/detector.py b/src/sentryPlugins/ai_block_io/detector.py +index 6c0a03f..96939f5 100644 +--- a/src/sentryPlugins/ai_block_io/detector.py ++++ b/src/sentryPlugins/ai_block_io/detector.py +@@ -14,7 +14,7 @@ from datetime import datetime + from .io_data import MetricName + from .threshold import Threshold + from .sliding_window import SlidingWindow, DataWindow +-from .utils import get_metric_value_from_io_data_dict_by_metric_name, get_metric_value_from_iodump_data_dict ++from .utils import get_metric_value_from_io_data_dict_by_metric_name, get_metric_value_from_gen_data_dict + + + class Detector: +@@ -92,11 +92,10 @@ class DataDetector: + def get_data_window_data(self): + return self._data_window.get_data() + +- def push_data(self, iodump_data_dict_with_disk_name: dict): ++ def push_data(self, io_gen_data_dict_with_disk_name: dict): + logging.debug(f'enter Detector: {self}') +- metric_value = get_metric_value_from_iodump_data_dict(iodump_data_dict_with_disk_name, self._metric_name) ++ metric_value = get_metric_value_from_gen_data_dict(io_gen_data_dict_with_disk_name, self._metric_name) + if metric_value is None: +- logging.debug('not found metric value, so return None.') + return False + logging.debug(f'input metric value: {str(metric_value)}') + self._data_window.push(metric_value) +@@ -151,12 +150,19 @@ class DiskDetector: + return latency_wins, iodump_wins, iops_wins + + def get_data_detector_list_window(self): ++ win_data_wins = {} + iodump_data_wins = {"read": {}, "write": {}} ++ disk_data_wins = {"read": {}, "write": {}} + for data_detector in self._data_detector_list: + if 
data_detector.metric_name.metric_name == 'iodump_data': + iodump_data_wins[data_detector.metric_name.io_access_type_name][data_detector.metric_name.stage_name] =\ + data_detector.get_data_window_data() +- return iodump_data_wins ++ if data_detector.metric_name.metric_name == 'disk_data': ++ disk_data_wins[data_detector.metric_name.io_access_type_name][data_detector.metric_name.stage_name] =\ ++ data_detector.get_data_window_data() ++ win_data_wins['iodump_data'] = iodump_data_wins ++ win_data_wins['disk_data'] = disk_data_wins ++ return win_data_wins + + def is_slow_io_event(self, io_data_dict_with_disk_name: dict): + diagnosis_info = {"bio": [], "rq_driver": [], "kernel_stack": []} +@@ -203,6 +209,6 @@ class DiskDetector: + + return True, driver_name, reason, set_to_str(block_stack), set_to_str(io_type), set_to_str(alarm_type), details + +- def push_data_to_data_detectors(self, iodump_data_dict_with_disk_name: dict): ++ def push_data_to_data_detectors(self, io_gen_data_dict_with_disk_name: dict): + for data_detector in self._data_detector_list: +- data_detector.push_data(iodump_data_dict_with_disk_name) ++ data_detector.push_data(io_gen_data_dict_with_disk_name) +diff --git a/src/sentryPlugins/ai_block_io/extra_logger.py b/src/sentryPlugins/ai_block_io/extra_logger.py +index cfd1929..607f55a 100644 +--- a/src/sentryPlugins/ai_block_io/extra_logger.py ++++ b/src/sentryPlugins/ai_block_io/extra_logger.py +@@ -148,6 +148,7 @@ def extra_latency_log(msg): + ) + except KeyError: + return ++ extra_disk_log(msg, io_type) + + + def extra_iodump_log(msg): +@@ -183,4 +184,32 @@ def extra_iodump_log(msg): + for bio_ptr in last_bio_record: + task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr] + line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}" +- extra_logger.warning(line) +\ No newline at end of file ++ extra_logger.warning(line) ++ ++ ++def extra_disk_log(msg, io_type): ++ disk_data = 
msg['details']['disk_data'].get(io_type, {}) ++ ++ try: ++ rq_driver_data = disk_data['rq_driver'] ++ except Exception as e: ++ extra_logger.error(f"Failed to parse disk data: {e}") ++ return ++ ++ if len(rq_driver_data) == 0: ++ return ++ ++ extra_logger.warning(f"disk latency:") ++ header = f"{'0-1ms':>12} {'1-10ms':>12} {'10-100ms':>15} {'100ms-1s':>15} {'1-3s':>12} {'>3s':>12}" ++ extra_logger.warning(header) ++ ++ total_data = [0] * 6 ++ for period_data in rq_driver_data: ++ for i in range(6): ++ total_data[i] += period_data[i] ++ num_periods = len(rq_driver_data) ++ avg_data = [total // num_periods for total in total_data] ++ extra_logger.warning( ++ f"{avg_data[0]:>12} {avg_data[1]:>12} {avg_data[2]:>15}" ++ f"{avg_data[3]:>15} {avg_data[4]:>12} {avg_data[5]:>12}" ++ ) +\ No newline at end of file +diff --git a/src/sentryPlugins/ai_block_io/io_data.py b/src/sentryPlugins/ai_block_io/io_data.py +index 023e7b1..6f9347f 100644 +--- a/src/sentryPlugins/ai_block_io/io_data.py ++++ b/src/sentryPlugins/ai_block_io/io_data.py +@@ -75,6 +75,33 @@ class IODumpData: + time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) + + ++@dataclass ++class IODiskListData: ++ disk_data: List[int] = field(default_factory=list) ++ ++ ++@dataclass ++class IOStageDiskData: ++ read: IODiskListData = field(default_factory=lambda: IODiskListData()) ++ write: IODiskListData = field(default_factory=lambda: IODiskListData()) ++ ++ ++@dataclass ++class IODiskData: ++ throtl: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ wbt: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ gettag: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ iocost: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ plug: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ bfq: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ hctx: IOStageDiskData = 
field(default_factory=lambda: IOStageDiskData()) ++ requeue: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ rq_driver: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ bio: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ deadline: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) ++ time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) ++ ++ + @dataclass(frozen=True) + class MetricName: + disk_name: str +diff --git a/src/sentryPlugins/ai_block_io/utils.py b/src/sentryPlugins/ai_block_io/utils.py +index 919cf9b..ae63f5f 100644 +--- a/src/sentryPlugins/ai_block_io/utils.py ++++ b/src/sentryPlugins/ai_block_io/utils.py +@@ -15,7 +15,7 @@ from dataclasses import asdict + + from .threshold import ThresholdType + from .sliding_window import SlidingWindowType +-from .io_data import MetricName, IOData, IODumpData ++from .io_data import MetricName, IOData + + + def get_threshold_type_enum(algorithm_type: str): +@@ -49,11 +49,11 @@ def get_metric_value_from_io_data_dict_by_metric_name( + return None + + +-def get_metric_value_from_iodump_data_dict(io_dump_data_dict: dict, metric_name: MetricName): ++def get_metric_value_from_gen_data_dict(io_gen_data_dict: dict, metric_name: MetricName): + try: +- io_dump_data: IODumpData = io_dump_data_dict[metric_name.disk_name] +- io_dump_stage_data = asdict(io_dump_data)[metric_name.stage_name] +- base_data = io_dump_stage_data[metric_name.io_access_type_name] ++ io_gen_data = io_gen_data_dict[metric_name.disk_name] ++ io_gen_stage_data = asdict(io_gen_data)[metric_name.stage_name] ++ base_data = io_gen_stage_data[metric_name.io_access_type_name] + metric_value = base_data[metric_name.metric_name] + return metric_value + except KeyError: +diff --git a/src/sentryPlugins/avg_block_io/avg_block_io.py b/src/sentryPlugins/avg_block_io/avg_block_io.py +index ef19b7b..42dc7d0 100644 +--- a/src/sentryPlugins/avg_block_io/avg_block_io.py 
++++ b/src/sentryPlugins/avg_block_io/avg_block_io.py +@@ -14,9 +14,10 @@ import configparser + import time + + from .config import read_config_log, read_config_common, read_config_algorithm, read_config_latency, read_config_iodump, read_config_stage +-from .stage_window import IoWindow, IoDumpWindow,IopsWindow,IodumpMsgWindow +-from .module_conn import avg_is_iocollect_valid, avg_get_io_data, avg_get_iodump_data, report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation +-from .utils import update_avg_and_check_abnormal, update_avg_iodump_data ++from .stage_window import IoWindow, IoDumpWindow, IopsWindow, IoArrayDataWindow ++from .module_conn import avg_is_iocollect_valid, avg_get_io_data, avg_get_iodump_data, avg_get_disk_data, \ ++ report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation ++from .utils import update_avg_and_check_abnormal, update_avg_array_data + from .extra_logger import init_extra_logger + + CONFIG_FILE = "/etc/sysSentry/plugins/avg_block_io.ini" +@@ -69,8 +70,11 @@ def init_io_win(io_dic, config, common_param): + io_data[disk_name][stage_name][rw]["iops"] = IopsWindow(window_size=io_dic["win_size"]) + logging.debug("Successfully create {}-{}-{}-iops window".format(disk_name, stage_name, rw)) + +- io_data[disk_name][stage_name][rw]["iodump_data"] = IodumpMsgWindow(window_size=io_dic["win_size"]) ++ io_data[disk_name][stage_name][rw]["iodump_data"] = IoArrayDataWindow(window_size=io_dic["win_size"]) + logging.debug("Successfully create {}-{}-{}-iodump_data window".format(disk_name, stage_name, rw)) ++ ++ io_data[disk_name][stage_name][rw]["disk_data"] = IoArrayDataWindow(window_size=io_dic["win_size"]) ++ logging.debug("Successfully create {}-{}-{}-disk_data window".format(disk_name, stage_name, rw)) + return io_data, io_avg_value + + +@@ -138,7 +142,10 @@ def main_loop(io_dic, io_data, io_avg_value): + continue + + # 获取iodump的详细信息 +- is_success, 
iodump_data = avg_get_iodump_data(io_dic) ++ is_success_iodump, iodump_data = avg_get_iodump_data(io_dic) ++ ++ # 获取磁盘的时延数据 ++ is_success_disk, disk_data = avg_get_disk_data(io_dic) + + # 处理周期数据 + reach_size = False +@@ -148,7 +155,8 @@ def main_loop(io_dic, io_data, io_avg_value): + if disk_name in curr_period_data and stage_name in curr_period_data[disk_name] and rw in curr_period_data[disk_name][stage_name]: + io_key = (disk_name, stage_name, rw) + reach_size = update_avg_and_check_abnormal(curr_period_data, io_key, win_size, io_avg_value, io_data) +- update_avg_iodump_data(iodump_data, is_success, io_key, io_data) ++ update_avg_array_data(iodump_data, is_success_iodump, io_key, io_data, "iodump_data") ++ update_avg_array_data(disk_data, is_success_disk, io_key, io_data, "disk_data") + + # win_size不满时不进行告警判断 + if not reach_size: +diff --git a/src/sentryPlugins/avg_block_io/config.py b/src/sentryPlugins/avg_block_io/config.py +index 79bd21a..fe24468 100644 +--- a/src/sentryPlugins/avg_block_io/config.py ++++ b/src/sentryPlugins/avg_block_io/config.py +@@ -133,7 +133,7 @@ def read_config_common(config): + + except configparser.NoOptionError: + iotype_list = DEFAULT_PARAM[CONF_COMMON][CONF_COMMON_IOTYPE] +- logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_IOTYPE}, use {iotupe_list} as default") ++ logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_IOTYPE}, use {iotype_list} as default") + + try: + period_time = int(config.get(CONF_COMMON, CONF_COMMON_PER_TIME)) +diff --git a/src/sentryPlugins/avg_block_io/extra_logger.py b/src/sentryPlugins/avg_block_io/extra_logger.py +index ac86306..cd8a8d8 100644 +--- a/src/sentryPlugins/avg_block_io/extra_logger.py ++++ b/src/sentryPlugins/avg_block_io/extra_logger.py +@@ -164,6 +164,8 @@ def extra_latency_log(msg): + except KeyError: + return + ++ extra_disk_log(msg) ++ + + def extra_iodump_log(msg): + extra_logger.warning(f"[SLOW IO] iodump, disk:{msg['driver_name']}, iotype:{msg['io_type']}") +@@ -196,4 +198,32 @@ def 
extra_iodump_log(msg): + for bio_ptr in last_bio_record: + task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr] + line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}" +- extra_logger.warning(line) +\ No newline at end of file ++ extra_logger.warning(line) ++ ++ ++def extra_disk_log(msg): ++ disk_str = msg['details']['disk_data'] ++ try: ++ disk_data = ast.literal_eval(disk_str) ++ rq_driver_data = disk_data['rq_driver'] ++ except Exception as e: ++ extra_logger.error(f"Failed to parse disk data: {e}") ++ return ++ ++ if not rq_driver_data[0]: ++ return ++ ++ extra_logger.warning(f"disk latency:") ++ header = f"{'0-1ms':>12} {'1-10ms':>12} {'10-100ms':>15} {'100ms-1s':>15} {'1-3s':>12} {'>3s':>12}" ++ extra_logger.warning(header) ++ ++ total_data = [0] * 6 ++ for period_data in rq_driver_data: ++ for i in range(6): ++ total_data[i] += period_data[i] ++ num_periods = len(rq_driver_data) ++ avg_data = [total // num_periods for total in total_data] ++ extra_logger.warning( ++ f"{avg_data[0]:>12} {avg_data[1]:>12} {avg_data[2]:>15}" ++ f"{avg_data[3]:>15} {avg_data[4]:>12} {avg_data[5]:>12}" ++ ) +\ No newline at end of file +diff --git a/src/sentryPlugins/avg_block_io/module_conn.py b/src/sentryPlugins/avg_block_io/module_conn.py +index 7e9b065..60f24a6 100644 +--- a/src/sentryPlugins/avg_block_io/module_conn.py ++++ b/src/sentryPlugins/avg_block_io/module_conn.py +@@ -12,7 +12,8 @@ import json + import logging + import sys + +-from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, get_iodump_data, Result_Messages, get_disk_type, Disk_Type ++from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, get_iodump_data, get_disk_data, \ ++ Result_Messages, get_disk_type, Disk_Type + from syssentry.result import ResultLevel, report_result + from xalarm.sentry_notify import xalarm_report, MINOR_ALM, ALARM_TYPE_OCCUR + from .utils import is_abnormal, get_win_data, 
log_slow_win +@@ -42,6 +43,14 @@ def avg_get_iodump_data(io_dic): + return check_result_validation(res, 'get io dump data') + + ++def avg_get_disk_data(io_dic): ++ """avg_get_disk_data from sentryCollector""" ++ logging.debug(f"send to sentryCollector avg_get_disk_data: period={io_dic['period_time']}, " ++ f"disk={io_dic['disk_list']}, stage={io_dic['stage_list']}, iotype={io_dic['iotype_list']}") ++ res = get_disk_data(io_dic["period_time"], io_dic["disk_list"], io_dic["stage_list"], io_dic["iotype_list"]) ++ return check_result_validation(res, 'get disk data') ++ ++ + def avg_is_iocollect_valid(io_dic, config_disk, config_stage): + """is_iocollect_valid from sentryCollector""" + logging.debug(f"send to sentryCollector is_iocollect_valid: period={io_dic['period_time']}, " +@@ -98,6 +107,7 @@ def process_report_data(disk_name, rw, io_data): + msg["alarm_type"] = abnormal_list + log_slow_win(msg, "IO press") + del msg["details"]["iodump_data"] # ˳iodump_dataܹ,·ʧ,ֻ־дӡ,͵澯ģ ++ del msg["details"]["disk_data"] + xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) + return + +@@ -109,6 +119,7 @@ def process_report_data(disk_name, rw, io_data): + msg["alarm_type"] = abnormal_list + log_slow_win(msg, "driver slow") + del msg["details"]["iodump_data"] # ˳iodump_dataܹ,·ʧ,ֻ־дӡ,͵澯ģ ++ del msg["details"]["disk_data"] + xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) + return + +@@ -123,11 +134,13 @@ def process_report_data(disk_name, rw, io_data): + msg["alarm_type"] = abnormal_list + log_slow_win(msg, "kernel slow") + del msg["details"]["iodump_data"] # ˳iodump_dataܹ,·ʧ,ֻ־дӡ,͵澯ģ ++ del msg["details"]["disk_data"] + xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) + return + + log_slow_win(msg, "unknown") + del msg["details"]["iodump_data"] # ˳iodump_dataܹ,·ʧ,ֻ־дӡ,͵澯ģ ++ del msg["details"]["disk_data"] + xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) + + +diff --git 
a/src/sentryPlugins/avg_block_io/stage_window.py b/src/sentryPlugins/avg_block_io/stage_window.py +index 29fa6e1..88204a2 100644 +--- a/src/sentryPlugins/avg_block_io/stage_window.py ++++ b/src/sentryPlugins/avg_block_io/stage_window.py +@@ -60,7 +60,7 @@ class IopsWindow(AbnormalWindowBase): + return False + + +-class IodumpMsgWindow: ++class IoArrayDataWindow: + def __init__(self, window_size=10): + self.window_size = window_size + self.window_data = [[] for _ in range(window_size)] +diff --git a/src/sentryPlugins/avg_block_io/utils.py b/src/sentryPlugins/avg_block_io/utils.py +index d9af7fe..9726641 100644 +--- a/src/sentryPlugins/avg_block_io/utils.py ++++ b/src/sentryPlugins/avg_block_io/utils.py +@@ -42,6 +42,7 @@ def get_win_data(disk_name, rw, io_data): + iodump = '' + iops = '' + iodump_data = '' ++ disk_data = '' + for stage_name in io_data[disk_name]: + if 'latency' in io_data[disk_name][stage_name][rw]: + latency_list = io_data[disk_name][stage_name][rw]['latency'].window_data_to_string() +@@ -55,9 +56,15 @@ def get_win_data(disk_name, rw, io_data): + if 'iodump_data' in io_data[disk_name][stage_name][rw]: + iodump_data_list = io_data[disk_name][stage_name][rw]['iodump_data'].window_data_to_string() + iodump_data += f'"{stage_name}": {iodump_data_list}, ' ++ if 'disk_data' in io_data[disk_name][stage_name][rw]: ++ disk_data_list = io_data[disk_name][stage_name][rw]['disk_data'].window_data_to_string() ++ disk_data += f'"{stage_name}": {disk_data_list}, ' + if iodump_data: + iodump_data = '{' + iodump_data[:-2] + '}' +- return {"latency": latency[:-2], "iodump": iodump[:-2], "iops": iops[:-2], "iodump_data": iodump_data} ++ if disk_name: ++ disk_data = '{' + disk_data[:-2] + '}' ++ return {"latency": latency[:-2], "iodump": iodump[:-2], "iops": iops[:-2], \ ++ "iodump_data": iodump_data, "disk_data": disk_data} + + + def is_abnormal(io_key, io_data): +@@ -154,13 +161,13 @@ def update_avg_and_check_abnormal(data, io_key, win_size, io_avg_value, io_data) + 
return True + + +-def update_avg_iodump_data(iodump_data, is_success, io_key, io_data): +- """update iodump data to io_data""" ++def update_avg_array_data(array_data, is_success, io_key, io_data, data_type): ++ """update array data to io_data""" + all_wins = get_nested_value(io_data, io_key) +- if all_wins and "iodump_data" in all_wins: ++ if all_wins and data_type in all_wins: + if not is_success: +- io_data[io_key[0]][io_key[1]][io_key[2]]["iodump_data"].append_new_data([]) ++ io_data[io_key[0]][io_key[1]][io_key[2]][data_type].append_new_data([]) + else: +- period_value = get_nested_value(iodump_data, io_key) +- io_data[io_key[0]][io_key[1]][io_key[2]]["iodump_data"].append_new_data(period_value) ++ period_value = get_nested_value(array_data, io_key) ++ io_data[io_key[0]][io_key[1]][io_key[2]][data_type].append_new_data(period_value) + +diff --git a/src/services/sentryCollector/collect_disk.py b/src/services/sentryCollector/collect_disk.py +new file mode 100644 +index 0000000..03eda85 +--- /dev/null ++++ b/src/services/sentryCollector/collect_disk.py +@@ -0,0 +1,215 @@ ++# coding: utf-8 ++# Copyright (c) 2025 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++ ++""" ++Get disk latency distribution data. 
++""" ++import struct ++import logging ++from typing import List, Optional, Dict, Any ++from .collect_plugin import get_disk_type ++from syssentry.utils import execute_command ++ ++ ++class CollectDisk: ++ """ ++ 硬盘时延分布数据采集,目前仅支持华为V6和V7代nvme盘,且接口协议版本应为1,带内查询接口如下: ++ nvme get-log -i 0xC2 -l 784 /dev/[nvme_disk] ++ 正常调用时,响应格式为: ++ Device:nvme0n1 log-id:xxx namespace-id:xxx ++ 0 1 2 3 4 5 6 7 8 9 a b c d e f ++ 0000 01 00 00 00 00 00 00 00 72 00 00 00 18 00 00 00 ++ 0010 53 3e 00 00 b7 c9 00 00 fc 0c 01 00 42 f9 02 00 ++ 0020 b6 fe 03 00 95 04 04 00 14 05 04 00 38 f2 03 00 ++ ... ++ 0300 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ++ 响应部分各字段含义如下: ++ Byte 类型 描述 ++ 03:00 version major version ++ 07:04 version minor version ++ 135:08 读时延分布 时延为0-1ms的读命令个数,每32us一个档位,共32个档位,共128bytes,每个档位统计占用4byte ++ 259:136 读时延分布 时延为1-32ms的读命令个数,每1ms一个档位,共31个档位,共124bytes,每个档位统计占用4byte ++ 379:260 读时延分布 时延为32ms-1s的读命令个数,每32ms一个档位,共30个档位,共120bytes,每个档位统计占用4byte ++ 383:380 读时延分布 时延为1-2s的读命令个数,只有一个档位,统计个数占用4byte ++ 387:384 读时延分布 时延为2-3s的读命令个数,只有一个档位,统计个数占用4byte ++ 391:388 读时延分布 时延为3-4s的读命令个数,只有一个档位,统计个数占用4byte ++ 395:392 读时延分布 时延为大于4s的读命令个数,只有一个档位,统计个数占用4byte ++ 523:396 写时延分布 时延为0-1ms的写命令个数,每32us一个档位,共32个档位,共128bytes,每个档位统计占用4byte ++ 647:524 写时延分布 时延为1-32ms的写命令个数,每1ms一个档位,共31个档位,共124bytes,每个档位统计占用4byte ++ 767:648 写时延分布 时延为32ms-1s的写命令个数,每32ms一个档位,共30个档位,共120bytes,每个档位统计占用4byte ++ 771:768 写时延分布 时延为1-2s的写命令个数,只有一个档位,统计个数占用4byte ++ 775:772 写时延分布 时延为2-3s的写命令个数,只有一个档位,统计个数占用4byte ++ 779:776 写时延分布 时延为3-4s的写命令个数,只有一个档位,统计个数占用4byte ++ 783:780 写时延分布 时延为大于4s的写命令个数,只有一个档位,统计个数占用4byte ++ 最终返回的数据会合并一下,一共12个数据,分别为0-1ms,1-10ms,10-100ms,100-1s,1-3s,大于3s这6个档位的读写时延 ++ """ ++ ++ def __init__(self, disk_name: str): ++ self.disk_name = disk_name ++ self.is_support = False ++ self._check_support() ++ ++ def get_support_flag(self) -> bool: ++ return self.is_support ++ ++ def collect_data(self) -> List[int]: ++ if not self.is_support: ++ logging.error(f"Disk {self.disk_name} is not supported for latency 
collection.") ++ return [] ++ ++ try: ++ cmd = ["nvme", "get-log", "-i", "0xC2", "-l", "784", f"/dev/{self.disk_name}"] ++ output = execute_command(cmd) ++ if not output: ++ logging.error(f"Failed to get NVMe log for disk {self.disk_name}.") ++ return [] ++ ++ result = self._parse_nvme_output(output) ++ return result ++ except Exception as e: ++ logging.error(f"Error collecting latency data for disk {self.disk_name}: {e}") ++ return [] ++ ++ def _check_support(self): ++ try: ++ disk_type_result = get_disk_type(self.disk_name) ++ if disk_type_result["ret"] != 0 or disk_type_result["message"] != '0': ++ logging.warning(f"Disk {self.disk_name} type is not supported.") ++ return ++ ++ if not self._check_disk_model(): ++ logging.warning(f"Disk {self.disk_name} model is not supported.") ++ return ++ ++ if not self._check_nvme_version(): ++ logging.warning(f"Disk {self.disk_name} NVMe version is not supported.") ++ return ++ ++ self.is_support = True ++ logging.info(f"Disk {self.disk_name} is supported for latency collection.") ++ except Exception as e: ++ logging.error(f"Error checking disk {self.disk_name} support: {e}") ++ ++ def _check_disk_model(self) -> bool: ++ cmd = ["lsblk", "-o", "name,model"] ++ try: ++ output = execute_command(cmd) ++ if not output: ++ logging.error(f"Failed to get disk model.") ++ return False ++ ++ for line in output.splitlines(): ++ if self.disk_name in line: ++ parts = line.split() ++ if len(parts) >= 2: ++ model = parts[-1] ++ if model.startswith("HWE6") or model.startswith("HWE7"): ++ return True ++ except Exception as e: ++ logging.error(f"Error checking disk model for {self.disk_name}: {e}") ++ return False ++ ++ def _check_nvme_version(self) -> bool: ++ try: ++ cmd = ["nvme", "get-log", "-i", "0xC2", "-l", "784", f"/dev/{self.disk_name}"] ++ output = execute_command(cmd) ++ if not output: ++ logging.error(f"Failed to get NVMe log for disk {self.disk_name}.") ++ return False ++ ++ lines = output.splitlines() ++ line = lines[2] ++ parts 
= line.split() ++ hex_data = [] ++ if len(parts) < 17: ++ return False ++ for hex_byte in parts[1:9]: ++ hex_data.append(hex_byte) ++ ++ data = bytes.fromhex(''.join(hex_data)) ++ if len(data) < 8: ++ return False ++ ++ major_version = struct.unpack(' List[int]: ++ lines = output.splitlines() ++ hex_data = [] ++ ++ for line in lines[2:]: ++ parts = line.split() ++ for hex_byte in parts[1:17]: ++ hex_data.append(hex_byte) ++ ++ data = bytes.fromhex(''.join(hex_data)) ++ if len(data) < 784: ++ logging.error(f"NVMe log data for disk {self.disk_name} is incomplete.") ++ return [] ++ ++ result = [0] * 12 # 6 read + 6 write latency buckets ++ for i in range(32): ++ offset = 8 + i * 4 ++ count = struct.unpack(' None: + global IO_GLOBAL_DATA ++ global IO_DUMP_DATA + while True: + if self.stop_event.is_set(): + logging.debug("collect io thread exit") +@@ -402,6 +408,7 @@ class CollectIo(): + logging.info(f"ebpf io_dump info : {disk_name}, {stage}, {io_type}, {curr_io_dump}") + IO_GLOBAL_DATA[disk_name][stage][io_type].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops]) + IO_DUMP_DATA[disk_name][stage][io_type].insert(0, []) ++ self.append_disk_data(disk_name) + + elapsed_time = time.time() - start_time + sleep_time = self.period_time - elapsed_time +@@ -497,6 +504,51 @@ class CollectIo(): + EBPF_PROCESS.wait() + logging.info("ebpf collector thread exit") + ++ def init_disk_collect( ++ self, ++ disk_name: str ++ ) -> None: ++ collector = CollectDisk(disk_name) ++ support_flag = collector.get_support_flag() ++ if support_flag: ++ self.disk_collectors[disk_name] = collector ++ self.disk_data_window_value[disk_name] = [] ++ DISK_DATA[disk_name] = {} ++ DISK_DATA[disk_name]['rq_driver'] = {} ++ DISK_DATA[disk_name]['rq_driver']['read'] = [] ++ DISK_DATA[disk_name]['rq_driver']['write'] = [] ++ ++ def append_disk_data( ++ self, ++ disk_name: str, ++ ) -> None: ++ if disk_name in self.disk_collectors: ++ collect_disk = self.disk_collectors[disk_name] ++ curr_value = 
collect_disk.collect_data() ++ if len(curr_value) != 0: ++ self.disk_data_window_value[disk_name].append(curr_value) ++ if len(self.disk_data_window_value[disk_name]) < 2: ++ return ++ if len(self.disk_data_window_value[disk_name]) > 2: ++ self.disk_data_window_value[disk_name].pop(0) ++ ++ read_data = [] ++ write_data = [] ++ last_value = self.disk_data_window_value[disk_name][0] ++ ++ for i in range(6): ++ delta = curr_value[i] - last_value[i] ++ read_data.append(max(0, delta)) ++ for i in range(6, 12): ++ delta = curr_value[i] - last_value[i] ++ write_data.append(max(0, delta)) ++ if len(DISK_DATA[disk_name]['rq_driver']['read']) >= self.max_save: ++ DISK_DATA[disk_name]['rq_driver']['read'].pop() ++ if len(DISK_DATA[disk_name]['rq_driver']['write']) >= self.max_save: ++ DISK_DATA[disk_name]['rq_driver']['write'].pop() ++ DISK_DATA[disk_name]['rq_driver']['read'].insert(0, read_data) ++ DISK_DATA[disk_name]['rq_driver']['write'].insert(0, write_data) ++ + def main_loop(self): + global IO_GLOBAL_DATA + global IO_DUMP_DATA +@@ -512,6 +564,7 @@ class CollectIo(): + IO_GLOBAL_DATA[disk_name][stage][category] = [] + IO_DUMP_DATA[disk_name][stage][category] = [] + self.update_io_threshold(disk_name, stage_list) ++ self.init_disk_collect(disk_name) + + while True: + start_time = time.time() +@@ -524,6 +577,7 @@ class CollectIo(): + if self.get_blk_io_hierarchy(disk_name, stage_list) < 0: + continue + self.append_period_lat(disk_name, stage_list) ++ self.append_disk_data(disk_name) + + elapsed_time = time.time() - start_time + sleep_time = self.period_time - elapsed_time +diff --git a/src/services/sentryCollector/collect_server.py b/src/services/sentryCollector/collect_server.py +index b045d4c..9a0c886 100644 +--- a/src/services/sentryCollector/collect_server.py ++++ b/src/services/sentryCollector/collect_server.py +@@ -24,7 +24,7 @@ import select + import threading + import time + +-from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA, IO_DUMP_DATA ++from .collect_io 
import IO_GLOBAL_DATA, IO_CONFIG_DATA, IO_DUMP_DATA, DISK_DATA + from .collect_config import CollectConfig + + SENTRY_RUN_DIR = "/var/run/sysSentry" +@@ -49,7 +49,8 @@ class ServerProtocol(): + IS_IOCOLLECT_VALID = 0 + GET_IO_DATA = 1 + GET_IODUMP_DATA = 2 +- PRO_END = 3 ++ GET_DISK_DATA = 3 ++ PRO_END = 4 + + class CollectServer(): + +@@ -133,12 +134,14 @@ class CollectServer(): + return json.dumps(result_rev) + + def get_io_data(self, data_struct): +- self.io_global_data = IO_GLOBAL_DATA +- return self.get_io_common(data_struct, self.io_global_data) ++ return self.get_io_common(data_struct, IO_GLOBAL_DATA) + + def get_iodump_data(self, data_struct): + return self.get_io_common(data_struct, IO_DUMP_DATA) + ++ def get_disk_data(self, data_struct): ++ return self.get_io_common(data_struct, DISK_DATA) ++ + def msg_data_process(self, msg_data, protocal_id): + """message data process""" + logging.debug("msg_data %s", msg_data) +@@ -155,6 +158,8 @@ class CollectServer(): + res_msg = self.get_io_data(data_struct) + elif protocal_id == ServerProtocol.GET_IODUMP_DATA: + res_msg = self.get_iodump_data(data_struct) ++ elif protocal_id == ServerProtocol.GET_DISK_DATA: ++ res_msg = self.get_disk_data(data_struct) + + return res_msg + +-- +2.48.1 + diff --git a/sysSentry.spec b/sysSentry.spec index 8bc008575d66cb6df8d6f34df89f5453ee4898db..9eb98fb436b1e65caa8be4766148bac35f72dbe4 100644 --- a/sysSentry.spec +++ b/sysSentry.spec @@ -4,7 +4,7 @@ Summary: System Inspection Framework Name: sysSentry Version: 1.0.3 -Release: 13 +Release: 14 License: Mulan PSL v2 Group: System Environment/Daemons Source0: https://gitee.com/openeuler/sysSentry/releases/download/v%{version}/%{name}-%{version}.tar.gz @@ -29,6 +29,7 @@ Patch17: Fix-Security-Scan-Warning.patch Patch18: Fix-two-code-review-comments.patch Patch19: Add-MulanV2-License-statement.patch Patch20: add-bmc_block_io-and-slow-io-plugin-upgrade.patch +Patch21: add-disk-latency-collect.patch BuildRequires: cmake gcc-c++ BuildRequires: 
python3 python3-setuptools @@ -263,6 +264,12 @@ rm -rf /var/run/sysSentry | : %attr(0600,root,root) %config(noreplace) %{_sysconfdir}/sysSentry/tasks/soc_ring_sentry.mod %changelog +* Mon Nov 17 2025 hewanhan - 1.0.3-14 +- Type:feature +- CVE:NA +- SUG:NA +- DESC:add disk latency collect + * Tue Nov 4 2025 hewanhan - 1.0.3-13 - Type:feature - CVE:NA