From a4dfbb5a0f8e3760a7bb23ad3af3a09de78e2eb7 Mon Sep 17 00:00:00 2001 From: hewh Date: Mon, 17 Nov 2025 09:49:07 +0800 Subject: [PATCH] add disk latency collect. --- .../pySentryCollector/collect_plugin.py | 11 +- src/sentryPlugins/ai_block_io/ai_block_io.py | 16 +- src/sentryPlugins/ai_block_io/data_access.py | 46 +++- src/sentryPlugins/ai_block_io/detector.py | 20 +- src/sentryPlugins/ai_block_io/extra_logger.py | 31 ++- src/sentryPlugins/ai_block_io/io_data.py | 27 +++ src/sentryPlugins/ai_block_io/utils.py | 10 +- .../avg_block_io/avg_block_io.py | 20 +- src/sentryPlugins/avg_block_io/config.py | 2 +- .../avg_block_io/extra_logger.py | 32 ++- src/sentryPlugins/avg_block_io/module_conn.py | 15 +- .../avg_block_io/stage_window.py | 2 +- src/sentryPlugins/avg_block_io/utils.py | 21 +- src/services/sentryCollector/collect_disk.py | 215 ++++++++++++++++++ src/services/sentryCollector/collect_io.py | 54 +++++ .../sentryCollector/collect_server.py | 13 +- 16 files changed, 498 insertions(+), 37 deletions(-) create mode 100644 src/services/sentryCollector/collect_disk.py diff --git a/src/libsentry/python/pySentryCollector/collect_plugin.py b/src/libsentry/python/pySentryCollector/collect_plugin.py index e1befe7..7c2d688 100644 --- a/src/libsentry/python/pySentryCollector/collect_plugin.py +++ b/src/libsentry/python/pySentryCollector/collect_plugin.py @@ -53,7 +53,8 @@ class ClientProtocol(): IS_IOCOLLECT_VALID = 0 GET_IO_DATA = 1 GET_IODUMP_DATA = 2 - PRO_END = 3 + GET_DISK_DATA = 3 + PRO_END = 4 class ResultMessage(): RESULT_SUCCEED = 0 @@ -303,6 +304,14 @@ def get_iodump_data(period, disk_list, stage, iotype): return result +def get_disk_data(period, disk_list, stage, iotype): + result = inter_get_io_common(period, disk_list, stage, iotype, ClientProtocol.GET_DISK_DATA) + error_code = result['ret'] + if error_code != ResultMessage.RESULT_SUCCEED: + result['message'] = Result_Messages[error_code] + return result + + def get_disk_type(disk): result = {} result['ret'] 
= ResultMessage.RESULT_UNKNOWN diff --git a/src/sentryPlugins/ai_block_io/ai_block_io.py b/src/sentryPlugins/ai_block_io/ai_block_io.py index 2973d52..bda96fa 100644 --- a/src/sentryPlugins/ai_block_io/ai_block_io.py +++ b/src/sentryPlugins/ai_block_io/ai_block_io.py @@ -22,6 +22,7 @@ from .config_parser import ConfigParser from .data_access import ( get_io_data_from_collect_plug, get_iodump_data_from_collect_plug, + get_disk_data_from_collect_plug, check_collect_valid, get_disk_type, check_disk_is_available @@ -97,6 +98,7 @@ class SlowIODetection: self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "io_dump")) self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "iops")) self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "iodump_data")) + self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "disk_data")) if not self._detector_name_list: Report.report_pass("the disks to detection is empty, ai_block_io will exit.") @@ -184,6 +186,11 @@ class SlowIODetection: data_detector = DataDetector(metric_name, data_window) disk_detector.add_data_detector(data_detector) + elif metric_name.metric_name == 'disk_data': + data_window = DataWindow(window_size) + data_detector = DataDetector(metric_name, data_window) + disk_detector.add_data_detector(data_detector) + logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]") self._disk_detectors[disk] = disk_detector @@ -198,6 +205,9 @@ class SlowIODetection: iodump_data_dict_with_disk_name = get_iodump_data_from_collect_plug( self._config_parser.period_time, self._disk_list ) + io_disk_data_dict_with_disk_name = get_disk_data_from_collect_plug( + self._config_parser.period_time, self._disk_list + ) logging.debug(f"step1. 
Get io data: {str(io_data_dict_with_disk_name)}") if io_data_dict_with_disk_name is None: Report.report_pass( @@ -210,10 +220,13 @@ class SlowIODetection: slow_io_event_list = [] for disk, disk_detector in self._disk_detectors.items(): disk_detector.push_data_to_data_detectors(iodump_data_dict_with_disk_name) + disk_detector.push_data_to_data_detectors(io_disk_data_dict_with_disk_name) result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name) if result[0]: # 产生告警时获取iodump的详细数据 - result[6]["iodump_data"] = disk_detector.get_data_detector_list_window() + win_data_wins = disk_detector.get_data_detector_list_window() + result[6]["iodump_data"] = win_data_wins['iodump_data'] + result[6]["disk_data"] = win_data_wins['disk_data'] slow_io_event_list.append(result) logging.debug("step2. End to detection slow io event.") @@ -240,6 +253,7 @@ class SlowIODetection: logging.warning(f"iops: " + str(alarm_content.get("details").get("iops"))) extra_slow_log(alarm_content) del alarm_content["details"]["iodump_data"] # 极端场景下iodump_data可能过大,导致发送失败,所以只在日志中打印,不发送到告警模块 + del alarm_content["details"]["disk_data"] Xalarm.major(alarm_content) # Step4:等待检测时间 diff --git a/src/sentryPlugins/ai_block_io/data_access.py b/src/sentryPlugins/ai_block_io/data_access.py index f1c2bc2..76578a5 100644 --- a/src/sentryPlugins/ai_block_io/data_access.py +++ b/src/sentryPlugins/ai_block_io/data_access.py @@ -16,12 +16,13 @@ from sentryCollector.collect_plugin import ( Result_Messages, get_io_data, get_iodump_data, + get_disk_data, is_iocollect_valid, get_disk_type ) -from .io_data import IOStageData, IOData, IOStageDumpData, IODumpData +from .io_data import IOStageData, IOData, IOStageDumpData, IODumpData, IOStageDiskData, IODiskData COLLECT_STAGES = [ "throtl", @@ -169,4 +170,47 @@ def get_iodump_data_from_collect_plug(period, disk_list): ret[disk] = disk_ret return ret logging.warning(f'get iodump data failed with message: {data_raw["message"]}') + return None + + +def 
_get_raw_disk_data(period, disk_list): + return get_disk_data( + period, + disk_list, + COLLECT_STAGES, + ["read", "write"], + ) + + +def _get_disk_stage_data(data): + io_stage_data = IOStageDiskData() + for data_type in ("read", "write"): + if data_type in data: + getattr(io_stage_data, data_type).disk_data = data[data_type] + return io_stage_data + + +def get_disk_data_from_collect_plug(period, disk_list): + data_raw = _get_raw_disk_data(period, disk_list) + if data_raw["ret"] == 0: + ret = {} + try: + data = json.loads(data_raw["message"]) + except json.decoder.JSONDecodeError as e: + logging.warning(f"get disk data failed, {e}") + return None + + for disk in data: + disk_data = data[disk] + disk_ret = IODiskData() + for k, v in disk_data.items(): + try: + getattr(disk_ret, k) + setattr(disk_ret, k, _get_disk_stage_data(v)) + except AttributeError: + logging.debug(f"no attr {k}") + continue + ret[disk] = disk_ret + return ret + logging.warning(f'get disk data failed with message: {data_raw["message"]}') return None \ No newline at end of file diff --git a/src/sentryPlugins/ai_block_io/detector.py b/src/sentryPlugins/ai_block_io/detector.py index 6c0a03f..96939f5 100644 --- a/src/sentryPlugins/ai_block_io/detector.py +++ b/src/sentryPlugins/ai_block_io/detector.py @@ -14,7 +14,7 @@ from datetime import datetime from .io_data import MetricName from .threshold import Threshold from .sliding_window import SlidingWindow, DataWindow -from .utils import get_metric_value_from_io_data_dict_by_metric_name, get_metric_value_from_iodump_data_dict +from .utils import get_metric_value_from_io_data_dict_by_metric_name, get_metric_value_from_gen_data_dict class Detector: @@ -92,11 +92,10 @@ class DataDetector: def get_data_window_data(self): return self._data_window.get_data() - def push_data(self, iodump_data_dict_with_disk_name: dict): + def push_data(self, io_gen_data_dict_with_disk_name: dict): logging.debug(f'enter Detector: {self}') - metric_value = 
get_metric_value_from_iodump_data_dict(iodump_data_dict_with_disk_name, self._metric_name) + metric_value = get_metric_value_from_gen_data_dict(io_gen_data_dict_with_disk_name, self._metric_name) if metric_value is None: - logging.debug('not found metric value, so return None.') return False logging.debug(f'input metric value: {str(metric_value)}') self._data_window.push(metric_value) @@ -151,12 +150,19 @@ class DiskDetector: return latency_wins, iodump_wins, iops_wins def get_data_detector_list_window(self): + win_data_wins = {} iodump_data_wins = {"read": {}, "write": {}} + disk_data_wins = {"read": {}, "write": {}} for data_detector in self._data_detector_list: if data_detector.metric_name.metric_name == 'iodump_data': iodump_data_wins[data_detector.metric_name.io_access_type_name][data_detector.metric_name.stage_name] =\ data_detector.get_data_window_data() - return iodump_data_wins + if data_detector.metric_name.metric_name == 'disk_data': + disk_data_wins[data_detector.metric_name.io_access_type_name][data_detector.metric_name.stage_name] =\ + data_detector.get_data_window_data() + win_data_wins['iodump_data'] = iodump_data_wins + win_data_wins['disk_data'] = disk_data_wins + return win_data_wins def is_slow_io_event(self, io_data_dict_with_disk_name: dict): diagnosis_info = {"bio": [], "rq_driver": [], "kernel_stack": []} @@ -203,6 +209,6 @@ class DiskDetector: return True, driver_name, reason, set_to_str(block_stack), set_to_str(io_type), set_to_str(alarm_type), details - def push_data_to_data_detectors(self, iodump_data_dict_with_disk_name: dict): + def push_data_to_data_detectors(self, io_gen_data_dict_with_disk_name: dict): for data_detector in self._data_detector_list: - data_detector.push_data(iodump_data_dict_with_disk_name) + data_detector.push_data(io_gen_data_dict_with_disk_name) diff --git a/src/sentryPlugins/ai_block_io/extra_logger.py b/src/sentryPlugins/ai_block_io/extra_logger.py index cfd1929..607f55a 100644 --- 
a/src/sentryPlugins/ai_block_io/extra_logger.py +++ b/src/sentryPlugins/ai_block_io/extra_logger.py @@ -148,6 +148,7 @@ def extra_latency_log(msg): ) except KeyError: return + extra_disk_log(msg, io_type) def extra_iodump_log(msg): @@ -183,4 +184,32 @@ def extra_iodump_log(msg): for bio_ptr in last_bio_record: task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr] line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}" - extra_logger.warning(line) \ No newline at end of file + extra_logger.warning(line) + + +def extra_disk_log(msg, io_type): + disk_data = msg['details']['disk_data'].get(io_type, {}) + + try: + rq_driver_data = disk_data['rq_driver'] + except Exception as e: + extra_logger.error(f"Failed to parse disk data: {e}") + return + + if len(rq_driver_data) == 0: + return + + extra_logger.warning(f"disk latency:") + header = f"{'0-1ms':>12} {'1-10ms':>12} {'10-100ms':>15} {'100ms-1s':>15} {'1-3s':>12} {'>3s':>12}" + extra_logger.warning(header) + + total_data = [0] * 6 + for period_data in rq_driver_data: + for i in range(6): + total_data[i] += period_data[i] + num_periods = len(rq_driver_data) + avg_data = [total // num_periods for total in total_data] + extra_logger.warning( + f"{avg_data[0]:>12} {avg_data[1]:>12} {avg_data[2]:>15}" + f"{avg_data[3]:>15} {avg_data[4]:>12} {avg_data[5]:>12}" + ) \ No newline at end of file diff --git a/src/sentryPlugins/ai_block_io/io_data.py b/src/sentryPlugins/ai_block_io/io_data.py index 023e7b1..6f9347f 100644 --- a/src/sentryPlugins/ai_block_io/io_data.py +++ b/src/sentryPlugins/ai_block_io/io_data.py @@ -75,6 +75,33 @@ class IODumpData: time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) +@dataclass +class IODiskListData: + disk_data: List[int] = field(default_factory=list) + + +@dataclass +class IOStageDiskData: + read: IODiskListData = field(default_factory=lambda: IODiskListData()) + write: IODiskListData = 
field(default_factory=lambda: IODiskListData()) + + +@dataclass +class IODiskData: + throtl: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + wbt: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + gettag: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + iocost: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + plug: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + bfq: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + hctx: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + requeue: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + rq_driver: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + bio: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + deadline: IOStageDiskData = field(default_factory=lambda: IOStageDiskData()) + time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) + + @dataclass(frozen=True) class MetricName: disk_name: str diff --git a/src/sentryPlugins/ai_block_io/utils.py b/src/sentryPlugins/ai_block_io/utils.py index 919cf9b..ae63f5f 100644 --- a/src/sentryPlugins/ai_block_io/utils.py +++ b/src/sentryPlugins/ai_block_io/utils.py @@ -15,7 +15,7 @@ from dataclasses import asdict from .threshold import ThresholdType from .sliding_window import SlidingWindowType -from .io_data import MetricName, IOData, IODumpData +from .io_data import MetricName, IOData def get_threshold_type_enum(algorithm_type: str): @@ -49,11 +49,11 @@ def get_metric_value_from_io_data_dict_by_metric_name( return None -def get_metric_value_from_iodump_data_dict(io_dump_data_dict: dict, metric_name: MetricName): +def get_metric_value_from_gen_data_dict(io_gen_data_dict: dict, metric_name: MetricName): try: - io_dump_data: IODumpData = io_dump_data_dict[metric_name.disk_name] - io_dump_stage_data = asdict(io_dump_data)[metric_name.stage_name] - base_data = 
io_dump_stage_data[metric_name.io_access_type_name] + io_gen_data = io_gen_data_dict[metric_name.disk_name] + io_gen_stage_data = asdict(io_gen_data)[metric_name.stage_name] + base_data = io_gen_stage_data[metric_name.io_access_type_name] metric_value = base_data[metric_name.metric_name] return metric_value except KeyError: diff --git a/src/sentryPlugins/avg_block_io/avg_block_io.py b/src/sentryPlugins/avg_block_io/avg_block_io.py index ef19b7b..42dc7d0 100644 --- a/src/sentryPlugins/avg_block_io/avg_block_io.py +++ b/src/sentryPlugins/avg_block_io/avg_block_io.py @@ -14,9 +14,10 @@ import configparser import time from .config import read_config_log, read_config_common, read_config_algorithm, read_config_latency, read_config_iodump, read_config_stage -from .stage_window import IoWindow, IoDumpWindow,IopsWindow,IodumpMsgWindow -from .module_conn import avg_is_iocollect_valid, avg_get_io_data, avg_get_iodump_data, report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation -from .utils import update_avg_and_check_abnormal, update_avg_iodump_data +from .stage_window import IoWindow, IoDumpWindow, IopsWindow, IoArrayDataWindow +from .module_conn import avg_is_iocollect_valid, avg_get_io_data, avg_get_iodump_data, avg_get_disk_data, \ + report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation +from .utils import update_avg_and_check_abnormal, update_avg_array_data from .extra_logger import init_extra_logger CONFIG_FILE = "/etc/sysSentry/plugins/avg_block_io.ini" @@ -69,8 +70,11 @@ def init_io_win(io_dic, config, common_param): io_data[disk_name][stage_name][rw]["iops"] = IopsWindow(window_size=io_dic["win_size"]) logging.debug("Successfully create {}-{}-{}-iops window".format(disk_name, stage_name, rw)) - io_data[disk_name][stage_name][rw]["iodump_data"] = IodumpMsgWindow(window_size=io_dic["win_size"]) + io_data[disk_name][stage_name][rw]["iodump_data"] = 
IoArrayDataWindow(window_size=io_dic["win_size"]) logging.debug("Successfully create {}-{}-{}-iodump_data window".format(disk_name, stage_name, rw)) + + io_data[disk_name][stage_name][rw]["disk_data"] = IoArrayDataWindow(window_size=io_dic["win_size"]) + logging.debug("Successfully create {}-{}-{}-disk_data window".format(disk_name, stage_name, rw)) return io_data, io_avg_value @@ -138,7 +142,10 @@ def main_loop(io_dic, io_data, io_avg_value): continue # 获取iodump的详细信息 - is_success, iodump_data = avg_get_iodump_data(io_dic) + is_success_iodump, iodump_data = avg_get_iodump_data(io_dic) + + # 获取磁盘的时延数据 + is_success_disk, disk_data = avg_get_disk_data(io_dic) # 处理周期数据 reach_size = False @@ -148,7 +155,8 @@ def main_loop(io_dic, io_data, io_avg_value): if disk_name in curr_period_data and stage_name in curr_period_data[disk_name] and rw in curr_period_data[disk_name][stage_name]: io_key = (disk_name, stage_name, rw) reach_size = update_avg_and_check_abnormal(curr_period_data, io_key, win_size, io_avg_value, io_data) - update_avg_iodump_data(iodump_data, is_success, io_key, io_data) + update_avg_array_data(iodump_data, is_success_iodump, io_key, io_data, "iodump_data") + update_avg_array_data(disk_data, is_success_disk, io_key, io_data, "disk_data") # win_size不满时不进行告警判断 if not reach_size: diff --git a/src/sentryPlugins/avg_block_io/config.py b/src/sentryPlugins/avg_block_io/config.py index 79bd21a..fe24468 100644 --- a/src/sentryPlugins/avg_block_io/config.py +++ b/src/sentryPlugins/avg_block_io/config.py @@ -133,7 +133,7 @@ def read_config_common(config): except configparser.NoOptionError: iotype_list = DEFAULT_PARAM[CONF_COMMON][CONF_COMMON_IOTYPE] - logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_IOTYPE}, use {iotupe_list} as default") + logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_IOTYPE}, use {iotype_list} as default") try: period_time = int(config.get(CONF_COMMON, CONF_COMMON_PER_TIME)) diff --git a/src/sentryPlugins/avg_block_io/extra_logger.py 
b/src/sentryPlugins/avg_block_io/extra_logger.py index ac86306..cd8a8d8 100644 --- a/src/sentryPlugins/avg_block_io/extra_logger.py +++ b/src/sentryPlugins/avg_block_io/extra_logger.py @@ -164,6 +164,8 @@ def extra_latency_log(msg): except KeyError: return + extra_disk_log(msg) + def extra_iodump_log(msg): extra_logger.warning(f"[SLOW IO] iodump, disk:{msg['driver_name']}, iotype:{msg['io_type']}") @@ -196,4 +198,32 @@ def extra_iodump_log(msg): for bio_ptr in last_bio_record: task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr] line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}" - extra_logger.warning(line) \ No newline at end of file + extra_logger.warning(line) + + +def extra_disk_log(msg): + disk_str = msg['details']['disk_data'] + try: + disk_data = ast.literal_eval(disk_str) + rq_driver_data = disk_data['rq_driver'] + except Exception as e: + extra_logger.error(f"Failed to parse disk data: {e}") + return + + if not rq_driver_data[0]: + return + + extra_logger.warning(f"disk latency:") + header = f"{'0-1ms':>12} {'1-10ms':>12} {'10-100ms':>15} {'100ms-1s':>15} {'1-3s':>12} {'>3s':>12}" + extra_logger.warning(header) + + total_data = [0] * 6 + for period_data in rq_driver_data: + for i in range(6): + total_data[i] += period_data[i] + num_periods = len(rq_driver_data) + avg_data = [total // num_periods for total in total_data] + extra_logger.warning( + f"{avg_data[0]:>12} {avg_data[1]:>12} {avg_data[2]:>15}" + f"{avg_data[3]:>15} {avg_data[4]:>12} {avg_data[5]:>12}" + ) \ No newline at end of file diff --git a/src/sentryPlugins/avg_block_io/module_conn.py b/src/sentryPlugins/avg_block_io/module_conn.py index 7e9b065..60f24a6 100644 --- a/src/sentryPlugins/avg_block_io/module_conn.py +++ b/src/sentryPlugins/avg_block_io/module_conn.py @@ -12,7 +12,8 @@ import json import logging import sys -from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, get_iodump_data, Result_Messages, 
get_disk_type, Disk_Type +from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, get_iodump_data, get_disk_data, \ + Result_Messages, get_disk_type, Disk_Type from syssentry.result import ResultLevel, report_result from xalarm.sentry_notify import xalarm_report, MINOR_ALM, ALARM_TYPE_OCCUR from .utils import is_abnormal, get_win_data, log_slow_win @@ -42,6 +43,14 @@ def avg_get_iodump_data(io_dic): return check_result_validation(res, 'get io dump data') +def avg_get_disk_data(io_dic): + """avg_get_disk_data from sentryCollector""" + logging.debug(f"send to sentryCollector avg_get_disk_data: period={io_dic['period_time']}, " + f"disk={io_dic['disk_list']}, stage={io_dic['stage_list']}, iotype={io_dic['iotype_list']}") + res = get_disk_data(io_dic["period_time"], io_dic["disk_list"], io_dic["stage_list"], io_dic["iotype_list"]) + return check_result_validation(res, 'get disk data') + + def avg_is_iocollect_valid(io_dic, config_disk, config_stage): """is_iocollect_valid from sentryCollector""" logging.debug(f"send to sentryCollector is_iocollect_valid: period={io_dic['period_time']}, " @@ -98,6 +107,7 @@ def process_report_data(disk_name, rw, io_data): msg["alarm_type"] = abnormal_list log_slow_win(msg, "IO press") del msg["details"]["iodump_data"] # ˳iodump_dataܹ,·ʧ,ֻ־дӡ,͵澯ģ + del msg["details"]["disk_data"] xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) return @@ -109,6 +119,7 @@ def process_report_data(disk_name, rw, io_data): msg["alarm_type"] = abnormal_list log_slow_win(msg, "driver slow") del msg["details"]["iodump_data"] # ˳iodump_dataܹ,·ʧ,ֻ־дӡ,͵澯ģ + del msg["details"]["disk_data"] xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) return @@ -123,11 +134,13 @@ def process_report_data(disk_name, rw, io_data): msg["alarm_type"] = abnormal_list log_slow_win(msg, "kernel slow") del msg["details"]["iodump_data"] # ˳iodump_dataܹ,·ʧ,ֻ־дӡ,͵澯ģ + del msg["details"]["disk_data"] xalarm_report(1002, MINOR_ALM, 
ALARM_TYPE_OCCUR, json.dumps(msg)) return log_slow_win(msg, "unknown") del msg["details"]["iodump_data"] # ˳iodump_dataܹ,·ʧ,ֻ־дӡ,͵澯ģ + del msg["details"]["disk_data"] xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) diff --git a/src/sentryPlugins/avg_block_io/stage_window.py b/src/sentryPlugins/avg_block_io/stage_window.py index 29fa6e1..88204a2 100644 --- a/src/sentryPlugins/avg_block_io/stage_window.py +++ b/src/sentryPlugins/avg_block_io/stage_window.py @@ -60,7 +60,7 @@ class IopsWindow(AbnormalWindowBase): return False -class IodumpMsgWindow: +class IoArrayDataWindow: def __init__(self, window_size=10): self.window_size = window_size self.window_data = [[] for _ in range(window_size)] diff --git a/src/sentryPlugins/avg_block_io/utils.py b/src/sentryPlugins/avg_block_io/utils.py index d9af7fe..9726641 100644 --- a/src/sentryPlugins/avg_block_io/utils.py +++ b/src/sentryPlugins/avg_block_io/utils.py @@ -42,6 +42,7 @@ def get_win_data(disk_name, rw, io_data): iodump = '' iops = '' iodump_data = '' + disk_data = '' for stage_name in io_data[disk_name]: if 'latency' in io_data[disk_name][stage_name][rw]: latency_list = io_data[disk_name][stage_name][rw]['latency'].window_data_to_string() @@ -55,9 +56,15 @@ def get_win_data(disk_name, rw, io_data): if 'iodump_data' in io_data[disk_name][stage_name][rw]: iodump_data_list = io_data[disk_name][stage_name][rw]['iodump_data'].window_data_to_string() iodump_data += f'"{stage_name}": {iodump_data_list}, ' + if 'disk_data' in io_data[disk_name][stage_name][rw]: + disk_data_list = io_data[disk_name][stage_name][rw]['disk_data'].window_data_to_string() + disk_data += f'"{stage_name}": {disk_data_list}, ' if iodump_data: iodump_data = '{' + iodump_data[:-2] + '}' - return {"latency": latency[:-2], "iodump": iodump[:-2], "iops": iops[:-2], "iodump_data": iodump_data} + if disk_data: + disk_data = '{' + disk_data[:-2] + '}' + return {"latency": latency[:-2], "iodump": iodump[:-2], "iops": iops[:-2], \ + 
"iodump_data": iodump_data, "disk_data": disk_data} def is_abnormal(io_key, io_data): @@ -154,13 +161,13 @@ def update_avg_and_check_abnormal(data, io_key, win_size, io_avg_value, io_data) return True -def update_avg_iodump_data(iodump_data, is_success, io_key, io_data): - """update iodump data to io_data""" +def update_avg_array_data(array_data, is_success, io_key, io_data, data_type): + """update array data to io_data""" all_wins = get_nested_value(io_data, io_key) - if all_wins and "iodump_data" in all_wins: + if all_wins and data_type in all_wins: if not is_success: - io_data[io_key[0]][io_key[1]][io_key[2]]["iodump_data"].append_new_data([]) + io_data[io_key[0]][io_key[1]][io_key[2]][data_type].append_new_data([]) else: - period_value = get_nested_value(iodump_data, io_key) - io_data[io_key[0]][io_key[1]][io_key[2]]["iodump_data"].append_new_data(period_value) + period_value = get_nested_value(array_data, io_key) + io_data[io_key[0]][io_key[1]][io_key[2]][data_type].append_new_data(period_value) diff --git a/src/services/sentryCollector/collect_disk.py b/src/services/sentryCollector/collect_disk.py new file mode 100644 index 0000000..03eda85 --- /dev/null +++ b/src/services/sentryCollector/collect_disk.py @@ -0,0 +1,215 @@ +# coding: utf-8 +# Copyright (c) 2025 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +Get disk latency distribution data. 
+""" +import struct +import logging +from typing import List, Optional, Dict, Any +from .collect_plugin import get_disk_type +from syssentry.utils import execute_command + + +class CollectDisk: + """ + 硬盘时延分布数据采集,目前仅支持华为V6和V7代nvme盘,且接口协议版本应为1,带内查询接口如下: + nvme get-log -i 0xC2 -l 784 /dev/[nvme_disk] + 正常调用时,响应格式为: + Device:nvme0n1 log-id:xxx namespace-id:xxx + 0 1 2 3 4 5 6 7 8 9 a b c d e f + 0000 01 00 00 00 00 00 00 00 72 00 00 00 18 00 00 00 + 0010 53 3e 00 00 b7 c9 00 00 fc 0c 01 00 42 f9 02 00 + 0020 b6 fe 03 00 95 04 04 00 14 05 04 00 38 f2 03 00 + ... + 0300 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 + 响应部分各字段含义如下: + Byte 类型 描述 + 03:00 version major version + 07:04 version minor version + 135:08 读时延分布 时延为0-1ms的读命令个数,每32us一个档位,共32个档位,共128bytes,每个档位统计占用4byte + 259:136 读时延分布 时延为1-32ms的读命令个数,每1ms一个档位,共31个档位,共124bytes,每个档位统计占用4byte + 379:260 读时延分布 时延为32ms-1s的读命令个数,每32ms一个档位,共30个档位,共120bytes,每个档位统计占用4byte + 383:380 读时延分布 时延为1-2s的读命令个数,只有一个档位,统计个数占用4byte + 387:384 读时延分布 时延为2-3s的读命令个数,只有一个档位,统计个数占用4byte + 391:388 读时延分布 时延为3-4s的读命令个数,只有一个档位,统计个数占用4byte + 395:392 读时延分布 时延为大于4s的读命令个数,只有一个档位,统计个数占用4byte + 523:296 写时延分布 时延为0-1ms的写命令个数,每32us一个档位,共32个档位,共128bytes,每个档位统计占用4byte + 647:523 写时延分布 时延为1-32ms的写命令个数,每1ms一个档位,共31个档位,共124bytes,每个档位统计占用4byte + 767:648 写时延分布 时延为32ms-1s的写命令个数,每32ms一个档位,共30个档位,共120bytes,每个档位统计占用4byte + 771:768 写时延分布 时延为1-2s的写命令个数,只有一个档位,统计个数占用4byte + 775:772 写时延分布 时延为2-3s的写命令个数,只有一个档位,统计个数占用4byte + 779:776 写时延分布 时延为3-4s的写命令个数,只有一个档位,统计个数占用4byte + 783:780 写时延分布 时延为大于4s的写命令个数,只有一个档位,统计个数占用4byte + 最终返回的数据会合并一下,一共12个数据,分别为0-1ms,1-10ms,10-100ms,100-1s,1-3s,大于3s这6个档位的读写时延 + """ + + def __init__(self, disk_name: str): + self.disk_name = disk_name + self.is_support = False + self._check_support() + + def get_support_flag(self) -> bool: + return self.is_support + + def collect_data(self) -> List[int]: + if not self.is_support: + logging.error(f"Disk {self.disk_name} is not supported for latency collection.") + return [] + + try: + cmd = ["nvme", "get-log", 
"-i", "0xC2", "-l", "784", f"/dev/{self.disk_name}"] + output = execute_command(cmd) + if not output: + logging.error(f"Failed to get NVMe log for disk {self.disk_name}.") + return [] + + result = self._parse_nvme_output(output) + return result + except Exception as e: + logging.error(f"Error collecting latency data for disk {self.disk_name}: {e}") + return [] + + def _check_support(self): + try: + disk_type_result = get_disk_type(self.disk_name) + if disk_type_result["ret"] != 0 or disk_type_result["message"] != '0': + logging.warning(f"Disk {self.disk_name} type is not supported.") + return + + if not self._check_disk_model(): + logging.warning(f"Disk {self.disk_name} model is not supported.") + return + + if not self._check_nvme_version(): + logging.warning(f"Disk {self.disk_name} NVMe version is not supported.") + return + + self.is_support = True + logging.info(f"Disk {self.disk_name} is supported for latency collection.") + except Exception as e: + logging.error(f"Error checking disk {self.disk_name} support: {e}") + + def _check_disk_model(self) -> bool: + cmd = ["lsblk", "-o", "name,model"] + try: + output = execute_command(cmd) + if not output: + logging.error(f"Failed to get disk model.") + return False + + for line in output.splitlines(): + if self.disk_name in line: + parts = line.split() + if len(parts) >= 2: + model = parts[-1] + if model.startswith("HWE6") or model.startswith("HWE7"): + return True + except Exception as e: + logging.error(f"Error checking disk model for {self.disk_name}: {e}") + return False + + def _check_nvme_version(self) -> bool: + try: + cmd = ["nvme", "get-log", "-i", "0xC2", "-l", "784", f"/dev/{self.disk_name}"] + output = execute_command(cmd) + if not output: + logging.error(f"Failed to get NVMe log for disk {self.disk_name}.") + return False + + lines = output.splitlines() + line = lines[2] + parts = line.split() + hex_data = [] + if len(parts) < 17: + return False + for hex_byte in parts[1:9]: + hex_data.append(hex_byte) + 
+ data = bytes.fromhex(''.join(hex_data)) + if len(data) < 8: + return False + + major_version = struct.unpack(' List[int]: + lines = output.splitlines() + hex_data = [] + + for line in lines[2:]: + parts = line.split() + for hex_byte in parts[1:17]: + hex_data.append(hex_byte) + + data = bytes.fromhex(''.join(hex_data)) + if len(data) < 784: + logging.error(f"NVMe log data for disk {self.disk_name} is incomplete.") + return [] + + result = [0] * 12 # 6 read + 6 write latency buckets + for i in range(32): + offset = 8 + i * 4 + count = struct.unpack(' None: global IO_GLOBAL_DATA + global IO_DUMP_DATA while True: if self.stop_event.is_set(): logging.debug("collect io thread exit") @@ -402,6 +408,7 @@ class CollectIo(): logging.info(f"ebpf io_dump info : {disk_name}, {stage}, {io_type}, {curr_io_dump}") IO_GLOBAL_DATA[disk_name][stage][io_type].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops]) IO_DUMP_DATA[disk_name][stage][io_type].insert(0, []) + self.append_disk_data(disk_name) elapsed_time = time.time() - start_time sleep_time = self.period_time - elapsed_time @@ -497,6 +504,51 @@ class CollectIo(): EBPF_PROCESS.wait() logging.info("ebpf collector thread exit") + def init_disk_collect( + self, + disk_name: str + ) -> None: + collector = CollectDisk(disk_name) + support_flag = collector.get_support_flag() + if support_flag: + self.disk_collectors[disk_name] = collector + self.disk_data_window_value[disk_name] = [] + DISK_DATA[disk_name] = {} + DISK_DATA[disk_name]['rq_driver'] = {} + DISK_DATA[disk_name]['rq_driver']['read'] = [] + DISK_DATA[disk_name]['rq_driver']['write'] = [] + + def append_disk_data( + self, + disk_name: str, + ) -> None: + if disk_name in self.disk_collectors: + collect_disk = self.disk_collectors[disk_name] + curr_value = collect_disk.collect_data() + if len(curr_value) != 0: + self.disk_data_window_value[disk_name].append(curr_value) + if len(self.disk_data_window_value[disk_name]) < 2: + return + if 
len(self.disk_data_window_value[disk_name]) > 2: + self.disk_data_window_value[disk_name].pop(0) + + read_data = [] + write_data = [] + last_value = self.disk_data_window_value[disk_name][0] + + for i in range(6): + delta = curr_value[i] - last_value[i] + read_data.append(max(0, delta)) + for i in range(6, 12): + delta = curr_value[i] - last_value[i] + write_data.append(max(0, delta)) + if len(DISK_DATA[disk_name]['rq_driver']['read']) >= self.max_save: + DISK_DATA[disk_name]['rq_driver']['read'].pop() + if len(DISK_DATA[disk_name]['rq_driver']['write']) >= self.max_save: + DISK_DATA[disk_name]['rq_driver']['write'].pop() + DISK_DATA[disk_name]['rq_driver']['read'].insert(0, read_data) + DISK_DATA[disk_name]['rq_driver']['write'].insert(0, write_data) + def main_loop(self): global IO_GLOBAL_DATA global IO_DUMP_DATA @@ -512,6 +564,7 @@ class CollectIo(): IO_GLOBAL_DATA[disk_name][stage][category] = [] IO_DUMP_DATA[disk_name][stage][category] = [] self.update_io_threshold(disk_name, stage_list) + self.init_disk_collect(disk_name) while True: start_time = time.time() @@ -524,6 +577,7 @@ class CollectIo(): if self.get_blk_io_hierarchy(disk_name, stage_list) < 0: continue self.append_period_lat(disk_name, stage_list) + self.append_disk_data(disk_name) elapsed_time = time.time() - start_time sleep_time = self.period_time - elapsed_time diff --git a/src/services/sentryCollector/collect_server.py b/src/services/sentryCollector/collect_server.py index b045d4c..9a0c886 100644 --- a/src/services/sentryCollector/collect_server.py +++ b/src/services/sentryCollector/collect_server.py @@ -24,7 +24,7 @@ import select import threading import time -from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA, IO_DUMP_DATA +from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA, IO_DUMP_DATA, DISK_DATA from .collect_config import CollectConfig SENTRY_RUN_DIR = "/var/run/sysSentry" @@ -49,7 +49,8 @@ class ServerProtocol(): IS_IOCOLLECT_VALID = 0 GET_IO_DATA = 1 GET_IODUMP_DATA = 2 - PRO_END 
= 3 + GET_DISK_DATA = 3 + PRO_END = 4 class CollectServer(): @@ -133,12 +134,14 @@ class CollectServer(): return json.dumps(result_rev) def get_io_data(self, data_struct): - self.io_global_data = IO_GLOBAL_DATA - return self.get_io_common(data_struct, self.io_global_data) + return self.get_io_common(data_struct, IO_GLOBAL_DATA) def get_iodump_data(self, data_struct): return self.get_io_common(data_struct, IO_DUMP_DATA) + def get_disk_data(self, data_struct): + return self.get_io_common(data_struct, DISK_DATA) + def msg_data_process(self, msg_data, protocal_id): """message data process""" logging.debug("msg_data %s", msg_data) @@ -155,6 +158,8 @@ class CollectServer(): res_msg = self.get_io_data(data_struct) elif protocal_id == ServerProtocol.GET_IODUMP_DATA: res_msg = self.get_iodump_data(data_struct) + elif protocal_id == ServerProtocol.GET_DISK_DATA: + res_msg = self.get_disk_data(data_struct) return res_msg -- Gitee