From 11bed44a5c7bdb60d95b4db5ee12b89c7150cb8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B4=BA=E6=9C=89=E5=BF=97?= <1037617413@qq.com> Date: Fri, 11 Oct 2024 14:02:07 +0000 Subject: [PATCH] ai_block_io support stage and iotype MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 贺有志 <1037617413@qq.com> --- ai_block_io-support-stage-and-iotype.patch | 906 +++++++++++++++++++++ sysSentry.spec | 9 +- 2 files changed, 914 insertions(+), 1 deletion(-) create mode 100644 ai_block_io-support-stage-and-iotype.patch diff --git a/ai_block_io-support-stage-and-iotype.patch b/ai_block_io-support-stage-and-iotype.patch new file mode 100644 index 0000000..1fd7505 --- /dev/null +++ b/ai_block_io-support-stage-and-iotype.patch @@ -0,0 +1,906 @@ +From 13dc3712b4530a312aa43610f7696a4a62f30e96 Mon Sep 17 00:00:00 2001 +From: =?UTF-8?q?=E8=B4=BA=E6=9C=89=E5=BF=97?= <1037617413@qq.com> +Date: Fri, 11 Oct 2024 21:50:32 +0800 +Subject: [PATCH] ai_block_io support stage and iotype + +--- + config/plugins/ai_block_io.ini | 7 +- + .../sentryPlugins/ai_block_io/ai_block_io.py | 126 +++-- + .../ai_block_io/config_parser.py | 471 +++++++++++++----- + .../sentryPlugins/ai_block_io/data_access.py | 11 +- + .../sentryPlugins/ai_block_io/detector.py | 25 + + src/python/sentryPlugins/ai_block_io/utils.py | 3 +- + 6 files changed, 453 insertions(+), 190 deletions(-) + +diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini +index 01ce266..a814d52 100644 +--- a/config/plugins/ai_block_io.ini ++++ b/config/plugins/ai_block_io.ini +@@ -1,7 +1,12 @@ ++[log] ++level=info ++ + [common] + absolute_threshold=40 + slow_io_detect_frequency=1 +-log_level=info ++disk=default ++stage=bio ++iotype=read,write + + [algorithm] + train_data_duration=24 +diff --git a/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/src/python/sentryPlugins/ai_block_io/ai_block_io.py +index 77104a9..e1052ec 100644 +--- a/src/python/sentryPlugins/ai_block_io/ai_block_io.py ++++ b/src/python/sentryPlugins/ai_block_io/ai_block_io.py +@@ -13,7 +13,7 @@ import time + import signal + import logging + +-from .detector import Detector ++from .detector import Detector, DiskDetector + from .threshold import ThresholdFactory, AbsoluteThreshold + from .sliding_window import SlidingWindowFactory + from .utils import get_data_queue_size_and_update_size +@@ -34,8 +34,8 @@ def sig_handler(signum, frame): + class SlowIODetection: + _config_parser = None + _disk_list = None +- _detector_name_list = [] +- _detectors = {} ++ _detector_name_list = {} ++ _disk_detectors = {} + + def __init__(self, config_parser: ConfigParser): + self._config_parser = config_parser +@@ -43,85 +43,101 @@ class SlowIODetection: + self.__init_detector() + + def __init_detector_name_list(self): +- self._disk_list = check_collect_valid(self._config_parser.get_slow_io_detect_frequency()) ++ self._disk_list = check_collect_valid(self._config_parser.slow_io_detect_frequency) + if self._disk_list is None: + Report.report_pass("get available disk error, please check if the collector plug is enable. 
exiting...") + exit(1) + + logging.info(f"ai_block_io plug has found disks: {self._disk_list}") +- disks_to_detection: list = self._config_parser.get_disks_to_detection() ++ disks: list = self._config_parser.disks_to_detection ++ stages: list = self._config_parser.stage ++ iotypes: list = self._config_parser.iotype + # 情况1:None,则启用所有磁盘检测 + # 情况2:is not None and len = 0,则不启动任何磁盘检测 + # 情况3:len != 0,则取交集 +- if disks_to_detection is None: ++ if disks is None: + logging.warning("you not specify any disk or use default, so ai_block_io will enable all available disk.") + for disk in self._disk_list: +- self._detector_name_list.append(MetricName(disk, "bio", "read", "latency")) +- self._detector_name_list.append(MetricName(disk, "bio", "write", "latency")) +- elif len(disks_to_detection) == 0: +- logging.warning('please attention: conf file not specify any disk to detection, so it will not start ai block io.') ++ for stage in stages: ++ for iotype in iotypes: ++ if disk not in self._detector_name_list: ++ self._detector_name_list[disk] = [] ++ self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency")) + else: +- for disk_to_detection in disks_to_detection: +- if disk_to_detection in self._disk_list: +- self._detector_name_list.append(MetricName(disk_to_detection, "bio", "read", "latency")) +- self._detector_name_list.append(MetricName(disk_to_detection, "bio", "write", "latency")) ++ for disk in disks: ++ if disk in self._disk_list: ++ for stage in stages: ++ for iotype in iotypes: ++ if disk not in self._detector_name_list: ++ self._detector_name_list[disk] = [] ++ self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency")) + else: +- logging.warning(f"disk:[{disk_to_detection}] not in available disk list, so it will be ignored.") +- logging.info(f'start to detection follow disk and it\'s metric: {self._detector_name_list}') ++ logging.warning("disk: [%s] not in available disk list, so it will be ignored.", disk) ++ if len(self._detector_name_list) == 0: ++ logging.critical("the disks to detection is empty, ai_block_io will exit.") ++ Report.report_pass("the disks to detection is empty, ai_block_io will exit.") ++ exit(1) + + def __init_detector(self): +- train_data_duration, train_update_duration = (self._config_parser. 
+- get_train_data_duration_and_train_update_duration()) +- slow_io_detection_frequency = self._config_parser.get_slow_io_detect_frequency() +- threshold_type = self._config_parser.get_algorithm_type() +- data_queue_size, update_size = get_data_queue_size_and_update_size(train_data_duration, +- train_update_duration, +- slow_io_detection_frequency) +- sliding_window_type = self._config_parser.get_sliding_window_type() +- window_size, window_threshold = self._config_parser.get_window_size_and_window_minimum_threshold() +- +- for detector_name in self._detector_name_list: +- threshold = ThresholdFactory().get_threshold(threshold_type, +- boxplot_parameter=self._config_parser.get_boxplot_parameter(), +- n_sigma_paramter=self._config_parser.get_n_sigma_parameter(), +- data_queue_size=data_queue_size, +- data_queue_update_size=update_size) +- sliding_window = SlidingWindowFactory().get_sliding_window(sliding_window_type, queue_length=window_size, +- threshold=window_threshold) +- detector = Detector(detector_name, threshold, sliding_window) +- # 绝对阈值的阈值初始化 +- if isinstance(threshold, AbsoluteThreshold): +- threshold.set_threshold(self._config_parser.get_absolute_threshold()) +- self._detectors[detector_name] = detector +- logging.info(f"add detector: {detector}") ++ train_data_duration, train_update_duration = ( ++ self._config_parser.get_train_data_duration_and_train_update_duration() ++ ) ++ slow_io_detection_frequency = self._config_parser.slow_io_detect_frequency ++ threshold_type = self._config_parser.algorithm_type ++ data_queue_size, update_size = get_data_queue_size_and_update_size( ++ train_data_duration, train_update_duration, slow_io_detection_frequency ++ ) ++ sliding_window_type = self._config_parser.sliding_window_type ++ window_size, window_threshold = (self._config_parser.get_window_size_and_window_minimum_threshold()) ++ ++ for disk, metric_name_list in self._detector_name_list.items(): ++ threshold = ThresholdFactory().get_threshold( ++ threshold_type, ++ boxplot_parameter=self._config_parser.boxplot_parameter, ++ n_sigma_paramter=self._config_parser.n_sigma_parameter, ++ data_queue_size=data_queue_size, ++ data_queue_update_size=update_size, ++ ) ++ sliding_window = SlidingWindowFactory().get_sliding_window( ++ sliding_window_type, ++ queue_length=window_size, ++ threshold=window_threshold, ++ ) ++ disk_detector = DiskDetector(disk) ++ for metric_name in metric_name_list: ++ detector = Detector(metric_name, threshold, sliding_window) ++ disk_detector.add_detector(detector) ++ logging.info(f'disk: [{disk}] add detector:\n [{disk_detector}]') ++ self._disk_detectors[disk] = disk_detector + + def launch(self): + while True: +- logging.debug('step0. AI threshold slow io event detection is looping.') ++ logging.debug("step0. AI threshold slow io event detection is looping.") + + # Step1:获取IO数据 + io_data_dict_with_disk_name = get_io_data_from_collect_plug( +- self._config_parser.get_slow_io_detect_frequency(), self._disk_list ++ self._config_parser.slow_io_detect_frequency, self._disk_list + ) +- logging.debug(f'step1. Get io data: {str(io_data_dict_with_disk_name)}') ++ logging.debug(f"step1. Get io data: {str(io_data_dict_with_disk_name)}") + if io_data_dict_with_disk_name is None: +- Report.report_pass("get io data error, please check if the collector plug is enable. exitting...") ++ Report.report_pass( ++ "get io data error, please check if the collector plug is enable. exitting..." ++ ) + exit(1) + + # Step2:慢IO检测 +- logging.debug('step2. 
Start to detection slow io event.') ++ logging.debug("step2. Start to detection slow io event.") + slow_io_event_list = [] +- for metric_name, detector in self._detectors.items(): +- result = detector.is_slow_io_event(io_data_dict_with_disk_name) ++ for disk, disk_detector in self._disk_detectors.items(): ++ result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name) + if result[0]: +- slow_io_event_list.append((detector.get_metric_name(), result)) +- logging.debug('step2. End to detection slow io event.') ++ slow_io_event_list.append(result) ++ logging.debug("step2. End to detection slow io event.") + + # Step3:慢IO事件上报 +- logging.debug('step3. Report slow io event to sysSentry.') ++ logging.debug("step3. Report slow io event to sysSentry.") + for slow_io_event in slow_io_event_list: +- metric_name: MetricName = slow_io_event[0] +- result = slow_io_event[1] ++ metric_name: MetricName = slow_io_event[1] + alarm_content = { + "driver_name": f"{metric_name.get_disk_name()}", + "reason": "disk_slow", +@@ -129,14 +145,14 @@ class SlowIODetection: + "io_type": f"{metric_name.get_io_access_type_name()}", + "alarm_source": "ai_block_io", + "alarm_type": "latency", +- "details": f"current window is: {result[1]}, threshold is: {result[2]}.", ++ "details": f"current window is: {slow_io_event[2]}, threshold is: {slow_io_event[3]}.", + } + Xalarm.major(alarm_content) + logging.warning(alarm_content) + + # Step4:等待检测时间 +- logging.debug('step4. Wait to start next slow io event detection loop.') +- time.sleep(self._config_parser.get_slow_io_detect_frequency()) ++ logging.debug("step4. Wait to start next slow io event detection loop.") ++ time.sleep(self._config_parser.slow_io_detect_frequency) + + + def main(): +diff --git a/src/python/sentryPlugins/ai_block_io/config_parser.py b/src/python/sentryPlugins/ai_block_io/config_parser.py +index 354c122..a357766 100644 +--- a/src/python/sentryPlugins/ai_block_io/config_parser.py ++++ b/src/python/sentryPlugins/ai_block_io/config_parser.py +@@ -9,44 +9,60 @@ + # PURPOSE. + # See the Mulan PSL v2 for more details. + ++import os + import configparser + import logging + ++from .alarm_report import Report + from .threshold import ThresholdType + from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_log_level + + + LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s" + ++ALL_STAGE_LIST = ['throtl', 'wbt', 'gettag', 'plug', 'deadline', 'hctx', 'requeue', 'rq_driver', 'bio'] ++ALL_IOTPYE_LIST = ['read', 'write'] ++ + + def init_log_format(log_level: str): + logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT) +- if log_level.lower() not in ('info', 'warning', 'error', 'debug'): +- logging.warning(f'the log_level: {log_level} you set is invalid, use default value: info.') ++ if log_level.lower() not in ("info", "warning", "error", "debug"): ++ logging.warning( ++ f"the log_level: {log_level} you set is invalid, use default value: info." 
++ ) + + + class ConfigParser: + DEFAULT_ABSOLUTE_THRESHOLD = 40 + DEFAULT_SLOW_IO_DETECTION_FREQUENCY = 1 +- DEFAULT_LOG_LEVEL = 'info' ++ DEFAULT_LOG_LEVEL = "info" ++ ++ DEFAULT_STAGE = 'throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio' ++ DEFAULT_IOTYPE = 'read,write' + +- DEFAULT_ALGORITHM_TYPE = 'boxplot' ++ DEFAULT_ALGORITHM_TYPE = "boxplot" + DEFAULT_TRAIN_DATA_DURATION = 24 + DEFAULT_TRAIN_UPDATE_DURATION = 2 + DEFAULT_BOXPLOT_PARAMETER = 1.5 + DEFAULT_N_SIGMA_PARAMETER = 3 + +- DEFAULT_SLIDING_WINDOW_TYPE = 'not_continuous' ++ DEFAULT_SLIDING_WINDOW_TYPE = "not_continuous" + DEFAULT_WINDOW_SIZE = 30 + DEFAULT_WINDOW_MINIMUM_THRESHOLD = 6 + + def __init__(self, config_file_name): + self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD +- self.__slow_io_detect_frequency = ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY ++ self.__slow_io_detect_frequency = ( ++ ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY ++ ) + self.__log_level = ConfigParser.DEFAULT_LOG_LEVEL + self.__disks_to_detection = None ++ self.__stage = ConfigParser.DEFAULT_STAGE ++ self.__iotype = ConfigParser.DEFAULT_IOTYPE + +- self.__algorithm_type = ConfigParser.DEFAULT_ALGORITHM_TYPE ++ self.__algorithm_type = get_threshold_type_enum( ++ ConfigParser.DEFAULT_ALGORITHM_TYPE ++ ) + self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION + self.__train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION + self.__boxplot_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER +@@ -58,199 +74,398 @@ class ConfigParser: + + self.__config_file_name = config_file_name + +- def __read_absolute_threshold(self, items_common: dict): ++ def _get_config_value( ++ self, ++ config_items: dict, ++ key: str, ++ value_type, ++ default_value=None, ++ gt=None, ++ ge=None, ++ lt=None, ++ le=None, ++ ): ++ value = config_items.get(key) ++ if value is None: ++ logging.warning( ++ "config of %s not found, the default value %s will be used.", ++ key, ++ default_value, ++ ) ++ value = default_value ++ if not value: ++ logging.critical( ++ "the value of %s is empty, ai_block_io plug will exit.", key ++ ) ++ Report.report_pass( ++ f"the value of {key} is empty, ai_block_io plug will exit." ++ ) ++ exit(1) + try: +- self.__absolute_threshold = float(items_common.get('absolute_threshold', +- ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD)) +- if self.__absolute_threshold <= 0: +- logging.warning( +- f'the_absolute_threshold: {self.__absolute_threshold} you set is invalid, use default value: {ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD}.') +- self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD ++ value = value_type(value) + except ValueError: +- self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD +- logging.warning( +- f'the_absolute_threshold type conversion has error, use default value: {self.__absolute_threshold}.') ++ logging.critical( ++ "the value of %s is not a valid %s, ai_block_io plug will exit.", ++ key, ++ value_type, ++ ) ++ Report.report_pass( ++ f"the value of {key} is not a valid {value_type}, ai_block_io plug will exit." ++ ) ++ exit(1) ++ if gt is not None and value <= gt: ++ logging.critical( ++ "the value of %s is not greater than %s, ai_block_io plug will exit.", ++ key, ++ gt, ++ ) ++ Report.report_pass( ++ f"the value of {key} is not greater than {gt}, ai_block_io plug will exit." 
++ ) ++ exit(1) ++ if ge is not None and value < ge: ++ logging.critical( ++ "the value of %s is not greater than or equal to %s, ai_block_io plug will exit.", ++ key, ++ ge, ++ ) ++ Report.report_pass( ++ f"the value of {key} is not greater than or equal to {ge}, ai_block_io plug will exit." ++ ) ++ exit(1) ++ if lt is not None and value >= lt: ++ logging.critical( ++ "the value of %s is not less than %s, ai_block_io plug will exit.", ++ key, ++ lt, ++ ) ++ Report.report_pass( ++ f"the value of {key} is not less than {lt}, ai_block_io plug will exit." ++ ) ++ exit(1) ++ if le is not None and value > le: ++ logging.critical( ++ "the value of %s is not less than or equal to %s, ai_block_io plug will exit.", ++ key, ++ le, ++ ) ++ Report.report_pass( ++ f"the value of {key} is not less than or equal to {le}, ai_block_io plug will exit." ++ ) ++ exit(1) ++ ++ return value ++ ++ def __read_absolute_threshold(self, items_common: dict): ++ self.__absolute_threshold = self._get_config_value( ++ items_common, ++ "absolute_threshold", ++ float, ++ ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD, ++ gt=0, ++ ) + + def __read__slow_io_detect_frequency(self, items_common: dict): +- try: +- self.__slow_io_detect_frequency = int(items_common.get('slow_io_detect_frequency', +- ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY)) +- if self.__slow_io_detect_frequency < 1 or self.__slow_io_detect_frequency > 10: +- logging.warning( +- f'the slow_io_detect_frequency: {self.__slow_io_detect_frequency} you set is invalid, use default value: {ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY}.') +- self.__slow_io_detect_frequency = ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY +- except ValueError: +- self.__slow_io_detect_frequency = ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY +- logging.warning(f'slow_io_detect_frequency type conversion has error, use default value: {self.__slow_io_detect_frequency}.') ++ self.__slow_io_detect_frequency = self._get_config_value( ++ items_common, ++ "slow_io_detect_frequency", ++ int, ++ ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY, ++ gt=0, ++ le=300, ++ ) + + def __read__disks_to_detect(self, items_common: dict): +- disks_to_detection = items_common.get('disk') ++ disks_to_detection = items_common.get("disk") + if disks_to_detection is None: +- logging.warning(f'config of disk not found, the default value will be used.') ++ logging.warning("config of disk not found, the default value will be used.") + self.__disks_to_detection = None + return +- disk_list = disks_to_detection.split(',') +- if len(disk_list) == 0 or (len(disk_list) == 1 and disk_list[0] == ''): +- logging.warning("you don't specify any disk.") +- self.__disks_to_detection = [] +- return +- if len(disk_list) == 1 and disk_list[0] == 'default': ++ disks_to_detection = disks_to_detection.strip() ++ if not disks_to_detection: ++ logging.critical("the value of disk is empty, ai_block_io plug will exit.") ++ Report.report_pass( ++ "the value of disk is empty, ai_block_io plug will exit." 
++ ) ++ exit(1) ++ disk_list = disks_to_detection.split(",") ++ if len(disk_list) == 1 and disk_list[0] == "default": + self.__disks_to_detection = None + return + self.__disks_to_detection = disk_list + + def __read__train_data_duration(self, items_algorithm: dict): +- try: +- self.__train_data_duration = float(items_algorithm.get('train_data_duration', +- ConfigParser.DEFAULT_TRAIN_DATA_DURATION)) +- if self.__train_data_duration <= 0 or self.__train_data_duration > 720: +- logging.warning( +- f'the train_data_duration: {self.__train_data_duration} you set is invalid, use default value: {ConfigParser.DEFAULT_TRAIN_DATA_DURATION}.') +- self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_DATA_DURATION +- except ValueError: +- self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_DATA_DURATION +- logging.warning(f'the train_data_duration type conversion has error, use default value: {self.__train_data_duration}.') ++ self.__train_data_duration = self._get_config_value( ++ items_algorithm, ++ "train_data_duration", ++ float, ++ ConfigParser.DEFAULT_TRAIN_DATA_DURATION, ++ gt=0, ++ le=720, ++ ) + + def __read__train_update_duration(self, items_algorithm: dict): + default_train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION + if default_train_update_duration > self.__train_data_duration: + default_train_update_duration = self.__train_data_duration / 2 +- +- try: +- self.__train_update_duration = float(items_algorithm.get('train_update_duration', +- ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION)) +- if self.__train_update_duration <= 0 or self.__train_update_duration > self.__train_data_duration: +- logging.warning( +- f'the train_update_duration: {self.__train_update_duration} you set is invalid, use default value: {default_train_update_duration}.') +- self.__train_update_duration = default_train_update_duration +- except ValueError: +- self.__train_update_duration = default_train_update_duration +- logging.warning(f'the train_update_duration type conversion has error, use default value: {self.__train_update_duration}.') ++ self.__train_update_duration = self._get_config_value( ++ items_algorithm, ++ "train_update_duration", ++ float, ++ default_train_update_duration, ++ gt=0, ++ le=self.__train_data_duration, ++ ) + + def __read__algorithm_type_and_parameter(self, items_algorithm: dict): +- algorithm_type = items_algorithm.get('algorithm_type', ConfigParser.DEFAULT_ALGORITHM_TYPE) ++ algorithm_type = items_algorithm.get( ++ "algorithm_type", ConfigParser.DEFAULT_ALGORITHM_TYPE ++ ) + self.__algorithm_type = get_threshold_type_enum(algorithm_type) ++ if self.__algorithm_type is None: ++ logging.critical( ++ "the algorithm_type: %s you set is invalid. ai_block_io plug will exit.", ++ algorithm_type, ++ ) ++ Report.report_pass( ++ f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit." 
++ ) ++ exit(1) + + if self.__algorithm_type == ThresholdType.NSigmaThreshold: +- try: +- self.__n_sigma_parameter = float(items_algorithm.get('n_sigma_parameter', +- ConfigParser.DEFAULT_N_SIGMA_PARAMETER)) +- if self.__n_sigma_parameter <= 0 or self.__n_sigma_parameter > 10: +- logging.warning( +- f'the n_sigma_parameter: {self.__n_sigma_parameter} you set is invalid, use default value: {ConfigParser.DEFAULT_N_SIGMA_PARAMETER}.') +- self.__n_sigma_parameter = ConfigParser.DEFAULT_N_SIGMA_PARAMETER +- except ValueError: +- self.__n_sigma_parameter = ConfigParser.DEFAULT_N_SIGMA_PARAMETER +- logging.warning(f'the n_sigma_parameter type conversion has error, use default value: {self.__n_sigma_parameter}.') ++ self.__n_sigma_parameter = self._get_config_value( ++ items_algorithm, ++ "n_sigma_parameter", ++ float, ++ ConfigParser.DEFAULT_N_SIGMA_PARAMETER, ++ gt=0, ++ le=10, ++ ) + elif self.__algorithm_type == ThresholdType.BoxplotThreshold: +- try: +- self.__boxplot_parameter = float(items_algorithm.get('boxplot_parameter', +- ConfigParser.DEFAULT_BOXPLOT_PARAMETER)) +- if self.__boxplot_parameter <= 0 or self.__boxplot_parameter > 10: +- logging.warning( +- f'the boxplot_parameter: {self.__boxplot_parameter} you set is invalid, use default value: {ConfigParser.DEFAULT_BOXPLOT_PARAMETER}.') +- self.__n_sigma_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER +- except ValueError: +- self.__boxplot_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER +- logging.warning(f'the boxplot_parameter type conversion has error, use default value: {self.__boxplot_parameter}.') ++ self.__boxplot_parameter = self._get_config_value( ++ items_algorithm, ++ "boxplot_parameter", ++ float, ++ ConfigParser.DEFAULT_BOXPLOT_PARAMETER, ++ gt=0, ++ le=10, ++ ) ++ ++ def __read__stage(self, items_algorithm: dict): ++ stage_str = items_algorithm.get('stage', ConfigParser.DEFAULT_STAGE) ++ stage_list = stage_str.split(',') ++ if len(stage_list) == 1 and stage_list[0] == '': ++ logging.critical('stage value not allow is empty, exiting...') ++ exit(1) ++ if len(stage_list) == 1 and stage_list[0] == 'default': ++ logging.warning(f'stage will enable default value: {ConfigParser.DEFAULT_STAGE}') ++ self.__stage = ALL_STAGE_LIST ++ return ++ for stage in stage_list: ++ if stage not in ALL_STAGE_LIST: ++ logging.critical(f'stage: {stage} is not valid stage, ai_block_io will exit...') ++ exit(1) ++ dup_stage_list = set(stage_list) ++ if 'bio' not in dup_stage_list: ++ logging.critical('stage must contains bio stage, exiting...') ++ exit(1) ++ self.__stage = dup_stage_list ++ ++ def __read__iotype(self, items_algorithm: dict): ++ iotype_str = items_algorithm.get('iotype', ConfigParser.DEFAULT_IOTYPE) ++ iotype_list = iotype_str.split(',') ++ if len(iotype_list) == 1 and iotype_list[0] == '': ++ logging.critical('iotype value not allow is empty, exiting...') ++ exit(1) ++ if len(iotype_list) == 1 and iotype_list[0] == 'default': ++ logging.warning(f'iotype will enable default value: {ConfigParser.DEFAULT_IOTYPE}') ++ self.__iotype = ALL_IOTPYE_LIST ++ return ++ for iotype in iotype_list: ++ if iotype not in ALL_IOTPYE_LIST: ++ logging.critical(f'iotype: {iotype} is not valid iotype, ai_block_io will exit...') ++ exit(1) ++ dup_iotype_list = set(iotype_list) ++ self.__iotype = dup_iotype_list + + def __read__window_size(self, items_sliding_window: dict): +- try: +- self.__window_size = int(items_sliding_window.get('window_size', +- ConfigParser.DEFAULT_WINDOW_SIZE)) +- if self.__window_size < 1 or self.__window_size > 3600: +- 
logging.warning( +- f'the window_size: {self.__window_size} you set is invalid, use default value: {ConfigParser.DEFAULT_WINDOW_SIZE}.') +- self.__window_size = ConfigParser.DEFAULT_WINDOW_SIZE +- except ValueError: +- self.__window_size = ConfigParser.DEFAULT_WINDOW_SIZE +- logging.warning(f'window_size type conversion has error, use default value: {self.__window_size}.') ++ self.__window_size = self._get_config_value( ++ items_sliding_window, ++ "window_size", ++ int, ++ ConfigParser.DEFAULT_WINDOW_SIZE, ++ gt=0, ++ le=3600, ++ ) + + def __read__window_minimum_threshold(self, items_sliding_window: dict): + default_window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD + if default_window_minimum_threshold > self.__window_size: + default_window_minimum_threshold = self.__window_size / 2 +- try: +- self.__window_minimum_threshold = ( +- int(items_sliding_window.get('window_minimum_threshold', +- ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD))) +- if self.__window_minimum_threshold < 1 or self.__window_minimum_threshold > self.__window_size: +- logging.warning( +- f'the window_minimum_threshold: {self.__window_minimum_threshold} you set is invalid, use default value: {default_window_minimum_threshold}.') +- self.__window_minimum_threshold = default_window_minimum_threshold +- except ValueError: +- self.__window_minimum_threshold = default_window_minimum_threshold +- logging.warning(f'window_minimum_threshold type conversion has error, use default value: {self.__window_minimum_threshold}.') ++ self.__window_minimum_threshold = self._get_config_value( ++ items_sliding_window, ++ "window_minimum_threshold", ++ int, ++ default_window_minimum_threshold, ++ gt=0, ++ le=self.__window_size, ++ ) + + def read_config_from_file(self): ++ if not os.path.exists(self.__config_file_name): ++ init_log_format(self.__log_level) ++ logging.critical( ++ "config file %s not found, ai_block_io plug will exit.", ++ self.__config_file_name, ++ ) ++ Report.report_pass( ++ f"config file {self.__config_file_name} not found, ai_block_io plug will exit." ++ ) ++ exit(1) ++ + con = configparser.ConfigParser() + try: +- con.read(self.__config_file_name, encoding='utf-8') ++ con.read(self.__config_file_name, encoding="utf-8") + except configparser.Error as e: + init_log_format(self.__log_level) +- logging.critical(f'config file read error: {e}, ai_block_io plug will exit.') ++ logging.critical( ++ f"config file read error: %s, ai_block_io plug will exit.", e ++ ) ++ Report.report_pass( ++ f"config file read error: {e}, ai_block_io plug will exit." 
++ ) + exit(1) + +- if con.has_section('common'): +- items_common = dict(con.items('common')) +- self.__log_level = items_common.get('log_level', ConfigParser.DEFAULT_LOG_LEVEL) ++ if con.has_section('log'): ++ items_log = dict(con.items('log')) ++ # 情况一:没有log,则使用默认值 ++ # 情况二:有log,值为空或异常,使用默认值 ++ # 情况三:有log,值正常,则使用该值 ++ self.__log_level = items_log.get('level', ConfigParser.DEFAULT_LOG_LEVEL) + init_log_format(self.__log_level) ++ else: ++ init_log_format(self.__log_level) ++ logging.warning(f"log section parameter not found, it will be set to default value.") ++ ++ if con.has_section("common"): ++ items_common = dict(con.items("common")) + self.__read_absolute_threshold(items_common) + self.__read__slow_io_detect_frequency(items_common) + self.__read__disks_to_detect(items_common) ++ self.__read__stage(items_common) ++ self.__read__iotype(items_common) + else: +- init_log_format(self.__log_level) +- logging.warning("common section parameter not found, it will be set to default value.") ++ logging.warning( ++ "common section parameter not found, it will be set to default value." ++ ) + +- if con.has_section('algorithm'): +- items_algorithm = dict(con.items('algorithm')) ++ if con.has_section("algorithm"): ++ items_algorithm = dict(con.items("algorithm")) + self.__read__train_data_duration(items_algorithm) + self.__read__train_update_duration(items_algorithm) + self.__read__algorithm_type_and_parameter(items_algorithm) + else: +- logging.warning("algorithm section parameter not found, it will be set to default value.") +- +- if con.has_section('sliding_window'): +- items_sliding_window = dict(con.items('sliding_window')) +- sliding_window_type = items_sliding_window.get('sliding_window_type', +- ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE) +- self.__sliding_window_type = get_sliding_window_type_enum(sliding_window_type) ++ logging.warning( ++ "algorithm section parameter not found, it will be set to default value." ++ ) ++ ++ if con.has_section("sliding_window"): ++ items_sliding_window = dict(con.items("sliding_window")) ++ sliding_window_type = items_sliding_window.get( ++ "sliding_window_type", ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE ++ ) ++ self.__sliding_window_type = get_sliding_window_type_enum( ++ sliding_window_type ++ ) + self.__read__window_size(items_sliding_window) + self.__read__window_minimum_threshold(items_sliding_window) + else: +- logging.warning("sliding_window section parameter not found, it will be set to default value.") ++ logging.warning( ++ "sliding_window section parameter not found, it will be set to default value." 
++ ) + + self.__print_all_config_value() + ++ def __repr__(self): ++ config_str = { ++ 'log.level': self.__log_level, ++ 'common.absolute_threshold': self.__absolute_threshold, ++ 'common.slow_io_detect_frequency': self.__slow_io_detect_frequency, ++ 'common.disk': self.__disks_to_detection, ++ 'common.stage': self.__stage, ++ 'common.iotype': self.__iotype, ++ 'algorithm.train_data_duration': self.__train_data_duration, ++ 'algorithm.train_update_duration': self.__train_update_duration, ++ 'algorithm.algorithm_type': self.__algorithm_type, ++ 'algorithm.boxplot_parameter': self.__boxplot_parameter, ++ 'algorithm.n_sigma_parameter': self.__n_sigma_parameter, ++ 'sliding_window.sliding_window_type': self.__sliding_window_type, ++ 'sliding_window.window_size': self.__window_size, ++ 'sliding_window.window_minimum_threshold': self.__window_minimum_threshold ++ } ++ return str(config_str) ++ + def __print_all_config_value(self): +- pass ++ logging.info(f"all config is follow:\n {self}") ++ ++ def get_train_data_duration_and_train_update_duration(self): ++ return self.__train_data_duration, self.__train_update_duration + +- def get_slow_io_detect_frequency(self): ++ def get_window_size_and_window_minimum_threshold(self): ++ return self.__window_size, self.__window_minimum_threshold ++ ++ @property ++ def slow_io_detect_frequency(self): + return self.__slow_io_detect_frequency + +- def get_algorithm_type(self): ++ @property ++ def algorithm_type(self): + return self.__algorithm_type + +- def get_sliding_window_type(self): ++ @property ++ def sliding_window_type(self): + return self.__sliding_window_type + +- def get_train_data_duration_and_train_update_duration(self): +- return self.__train_data_duration, self.__train_update_duration ++ @property ++ def train_data_duration(self): ++ return self.__train_data_duration + +- def get_window_size_and_window_minimum_threshold(self): +- return self.__window_size, self.__window_minimum_threshold ++ @property ++ def train_update_duration(self): ++ return self.__train_update_duration ++ ++ @property ++ def window_size(self): ++ return self.__window_size + +- def get_absolute_threshold(self): ++ @property ++ def window_minimum_threshold(self): ++ return self.__window_minimum_threshold ++ ++ @property ++ def absolute_threshold(self): + return self.__absolute_threshold + +- def get_log_level(self): ++ @property ++ def log_level(self): + return self.__log_level + +- def get_disks_to_detection(self): ++ @property ++ def disks_to_detection(self): + return self.__disks_to_detection + +- def get_boxplot_parameter(self): ++ @property ++ def stage(self): ++ return self.__stage ++ ++ @property ++ def iotype(self): ++ return self.__iotype ++ ++ @property ++ def boxplot_parameter(self): + return self.__boxplot_parameter + +- def get_n_sigma_parameter(self): ++ @property ++ def n_sigma_parameter(self): + return self.__n_sigma_parameter +diff --git a/src/python/sentryPlugins/ai_block_io/data_access.py b/src/python/sentryPlugins/ai_block_io/data_access.py +index c7679cd..ed997e6 100644 +--- a/src/python/sentryPlugins/ai_block_io/data_access.py ++++ b/src/python/sentryPlugins/ai_block_io/data_access.py +@@ -41,11 +41,14 @@ def check_collect_valid(period): + try: + data = json.loads(data_raw["message"]) + except Exception as e: +- logging.warning(f"get io data failed, {e}") ++ logging.warning(f"get valid devices failed, occur exception: {e}") ++ return None ++ if not data: ++ logging.warning(f"get valid devices failed, return {data_raw}") + return None + return [k for k in 
data.keys()] + else: +- logging.warning(f"get io data failed, return {data_raw}") ++ logging.warning(f"get valid devices failed, return {data_raw}") + return None + + +@@ -60,7 +63,7 @@ def _get_raw_data(period, disk_list): + + def _get_io_stage_data(data): + io_stage_data = IOStageData() +- for data_type in ('read', 'write', 'flush', 'discard'): ++ for data_type in ("read", "write", "flush", "discard"): + if data_type in data: + getattr(io_stage_data, data_type).latency = data[data_type][0] + getattr(io_stage_data, data_type).io_dump = data[data_type][1] +@@ -87,7 +90,7 @@ def get_io_data_from_collect_plug(period, disk_list): + getattr(disk_ret, k) + setattr(disk_ret, k, _get_io_stage_data(v)) + except AttributeError: +- logging.debug(f'no attr {k}') ++ logging.debug(f"no attr {k}") + continue + ret[disk] = disk_ret + return ret +diff --git a/src/python/sentryPlugins/ai_block_io/detector.py b/src/python/sentryPlugins/ai_block_io/detector.py +index 0ed282b..e710ddd 100644 +--- a/src/python/sentryPlugins/ai_block_io/detector.py ++++ b/src/python/sentryPlugins/ai_block_io/detector.py +@@ -53,3 +53,28 @@ class Detector: + f' io_type_name: {self._metric_name.get_io_access_type_name()},' + f' metric_name: {self._metric_name.get_metric_name()}, threshold_type: {self._threshold},' + f' sliding_window_type: {self._slidingWindow}') ++ ++ ++class DiskDetector: ++ ++ def __init__(self, disk_name: str): ++ self._disk_name = disk_name ++ self._detector_list = [] ++ ++ def add_detector(self, detector: Detector): ++ self._detector_list.append(detector) ++ ++ def is_slow_io_event(self, io_data_dict_with_disk_name: dict): ++ # 只有bio阶段发生异常,就认为发生了慢IO事件 ++ # todo:根因诊断 ++ for detector in self._detector_list: ++ result = detector.is_slow_io_event(io_data_dict_with_disk_name) ++ if result[0] and detector.get_metric_name().get_stage_name() == 'bio': ++ return result[0], detector.get_metric_name(), result[1], result[2] ++ return False, None, None, None ++ ++ def __repr__(self): ++ msg = f'disk: {self._disk_name}, ' ++ for detector in self._detector_list: ++ msg += f'\n detector: [{detector}]' ++ return msg +diff --git a/src/python/sentryPlugins/ai_block_io/utils.py b/src/python/sentryPlugins/ai_block_io/utils.py +index 8dbba06..0ed37b9 100644 +--- a/src/python/sentryPlugins/ai_block_io/utils.py ++++ b/src/python/sentryPlugins/ai_block_io/utils.py +@@ -25,8 +25,7 @@ def get_threshold_type_enum(algorithm_type: str): + return ThresholdType.BoxplotThreshold + if algorithm_type.lower() == 'n_sigma': + return ThresholdType.NSigmaThreshold +- logging.warning(f"the algorithm type: {algorithm_type} you set is invalid, use default value: boxplot") +- return ThresholdType.BoxplotThreshold ++ return None + + + def get_sliding_window_type_enum(sliding_window_type: str): +-- +2.23.0 + diff --git a/sysSentry.spec b/sysSentry.spec index 7776e90..cc98a8e 100644 --- a/sysSentry.spec +++ b/sysSentry.spec @@ -4,7 +4,7 @@ Summary: System Inspection Framework Name: sysSentry Version: 1.0.2 -Release: 35 +Release: 36 License: Mulan PSL v2 Group: System Environment/Daemons Source0: https://gitee.com/openeuler/sysSentry/releases/download/v%{version}/%{name}-%{version}.tar.gz @@ -50,6 +50,7 @@ Patch37: fix-xalarm_Report-function-not-refuse-alarm-msg-exce.patch Patch38: fix-xalarm_upgrade-not-return-val-and-fail-when-thre.patch Patch39: add-log-for-xalarm-when-sending-msg-and-clean-invali.patch Patch40: add-xalarm-cleanup-invalid-server-socket-peroidly.patch +Patch41: ai_block_io-support-stage-and-iotype.patch BuildRequires: cmake gcc-c++ 
BuildRequires: python3 python3-setuptools @@ -304,6 +305,12 @@ rm -rf %{buildroot} %attr(0550,root,root) %{python3_sitelib}/sentryPlugins/ai_block_io %changelog +* Fri Oct 11 2024 heyouzhi - 1.0.2-36 +- Type:requirement +- CVE:NA +- SUG:NA +- DESC:ai_block_io support stage and iotype + * Fri Oct 11 2024 caixiaomeng - 1.0.2-35 - Type:bugfix - CVE:NA -- Gitee
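
Usage note (illustrative only, not part of the generated patch): after this change, config/plugins/ai_block_io.ini carries the log level in a new [log] section, and [common] gains three scope keys: disk, stage and iotype. Per the config_parser.py hunks above, stage accepts the literal value default or a comma-separated subset of throtl, wbt, gettag, plug, deadline, hctx, requeue, rq_driver and bio (a non-default list must include bio), iotype accepts default or a subset of read and write, and disk accepts default (all available disks) or an explicit disk list. A configuration restricted to two disks might look like the sketch below; the disk names sda and sdb and the stage selection bio,rq_driver are placeholder choices, and the [algorithm] and [sliding_window] sections are left at their shipped defaults.

[log]
level=info

[common]
absolute_threshold=40
slow_io_detect_frequency=1
disk=sda,sdb
stage=bio,rq_driver
iotype=read,write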