From 12f150d7302e9c89a65b838336b09a55cae33560 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B4=BA=E6=9C=89=E5=BF=97?= <1037617413@qq.com> Date: Fri, 11 Oct 2024 13:27:50 +0000 Subject: [PATCH] =?UTF-8?q?ai=5Fblock=5Fio=20support=20stage=E3=80=81iotyp?= =?UTF-8?q?e=20config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 贺有志 <1037617413@qq.com> --- .../config/plugins/ai_block_io.ini | 6 +- .../sentryPlugins/ai_block_io/ai_block_io.py | 100 +++++++----------- .../ai_block_io/config_parser.py | 94 ++++++++++++++-- .../sentryPlugins/ai_block_io/detector.py | 25 +++++ 4 files changed, 155 insertions(+), 70 deletions(-) diff --git a/sysSentry-1.0.2/config/plugins/ai_block_io.ini b/sysSentry-1.0.2/config/plugins/ai_block_io.ini index d706b31..ce636d5 100644 --- a/sysSentry-1.0.2/config/plugins/ai_block_io.ini +++ b/sysSentry-1.0.2/config/plugins/ai_block_io.ini @@ -1,8 +1,12 @@ +[log] +level=info + [common] absolute_threshold=40 slow_io_detect_frequency=1 -log_level=info disk=default +stage=bio +iotype=read,write [algorithm] train_data_duration=24 diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/ai_block_io.py index c0750f9..e1052ec 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/ai_block_io.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/ai_block_io.py @@ -13,7 +13,7 @@ import time import signal import logging -from .detector import Detector +from .detector import Detector, DiskDetector from .threshold import ThresholdFactory, AbsoluteThreshold from .sliding_window import SlidingWindowFactory from .utils import get_data_queue_size_and_update_size @@ -34,8 +34,8 @@ def sig_handler(signum, frame): class SlowIODetection: _config_parser = None _disk_list = None - _detector_name_list = [] - _detectors = {} + _detector_name_list = {} + _disk_detectors = {} def __init__(self, config_parser: ConfigParser): self._config_parser = config_parser @@ -43,61 +43,40 @@ class SlowIODetection: self.__init_detector() def __init_detector_name_list(self): - self._disk_list = check_collect_valid( - self._config_parser.slow_io_detect_frequency - ) + self._disk_list = check_collect_valid(self._config_parser.slow_io_detect_frequency) if self._disk_list is None: - Report.report_pass( - "get available disk error, please check if the collector plug is enable. exiting..." - ) + Report.report_pass("get available disk error, please check if the collector plug is enable. exiting...") exit(1) logging.info(f"ai_block_io plug has found disks: {self._disk_list}") - disks_to_detection = self._config_parser.disks_to_detection + disks: list = self._config_parser.disks_to_detection + stages: list = self._config_parser.stage + iotypes: list = self._config_parser.iotype # 情况1:None,则启用所有磁盘检测 # 情况2:is not None and len = 0,则不启动任何磁盘检测 # 情况3:len != 0,则取交集 - if disks_to_detection is None: - logging.warning( - "you not specify any disk or use default, so ai_block_io will enable all available disk." - ) + if disks is None: + logging.warning("you not specify any disk or use default, so ai_block_io will enable all available disk.") for disk in self._disk_list: - self._detector_name_list.append( - MetricName(disk, "bio", "read", "latency") - ) - self._detector_name_list.append( - MetricName(disk, "bio", "write", "latency") - ) - if len(self._detector_name_list) >= 30: - logging.warning( - "the number of disks to detection is large than 30, so it will be ignored." 
- ) - break + for stage in stages: + for iotype in iotypes: + if disk not in self._detector_name_list: + self._detector_name_list[disk] = [] + self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency")) else: - for disk_to_detection in disks_to_detection: - if disk_to_detection in self._disk_list: - self._detector_name_list.append( - MetricName(disk_to_detection, "bio", "read", "latency") - ) - self._detector_name_list.append( - MetricName(disk_to_detection, "bio", "write", "latency") - ) + for disk in disks: + if disk in self._disk_list: + for stage in stages: + for iotype in iotypes: + if disk not in self._detector_name_list: + self._detector_name_list[disk] = [] + self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency")) else: - logging.warning( - "disk: [%s] not in available disk list, so it will be ignored.", - disk_to_detection, - ) + logging.warning("disk: [%s] not in available disk list, so it will be ignored.", disk) if len(self._detector_name_list) == 0: - logging.critical( - "the disks to detection is empty, ai_block_io will exit." - ) - Report.report_pass( - "the disks to detection is empty, ai_block_io will exit." - ) + logging.critical("the disks to detection is empty, ai_block_io will exit.") + Report.report_pass("the disks to detection is empty, ai_block_io will exit.") exit(1) - logging.info( - f"start to detection follow disk and it's metric: {self._detector_name_list}" - ) def __init_detector(self): train_data_duration, train_update_duration = ( @@ -109,11 +88,9 @@ class SlowIODetection: train_data_duration, train_update_duration, slow_io_detection_frequency ) sliding_window_type = self._config_parser.sliding_window_type - window_size, window_threshold = ( - self._config_parser.get_window_size_and_window_minimum_threshold() - ) + window_size, window_threshold = (self._config_parser.get_window_size_and_window_minimum_threshold()) - for detector_name in self._detector_name_list: + for disk, metric_name_list in self._detector_name_list.items(): threshold = ThresholdFactory().get_threshold( threshold_type, boxplot_parameter=self._config_parser.boxplot_parameter, @@ -126,12 +103,12 @@ class SlowIODetection: queue_length=window_size, threshold=window_threshold, ) - detector = Detector(detector_name, threshold, sliding_window) - # 绝对阈值的阈值初始化 - if isinstance(threshold, AbsoluteThreshold): - threshold.set_threshold(self._config_parser.absolute_threshold) - self._detectors[detector_name] = detector - logging.info(f"add detector: {detector}") + disk_detector = DiskDetector(disk) + for metric_name in metric_name_list: + detector = Detector(metric_name, threshold, sliding_window) + disk_detector.add_detector(detector) + logging.info(f'disk: [{disk}] add detector:\n [{disk_detector}]') + self._disk_detectors[disk] = disk_detector def launch(self): while True: @@ -151,17 +128,16 @@ class SlowIODetection: # Step2:慢IO检测 logging.debug("step2. Start to detection slow io event.") slow_io_event_list = [] - for metric_name, detector in self._detectors.items(): - result = detector.is_slow_io_event(io_data_dict_with_disk_name) + for disk, disk_detector in self._disk_detectors.items(): + result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name) if result[0]: - slow_io_event_list.append((detector.get_metric_name(), result)) + slow_io_event_list.append(result) logging.debug("step2. End to detection slow io event.") # Step3:慢IO事件上报 logging.debug("step3. 
Report slow io event to sysSentry.") for slow_io_event in slow_io_event_list: - metric_name: MetricName = slow_io_event[0] - result = slow_io_event[1] + metric_name: MetricName = slow_io_event[1] alarm_content = { "driver_name": f"{metric_name.get_disk_name()}", "reason": "disk_slow", @@ -169,7 +145,7 @@ class SlowIODetection: "io_type": f"{metric_name.get_io_access_type_name()}", "alarm_source": "ai_block_io", "alarm_type": "latency", - "details": f"current window is: {result[1]}, threshold is: {result[2]}.", + "details": f"current window is: {slow_io_event[2]}, threshold is: {slow_io_event[3]}.", } Xalarm.major(alarm_content) logging.warning(alarm_content) diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/config_parser.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/config_parser.py index 1fa2fa5..a357766 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/config_parser.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/config_parser.py @@ -12,14 +12,17 @@ import os import configparser import logging -from .alarm_report import Report +from .alarm_report import Report from .threshold import ThresholdType from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_log_level LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s" +ALL_STAGE_LIST = ['throtl', 'wbt', 'gettag', 'plug', 'deadline', 'hctx', 'requeue', 'rq_driver', 'bio'] +ALL_IOTPYE_LIST = ['read', 'write'] + def init_log_format(log_level: str): logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT) @@ -34,6 +37,9 @@ class ConfigParser: DEFAULT_SLOW_IO_DETECTION_FREQUENCY = 1 DEFAULT_LOG_LEVEL = "info" + DEFAULT_STAGE = 'throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio' + DEFAULT_IOTYPE = 'read,write' + DEFAULT_ALGORITHM_TYPE = "boxplot" DEFAULT_TRAIN_DATA_DURATION = 24 DEFAULT_TRAIN_UPDATE_DURATION = 2 @@ -51,6 +57,8 @@ class ConfigParser: ) self.__log_level = ConfigParser.DEFAULT_LOG_LEVEL self.__disks_to_detection = None + self.__stage = ConfigParser.DEFAULT_STAGE + self.__iotype = ConfigParser.DEFAULT_IOTYPE self.__algorithm_type = get_threshold_type_enum( ConfigParser.DEFAULT_ALGORITHM_TYPE @@ -243,6 +251,43 @@ class ConfigParser: le=10, ) + def __read__stage(self, items_algorithm: dict): + stage_str = items_algorithm.get('stage', ConfigParser.DEFAULT_STAGE) + stage_list = stage_str.split(',') + if len(stage_list) == 1 and stage_list[0] == '': + logging.critical('stage value not allow is empty, exiting...') + exit(1) + if len(stage_list) == 1 and stage_list[0] == 'default': + logging.warning(f'stage will enable default value: {ConfigParser.DEFAULT_STAGE}') + self.__stage = ALL_STAGE_LIST + return + for stage in stage_list: + if stage not in ALL_STAGE_LIST: + logging.critical(f'stage: {stage} is not valid stage, ai_block_io will exit...') + exit(1) + dup_stage_list = set(stage_list) + if 'bio' not in dup_stage_list: + logging.critical('stage must contains bio stage, exiting...') + exit(1) + self.__stage = dup_stage_list + + def __read__iotype(self, items_algorithm: dict): + iotype_str = items_algorithm.get('iotype', ConfigParser.DEFAULT_IOTYPE) + iotype_list = iotype_str.split(',') + if len(iotype_list) == 1 and iotype_list[0] == '': + logging.critical('iotype value not allow is empty, exiting...') + exit(1) + if len(iotype_list) == 1 and iotype_list[0] == 'default': + logging.warning(f'iotype will enable default value: {ConfigParser.DEFAULT_IOTYPE}') + self.__iotype = ALL_IOTPYE_LIST + return + 
+        for iotype in iotype_list:
+            if iotype not in ALL_IOTYPE_LIST:
+                logging.critical(f'iotype: {iotype} is not a valid iotype, ai_block_io will exit...')
+                exit(1)
+        dup_iotype_list = set(iotype_list)
+        self.__iotype = dup_iotype_list
+
     def __read__window_size(self, items_sliding_window: dict):
         self.__window_size = self._get_config_value(
             items_sliding_window,
@@ -291,17 +336,25 @@ class ConfigParser:
             )
             exit(1)
 
+        if con.has_section('log'):
+            items_log = dict(con.items('log'))
+            # case 1: no [log] section, use the default value
+            # case 2: [log] exists but the value is empty or invalid, use the default value
+            # case 3: [log] exists and the value is valid, use that value
+            self.__log_level = items_log.get('level', ConfigParser.DEFAULT_LOG_LEVEL)
+            init_log_format(self.__log_level)
+        else:
+            init_log_format(self.__log_level)
+            logging.warning("log section parameter not found, it will be set to default value.")
+
         if con.has_section("common"):
             items_common = dict(con.items("common"))
-            self.__log_level = items_common.get(
-                "log_level", ConfigParser.DEFAULT_LOG_LEVEL
-            )
-            init_log_format(self.__log_level)
             self.__read_absolute_threshold(items_common)
             self.__read__slow_io_detect_frequency(items_common)
             self.__read__disks_to_detect(items_common)
+            self.__read__stage(items_common)
+            self.__read__iotype(items_common)
         else:
-            init_log_format(self.__log_level)
             logging.warning(
                 "common section parameter not found, it will be set to default value."
             )
@@ -333,8 +386,27 @@ class ConfigParser:
 
         self.__print_all_config_value()
 
+    def __repr__(self):
+        config_str = {
+            'log.level': self.__log_level,
+            'common.absolute_threshold': self.__absolute_threshold,
+            'common.slow_io_detect_frequency': self.__slow_io_detect_frequency,
+            'common.disk': self.__disks_to_detection,
+            'common.stage': self.__stage,
+            'common.iotype': self.__iotype,
+            'algorithm.train_data_duration': self.__train_data_duration,
+            'algorithm.train_update_duration': self.__train_update_duration,
+            'algorithm.algorithm_type': self.__algorithm_type,
+            'algorithm.boxplot_parameter': self.__boxplot_parameter,
+            'algorithm.n_sigma_parameter': self.__n_sigma_parameter,
+            'sliding_window.sliding_window_type': self.__sliding_window_type,
+            'sliding_window.window_size': self.__window_size,
+            'sliding_window.window_minimum_threshold': self.__window_minimum_threshold
+        }
+        return str(config_str)
+
     def __print_all_config_value(self):
-        pass
+        logging.info(f"all config values are as follows:\n {self}")
 
     def get_train_data_duration_and_train_update_duration(self):
         return self.__train_data_duration, self.__train_update_duration
@@ -382,6 +454,14 @@ class ConfigParser:
     def disks_to_detection(self):
         return self.__disks_to_detection
 
+    @property
+    def stage(self):
+        return self.__stage
+
+    @property
+    def iotype(self):
+        return self.__iotype
+
     @property
     def boxplot_parameter(self):
         return self.__boxplot_parameter
diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/detector.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/detector.py
index 0ed282b..e710ddd 100644
--- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/detector.py
+++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/detector.py
@@ -53,3 +53,28 @@ class Detector:
             f' io_type_name: {self._metric_name.get_io_access_type_name()},'
             f' metric_name: {self._metric_name.get_metric_name()}, threshold_type: {self._threshold},'
             f' sliding_window_type: {self._slidingWindow}')
+
+
+class DiskDetector:
+
+    def __init__(self, disk_name: str):
+        self._disk_name = disk_name
+        self._detector_list = []
+
+    def add_detector(self, detector: Detector):
+        self._detector_list.append(detector)
+
+    def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
+        # a slow IO event is reported only when an anomaly is detected in the bio stage
+        # todo: root cause diagnosis
+        for detector in self._detector_list:
+            result = detector.is_slow_io_event(io_data_dict_with_disk_name)
+            if result[0] and detector.get_metric_name().get_stage_name() == 'bio':
+                return result[0], detector.get_metric_name(), result[1], result[2]
+        return False, None, None, None
+
+    def __repr__(self):
+        msg = f'disk: {self._disk_name}, '
+        for detector in self._detector_list:
+            msg += f'\n detector: [{detector}]'
+        return msg
-- 
Gitee
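Illustrative ai_block_io.ini snippet exercising the new options (a sketch derived from the parsing rules in the diff above; the particular stage/iotype selection is only an example): stage and iotype take comma-separated values, the keyword default expands to every supported value, the stage list must include bio, and the log level now lives in its own [log] section instead of [common].

[log]
level=info

[common]
absolute_threshold=40
slow_io_detect_frequency=1
disk=default
stage=bio,rq_driver
iotype=read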