From 23386b63ade7a2bc6b06513de70e8a5a1335d845 Mon Sep 17 00:00:00 2001 From: jungheil Date: Sat, 12 Oct 2024 14:55:24 +0800 Subject: [PATCH] feat: add abs threshold by disk type --- .../config/plugins/ai_block_io.ini | 15 +- .../sentryPlugins/ai_block_io/ai_block_io.py | 126 +++-- .../ai_block_io/config_parser.py | 465 +++++++++++------- .../sentryPlugins/ai_block_io/data_access.py | 1 + .../sentryPlugins/ai_block_io/detector.py | 14 +- .../sentryPlugins/ai_block_io/io_data.py | 32 +- .../ai_block_io/sliding_window.py | 49 +- .../python/sentryPlugins/ai_block_io/utils.py | 44 +- 8 files changed, 450 insertions(+), 296 deletions(-) diff --git a/sysSentry-1.0.2/config/plugins/ai_block_io.ini b/sysSentry-1.0.2/config/plugins/ai_block_io.ini index ce636d5..5731c02 100644 --- a/sysSentry-1.0.2/config/plugins/ai_block_io.ini +++ b/sysSentry-1.0.2/config/plugins/ai_block_io.ini @@ -2,7 +2,6 @@ level=info [common] -absolute_threshold=40 slow_io_detect_frequency=1 disk=default stage=bio @@ -18,4 +17,16 @@ n_sigma_parameter=3 [sliding_window] sliding_window_type=not_continuous window_size=30 -window_minimum_threshold=6 \ No newline at end of file +window_minimum_threshold=6 + +[latency_sata_ssd] +read_tot_lim=50000 +write_tot_lim=50000 + +[latency_nvme_ssd] +read_tot_lim=500 +write_tot_lim=500 + +[latency_sata_hdd] +read_tot_lim=50000 +write_tot_lim=50000 \ No newline at end of file diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/ai_block_io.py index e1052ec..2672f1d 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/ai_block_io.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/ai_block_io.py @@ -12,13 +12,18 @@ import time import signal import logging +from collections import defaultdict from .detector import Detector, DiskDetector from .threshold import ThresholdFactory, AbsoluteThreshold from .sliding_window import SlidingWindowFactory from .utils import get_data_queue_size_and_update_size from .config_parser import ConfigParser -from .data_access import get_io_data_from_collect_plug, check_collect_valid +from .data_access import ( + get_io_data_from_collect_plug, + check_collect_valid, + get_disk_type, +) from .io_data import MetricName from .alarm_report import Xalarm, Report @@ -34,7 +39,7 @@ def sig_handler(signum, frame): class SlowIODetection: _config_parser = None _disk_list = None - _detector_name_list = {} + _detector_name_list = defaultdict(list) _disk_detectors = {} def __init__(self, config_parser: ConfigParser): @@ -43,9 +48,13 @@ class SlowIODetection: self.__init_detector() def __init_detector_name_list(self): - self._disk_list = check_collect_valid(self._config_parser.slow_io_detect_frequency) + self._disk_list = check_collect_valid( + self._config_parser.slow_io_detect_frequency + ) if self._disk_list is None: - Report.report_pass("get available disk error, please check if the collector plug is enable. exiting...") + Report.report_pass( + "get available disk error, please check if the collector plug is enable. exiting..." + ) exit(1) logging.info(f"ai_block_io plug has found disks: {self._disk_list}") @@ -56,27 +65,45 @@ class SlowIODetection: # 情况2:is not None and len = 0,则不启动任何磁盘检测 # 情况3:len != 0,则取交集 if disks is None: - logging.warning("you not specify any disk or use default, so ai_block_io will enable all available disk.") - for disk in self._disk_list: - for stage in stages: - for iotype in iotypes: - if disk not in self._detector_name_list: - self._detector_name_list[disk] = [] - self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency")) - else: - for disk in disks: - if disk in self._disk_list: - for stage in stages: - for iotype in iotypes: - if disk not in self._detector_name_list: - self._detector_name_list[disk] = [] - self._detector_name_list[disk].append(MetricName(disk, stage, iotype, "latency")) - else: - logging.warning("disk: [%s] not in available disk list, so it will be ignored.", disk) - if len(self._detector_name_list) == 0: - logging.critical("the disks to detection is empty, ai_block_io will exit.") - Report.report_pass("the disks to detection is empty, ai_block_io will exit.") - exit(1) + logging.warning( + "you not specify any disk or use default, so ai_block_io will enable all available disk." + ) + for disk in self._disk_list: + if disks is not None: + if disk not in disks: + continue + disks.remove(disk) + + disk_type_result = get_disk_type(disk) + if disk_type_result["ret"] == 0 and disk_type_result["message"] in ( + '0', + '1', + '2', + ): + disk_type = int(disk_type_result["message"]) + else: + logging.warning( + "%s get disk type error, return %s, so it will be ignored.", + disk, + disk_type_result, + ) + continue + for stage in stages: + for iotype in iotypes: + self._detector_name_list[disk].append( + MetricName(disk, disk_type, stage, iotype, "latency") + ) + if disks: + logging.warning( + "disks: %s not in available disk list, so they will be ignored.", + disks, + ) + if not self._detector_name_list: + logging.critical("the disks to detection is empty, ai_block_io will exit.") + Report.report_pass( + "the disks to detection is empty, ai_block_io will exit." + ) + exit(1) def __init_detector(self): train_data_duration, train_update_duration = ( @@ -88,26 +115,39 @@ class SlowIODetection: train_data_duration, train_update_duration, slow_io_detection_frequency ) sliding_window_type = self._config_parser.sliding_window_type - window_size, window_threshold = (self._config_parser.get_window_size_and_window_minimum_threshold()) + window_size, window_threshold = ( + self._config_parser.get_window_size_and_window_minimum_threshold() + ) for disk, metric_name_list in self._detector_name_list.items(): - threshold = ThresholdFactory().get_threshold( - threshold_type, - boxplot_parameter=self._config_parser.boxplot_parameter, - n_sigma_paramter=self._config_parser.n_sigma_parameter, - data_queue_size=data_queue_size, - data_queue_update_size=update_size, - ) - sliding_window = SlidingWindowFactory().get_sliding_window( - sliding_window_type, - queue_length=window_size, - threshold=window_threshold, - ) disk_detector = DiskDetector(disk) for metric_name in metric_name_list: + threshold = ThresholdFactory().get_threshold( + threshold_type, + boxplot_parameter=self._config_parser.boxplot_parameter, + n_sigma_paramter=self._config_parser.n_sigma_parameter, + data_queue_size=data_queue_size, + data_queue_update_size=update_size, + ) + abs_threshold = self._config_parser.get_tot_lim( + metric_name.disk_type, metric_name.io_access_type_name + ) + if abs_threshold is None: + logging.warning( + "disk %s, disk type %s, io type %s, get tot lim error, so it will be ignored.", + disk, + metric_name.disk_type, + metric_name.io_access_type_name, + ) + sliding_window = SlidingWindowFactory().get_sliding_window( + sliding_window_type, + queue_length=window_size, + threshold=window_threshold, + abs_threshold=abs_threshold, + ) detector = Detector(metric_name, threshold, sliding_window) disk_detector.add_detector(detector) - logging.info(f'disk: [{disk}] add detector:\n [{disk_detector}]') + logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]") self._disk_detectors[disk] = disk_detector def launch(self): @@ -139,13 +179,13 @@ class SlowIODetection: for slow_io_event in slow_io_event_list: metric_name: MetricName = slow_io_event[1] alarm_content = { - "driver_name": f"{metric_name.get_disk_name()}", + "driver_name": f"{metric_name.disk_name}", "reason": "disk_slow", - "block_stack": f"{metric_name.get_stage_name()}", - "io_type": f"{metric_name.get_io_access_type_name()}", + "block_stack": f"{metric_name.stage_name}", + "io_type": f"{metric_name.io_access_type_name}", "alarm_source": "ai_block_io", "alarm_type": "latency", - "details": f"current window is: {slow_io_event[2]}, threshold is: {slow_io_event[3]}.", + "details": f"disk type: {metric_name.disk_type}, ai threshold: {slow_io_event[3]}, abs threshold: {slow_io_event[4]},current window: {slow_io_event[2]}.", } Xalarm.major(alarm_content) logging.warning(alarm_content) diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/config_parser.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/config_parser.py index a357766..3388cd4 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/config_parser.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/config_parser.py @@ -20,59 +20,62 @@ from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_lo LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s" -ALL_STAGE_LIST = ['throtl', 'wbt', 'gettag', 'plug', 'deadline', 'hctx', 'requeue', 'rq_driver', 'bio'] -ALL_IOTPYE_LIST = ['read', 'write'] +ALL_STAGE_LIST = [ + "throtl", + "wbt", + "gettag", + "plug", + "deadline", + "hctx", + "requeue", + "rq_driver", + "bio", +] +ALL_IOTPYE_LIST = ["read", "write"] +DISK_TYPE_MAP = { + 0: "nvme_ssd", + 1: "sata_ssd", + 2: "sata_hdd", +} def init_log_format(log_level: str): logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT) if log_level.lower() not in ("info", "warning", "error", "debug"): logging.warning( - f"the log_level: {log_level} you set is invalid, use default value: info." + "the log_level: %s you set is invalid, use default value: info.", log_level ) class ConfigParser: - DEFAULT_ABSOLUTE_THRESHOLD = 40 - DEFAULT_SLOW_IO_DETECTION_FREQUENCY = 1 - DEFAULT_LOG_LEVEL = "info" - - DEFAULT_STAGE = 'throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio' - DEFAULT_IOTYPE = 'read,write' - - DEFAULT_ALGORITHM_TYPE = "boxplot" - DEFAULT_TRAIN_DATA_DURATION = 24 - DEFAULT_TRAIN_UPDATE_DURATION = 2 - DEFAULT_BOXPLOT_PARAMETER = 1.5 - DEFAULT_N_SIGMA_PARAMETER = 3 - - DEFAULT_SLIDING_WINDOW_TYPE = "not_continuous" - DEFAULT_WINDOW_SIZE = 30 - DEFAULT_WINDOW_MINIMUM_THRESHOLD = 6 + DEFAULT_CONF = { + "log": {"level": "info"}, + "common": { + "slow_io_detect_frequency": 1, + "disk": None, + "stage": "throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio", + "iotype": "read,write", + }, + "algorithm": { + "train_data_duration": 24.0, + "train_update_duration": 2.0, + "algorithm_type": get_threshold_type_enum("boxplot"), + "boxplot_parameter": 1.5, + "n_sigma_parameter": 3.0, + }, + "sliding_window": { + "sliding_window_type": get_sliding_window_type_enum("not_continuous"), + "window_size": 30, + "window_minimum_threshold": 6, + }, + "latency_sata_ssd": {"read_tot_lim": 50000, "write_tot_lim": 50000}, + "latency_nvme_ssd": {"read_tot_lim": 500, "write_tot_lim": 500}, + "latency_sata_hdd": {"read_tot_lim": 50000, "write_tot_lim": 50000}, + } def __init__(self, config_file_name): - self.__absolute_threshold = ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD - self.__slow_io_detect_frequency = ( - ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY - ) - self.__log_level = ConfigParser.DEFAULT_LOG_LEVEL - self.__disks_to_detection = None - self.__stage = ConfigParser.DEFAULT_STAGE - self.__iotype = ConfigParser.DEFAULT_IOTYPE - - self.__algorithm_type = get_threshold_type_enum( - ConfigParser.DEFAULT_ALGORITHM_TYPE - ) - self.__train_data_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION - self.__train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION - self.__boxplot_parameter = ConfigParser.DEFAULT_BOXPLOT_PARAMETER - self.__n_sigma_parameter = ConfigParser.DEFAULT_N_SIGMA_PARAMETER - - self.__sliding_window_type = ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE - self.__window_size = ConfigParser.DEFAULT_WINDOW_SIZE - self.__window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD - - self.__config_file_name = config_file_name + self._conf = ConfigParser.DEFAULT_CONF + self._config_file_name = config_file_name def _get_config_value( self, @@ -156,30 +159,21 @@ class ConfigParser: return value - def __read_absolute_threshold(self, items_common: dict): - self.__absolute_threshold = self._get_config_value( - items_common, - "absolute_threshold", - float, - ConfigParser.DEFAULT_ABSOLUTE_THRESHOLD, - gt=0, - ) - - def __read__slow_io_detect_frequency(self, items_common: dict): - self.__slow_io_detect_frequency = self._get_config_value( + def _read_slow_io_detect_frequency(self, items_common: dict): + self._conf["common"]["slow_io_detect_frequency"] = self._get_config_value( items_common, "slow_io_detect_frequency", int, - ConfigParser.DEFAULT_SLOW_IO_DETECTION_FREQUENCY, + self.DEFAULT_CONF["common"]["slow_io_detect_frequency"], gt=0, le=300, ) - def __read__disks_to_detect(self, items_common: dict): + def _read_disks_to_detect(self, items_common: dict): disks_to_detection = items_common.get("disk") if disks_to_detection is None: logging.warning("config of disk not found, the default value will be used.") - self.__disks_to_detection = None + self._conf["common"]["disk"] = None return disks_to_detection = disks_to_detection.strip() if not disks_to_detection: @@ -189,40 +183,46 @@ class ConfigParser: ) exit(1) disk_list = disks_to_detection.split(",") + disk_list = [disk.strip() for disk in disk_list] if len(disk_list) == 1 and disk_list[0] == "default": - self.__disks_to_detection = None + self._conf["common"]["disk"] = None return - self.__disks_to_detection = disk_list + self._conf["common"]["disk"] = disk_list - def __read__train_data_duration(self, items_algorithm: dict): - self.__train_data_duration = self._get_config_value( + def _read_train_data_duration(self, items_algorithm: dict): + self._conf["common"]["train_data_duration"] = self._get_config_value( items_algorithm, "train_data_duration", float, - ConfigParser.DEFAULT_TRAIN_DATA_DURATION, + self.DEFAULT_CONF["algorithm"]["train_data_duration"], gt=0, le=720, ) - def __read__train_update_duration(self, items_algorithm: dict): - default_train_update_duration = ConfigParser.DEFAULT_TRAIN_UPDATE_DURATION - if default_train_update_duration > self.__train_data_duration: - default_train_update_duration = self.__train_data_duration / 2 - self.__train_update_duration = self._get_config_value( + def _read_train_update_duration(self, items_algorithm: dict): + default_train_update_duration = self.DEFAULT_CONF["algorithm"][ + "train_update_duration" + ] + if default_train_update_duration > self._conf["common"]["train_data_duration"]: + default_train_update_duration = ( + self._conf["common"]["train_data_duration"] / 2 + ) + self._conf["common"]["train_update_duration"] = self._get_config_value( items_algorithm, "train_update_duration", float, default_train_update_duration, gt=0, - le=self.__train_data_duration, + le=self._conf["common"]["train_data_duration"], ) - def __read__algorithm_type_and_parameter(self, items_algorithm: dict): - algorithm_type = items_algorithm.get( - "algorithm_type", ConfigParser.DEFAULT_ALGORITHM_TYPE - ) - self.__algorithm_type = get_threshold_type_enum(algorithm_type) - if self.__algorithm_type is None: + def _read_algorithm_type_and_parameter(self, items_algorithm: dict): + algorithm_type = items_algorithm.get("algorithm_type") + if algorithm_type is not None: + self._conf["algorithm"]["algorithm_type"] = get_threshold_type_enum( + algorithm_type + ) + if self._conf["algorithm"]["algorithm_type"] is None: logging.critical( "the algorithm_type: %s you set is invalid. ai_block_io plug will exit.", algorithm_type, @@ -231,129 +231,175 @@ class ConfigParser: f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit." ) exit(1) - - if self.__algorithm_type == ThresholdType.NSigmaThreshold: - self.__n_sigma_parameter = self._get_config_value( + elif self._conf["algorithm"]["algorithm_type"] == ThresholdType.NSigmaThreshold: + self._conf["algorithm"]["n_sigma_parameter"] = self._get_config_value( items_algorithm, "n_sigma_parameter", float, - ConfigParser.DEFAULT_N_SIGMA_PARAMETER, + self.DEFAULT_CONF["algorithm"]["n_sigma_parameter"], gt=0, le=10, ) - elif self.__algorithm_type == ThresholdType.BoxplotThreshold: - self.__boxplot_parameter = self._get_config_value( + elif ( + self._conf["algorithm"]["algorithm_type"] == ThresholdType.BoxplotThreshold + ): + self._conf["algorithm"]["boxplot_parameter"] = self._get_config_value( items_algorithm, "boxplot_parameter", float, - ConfigParser.DEFAULT_BOXPLOT_PARAMETER, + self.DEFAULT_CONF["algorithm"]["boxplot_parameter"], gt=0, le=10, ) - def __read__stage(self, items_algorithm: dict): - stage_str = items_algorithm.get('stage', ConfigParser.DEFAULT_STAGE) - stage_list = stage_str.split(',') - if len(stage_list) == 1 and stage_list[0] == '': - logging.critical('stage value not allow is empty, exiting...') + def _read_stage(self, items_algorithm: dict): + stage_str = items_algorithm.get( + "stage", self.DEFAULT_CONF["common"]["stage"] + ).strip() + stage_list = stage_str.split(",") + stage_list = [stage.strip() for stage in stage_list] + if len(stage_list) == 1 and stage_list[0] == "": + logging.critical("stage value not allow is empty, exiting...") exit(1) - if len(stage_list) == 1 and stage_list[0] == 'default': - logging.warning(f'stage will enable default value: {ConfigParser.DEFAULT_STAGE}') - self.__stage = ALL_STAGE_LIST + if len(stage_list) == 1 and stage_list[0] == "default": + logging.warning( + "stage will enable default value: %s", + self.DEFAULT_CONF["common"]["stage"], + ) + self._conf["common"]["stage"] = ALL_STAGE_LIST return for stage in stage_list: if stage not in ALL_STAGE_LIST: - logging.critical(f'stage: {stage} is not valid stage, ai_block_io will exit...') + logging.critical( + "stage: %s is not valid stage, ai_block_io will exit...", stage + ) exit(1) dup_stage_list = set(stage_list) - if 'bio' not in dup_stage_list: - logging.critical('stage must contains bio stage, exiting...') + if "bio" not in dup_stage_list: + logging.critical("stage must contains bio stage, exiting...") exit(1) - self.__stage = dup_stage_list - - def __read__iotype(self, items_algorithm: dict): - iotype_str = items_algorithm.get('iotype', ConfigParser.DEFAULT_IOTYPE) - iotype_list = iotype_str.split(',') - if len(iotype_list) == 1 and iotype_list[0] == '': - logging.critical('iotype value not allow is empty, exiting...') + self._conf["common"]["stage"] = dup_stage_list + + def _read_iotype(self, items_algorithm: dict): + iotype_str = items_algorithm.get( + "iotype", self.DEFAULT_CONF["common"]["iotype"] + ).strip() + iotype_list = iotype_str.split(",") + iotype_list = [iotype.strip() for iotype in iotype_list] + if len(iotype_list) == 1 and iotype_list[0] == "": + logging.critical("iotype value not allow is empty, exiting...") exit(1) - if len(iotype_list) == 1 and iotype_list[0] == 'default': - logging.warning(f'iotype will enable default value: {ConfigParser.DEFAULT_IOTYPE}') - self.__iotype = ALL_IOTPYE_LIST + if len(iotype_list) == 1 and iotype_list[0] == "default": + logging.warning( + "iotype will enable default value: %s", + self.DEFAULT_CONF["common"]["iotype"], + ) + self._conf["common"]["iotype"] = ALL_IOTPYE_LIST return for iotype in iotype_list: if iotype not in ALL_IOTPYE_LIST: - logging.critical(f'iotype: {iotype} is not valid iotype, ai_block_io will exit...') + logging.critical( + "iotype: %s is not valid iotype, ai_block_io will exit...", iotype + ) exit(1) dup_iotype_list = set(iotype_list) - self.__iotype = dup_iotype_list + self._conf["common"]["iotype"] = dup_iotype_list + + def _read_sliding_window_type(self, items_sliding_window: dict): + sliding_window_type = items_sliding_window.get("sliding_window_type") + if sliding_window_type is not None: + self._conf["sliding_window"]["sliding_window_type"] = ( + get_sliding_window_type_enum(sliding_window_type) + ) + if self._conf["sliding_window"]["sliding_window_type"] is None: + logging.critical( + "the sliding_window_type: %s you set is invalid. ai_block_io plug will exit.", + sliding_window_type, + ) + Report.report_pass( + f"the sliding_window_type: {sliding_window_type} you set is invalid. ai_block_io plug will exit." + ) + exit(1) - def __read__window_size(self, items_sliding_window: dict): - self.__window_size = self._get_config_value( + def _read_window_size(self, items_sliding_window: dict): + self._conf["sliding_window"]["window_size"] = self._get_config_value( items_sliding_window, "window_size", int, - ConfigParser.DEFAULT_WINDOW_SIZE, + self.DEFAULT_CONF["sliding_window"]["window_size"], gt=0, le=3600, ) - def __read__window_minimum_threshold(self, items_sliding_window: dict): - default_window_minimum_threshold = ConfigParser.DEFAULT_WINDOW_MINIMUM_THRESHOLD - if default_window_minimum_threshold > self.__window_size: - default_window_minimum_threshold = self.__window_size / 2 - self.__window_minimum_threshold = self._get_config_value( - items_sliding_window, - "window_minimum_threshold", - int, - default_window_minimum_threshold, - gt=0, - le=self.__window_size, + def _read_window_minimum_threshold(self, items_sliding_window: dict): + default_window_minimum_threshold = self.DEFAULT_CONF["sliding_window"][ + "window_minimum_threshold" + ] + if ( + default_window_minimum_threshold + > self._conf["sliding_window"]["window_size"] + ): + default_window_minimum_threshold = ( + self._conf["sliding_window"]["window_size"] / 2 + ) + self._conf["sliding_window"]["window_minimum_threshold"] = ( + self._get_config_value( + items_sliding_window, + "window_minimum_threshold", + int, + default_window_minimum_threshold, + gt=0, + le=self._conf["sliding_window"]["window_size"], + ) ) def read_config_from_file(self): - if not os.path.exists(self.__config_file_name): - init_log_format(self.__log_level) + if not os.path.exists(self._config_file_name): + init_log_format(self._conf["log"]["level"]) logging.critical( "config file %s not found, ai_block_io plug will exit.", - self.__config_file_name, + self._config_file_name, ) Report.report_pass( - f"config file {self.__config_file_name} not found, ai_block_io plug will exit." + f"config file {self._config_file_name} not found, ai_block_io plug will exit." ) exit(1) con = configparser.ConfigParser() try: - con.read(self.__config_file_name, encoding="utf-8") + con.read(self._config_file_name, encoding="utf-8") except configparser.Error as e: - init_log_format(self.__log_level) + init_log_format(self._conf["log"]["level"]) logging.critical( - f"config file read error: %s, ai_block_io plug will exit.", e + "config file read error: %s, ai_block_io plug will exit.", e ) Report.report_pass( f"config file read error: {e}, ai_block_io plug will exit." ) exit(1) - if con.has_section('log'): - items_log = dict(con.items('log')) + if con.has_section("log"): + items_log = dict(con.items("log")) # 情况一:没有log,则使用默认值 # 情况二:有log,值为空或异常,使用默认值 # 情况三:有log,值正常,则使用该值 - self.__log_level = items_log.get('level', ConfigParser.DEFAULT_LOG_LEVEL) - init_log_format(self.__log_level) + self._conf["log"]["level"] = items_log.get( + "level", self.DEFAULT_CONF["log"]["level"] + ) + init_log_format(self._conf["log"]["level"]) else: - init_log_format(self.__log_level) - logging.warning(f"log section parameter not found, it will be set to default value.") + init_log_format(self._conf["log"]["level"]) + logging.warning( + "log section parameter not found, it will be set to default value." + ) if con.has_section("common"): items_common = dict(con.items("common")) - self.__read_absolute_threshold(items_common) - self.__read__slow_io_detect_frequency(items_common) - self.__read__disks_to_detect(items_common) - self.__read__stage(items_common) - self.__read__iotype(items_common) + + self._read_slow_io_detect_frequency(items_common) + self._read_disks_to_detect(items_common) + self._read_stage(items_common) + self._read_iotype(items_common) else: logging.warning( "common section parameter not found, it will be set to default value." @@ -361,9 +407,9 @@ class ConfigParser: if con.has_section("algorithm"): items_algorithm = dict(con.items("algorithm")) - self.__read__train_data_duration(items_algorithm) - self.__read__train_update_duration(items_algorithm) - self.__read__algorithm_type_and_parameter(items_algorithm) + self._read_train_data_duration(items_algorithm) + self._read_train_update_duration(items_algorithm) + self._read_algorithm_type_and_parameter(items_algorithm) else: logging.warning( "algorithm section parameter not found, it will be set to default value." @@ -371,101 +417,162 @@ class ConfigParser: if con.has_section("sliding_window"): items_sliding_window = dict(con.items("sliding_window")) - sliding_window_type = items_sliding_window.get( - "sliding_window_type", ConfigParser.DEFAULT_SLIDING_WINDOW_TYPE + + self._read_window_size(items_sliding_window) + self._read_window_minimum_threshold(items_sliding_window) + else: + logging.warning( + "sliding_window section parameter not found, it will be set to default value." + ) + + if con.has_section("latency_sata_ssd"): + items_latency_sata_ssd = dict(con.items("latency_sata_ssd")) + self._conf["latency_sata_ssd"]["read_tot_lim"] = self._get_config_value( + items_latency_sata_ssd, + "read_tot_lim", + int, + self.DEFAULT_CONF["latency_sata_ssd"]["read_tot_lim"], + gt=0, ) - self.__sliding_window_type = get_sliding_window_type_enum( - sliding_window_type + self._conf["latency_sata_ssd"]["write_tot_lim"] = self._get_config_value( + items_latency_sata_ssd, + "write_tot_lim", + int, + self.DEFAULT_CONF["latency_sata_ssd"]["write_tot_lim"], + gt=0, ) - self.__read__window_size(items_sliding_window) - self.__read__window_minimum_threshold(items_sliding_window) else: logging.warning( - "sliding_window section parameter not found, it will be set to default value." + "latency_sata_ssd section parameter not found, it will be set to default value." + ) + if con.has_section("latency_nvme_ssd"): + items_latency_nvme_ssd = dict(con.items("latency_nvme_ssd")) + self._conf["latency_nvme_ssd"]["read_tot_lim"] = self._get_config_value( + items_latency_nvme_ssd, + "read_tot_lim", + int, + self.DEFAULT_CONF["latency_nvme_ssd"]["read_tot_lim"], + gt=0, + ) + self._conf["latency_nvme_ssd"]["write_tot_lim"] = self._get_config_value( + items_latency_nvme_ssd, + "write_tot_lim", + int, + self.DEFAULT_CONF["latency_nvme_ssd"]["write_tot_lim"], + gt=0, + ) + else: + logging.warning( + "latency_nvme_ssd section parameter not found, it will be set to default value." + ) + if con.has_section("latency_sata_hdd"): + items_latency_sata_hdd = dict(con.items("latency_sata_hdd")) + self._conf["latency_sata_hdd"]["read_tot_lim"] = self._get_config_value( + items_latency_sata_hdd, + "read_tot_lim", + int, + self.DEFAULT_CONF["latency_sata_hdd"]["read_tot_lim"], + gt=0, + ) + self._conf["latency_sata_hdd"]["write_tot_lim"] = self._get_config_value( + items_latency_sata_hdd, + "write_tot_lim", + int, + self.DEFAULT_CONF["latency_sata_hdd"]["write_tot_lim"], + gt=0, + ) + else: + logging.warning( + "latency_sata_hdd section parameter not found, it will be set to default value." ) self.__print_all_config_value() - def __repr__(self): - config_str = { - 'log.level': self.__log_level, - 'common.absolute_threshold': self.__absolute_threshold, - 'common.slow_io_detect_frequency': self.__slow_io_detect_frequency, - 'common.disk': self.__disks_to_detection, - 'common.stage': self.__stage, - 'common.iotype': self.__iotype, - 'algorithm.train_data_duration': self.__train_data_duration, - 'algorithm.train_update_duration': self.__train_update_duration, - 'algorithm.algorithm_type': self.__algorithm_type, - 'algorithm.boxplot_parameter': self.__boxplot_parameter, - 'algorithm.n_sigma_parameter': self.__n_sigma_parameter, - 'sliding_window.sliding_window_type': self.__sliding_window_type, - 'sliding_window.window_size': self.__window_size, - 'sliding_window.window_minimum_threshold': self.__window_minimum_threshold - } - return str(config_str) + def __repr__(self) -> str: + return str(self._conf) + + def __str__(self) -> str: + return str(self._conf) def __print_all_config_value(self): - logging.info(f"all config is follow:\n {self}") + logging.info("all config is follow:\n %s", self) + + def get_tot_lim(self, disk_type, io_type): + if io_type == "read": + return self._conf.get( + f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} + ).get("read_tot_lim", None) + elif io_type == "write": + return self._conf.get( + f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} + ).get("write_tot_lim", None) + else: + return None def get_train_data_duration_and_train_update_duration(self): - return self.__train_data_duration, self.__train_update_duration + return ( + self._conf["common"]["train_data_duration"], + self._conf["common"]["train_update_duration"], + ) def get_window_size_and_window_minimum_threshold(self): - return self.__window_size, self.__window_minimum_threshold + return ( + self._conf["sliding_window"]["window_size"], + self._conf["sliding_window"]["window_minimum_threshold"], + ) @property def slow_io_detect_frequency(self): - return self.__slow_io_detect_frequency + return self._conf["common"]["slow_io_detect_frequency"] @property def algorithm_type(self): - return self.__algorithm_type + return self._conf["algorithm"]["algorithm_type"] @property def sliding_window_type(self): - return self.__sliding_window_type + return self._conf["sliding_window"]["sliding_window_type"] @property def train_data_duration(self): - return self.__train_data_duration + return self._conf["common"]["train_data_duration"] @property def train_update_duration(self): - return self.__train_update_duration + return self._conf["common"]["train_update_duration"] @property def window_size(self): - return self.__window_size + return self._conf["sliding_window"]["window_size"] @property def window_minimum_threshold(self): - return self.__window_minimum_threshold + return self._conf["sliding_window"]["window_minimum_threshold"] @property def absolute_threshold(self): - return self.__absolute_threshold + return self._conf["common"]["absolute_threshold"] @property def log_level(self): - return self.__log_level + return self._conf["log"]["level"] @property def disks_to_detection(self): - return self.__disks_to_detection + return self._conf["common"]["disk"] @property def stage(self): - return self.__stage + return self._conf["common"]["stage"] @property def iotype(self): - return self.__iotype + return self._conf["common"]["iotype"] @property def boxplot_parameter(self): - return self.__boxplot_parameter + return self._conf["algorithm"]["boxplot_parameter"] @property def n_sigma_parameter(self): - return self.__n_sigma_parameter + return self._conf["algorithm"]["n_sigma_parameter"] diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/data_access.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/data_access.py index ed997e6..1bc5ed8 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/data_access.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/data_access.py @@ -16,6 +16,7 @@ from sentryCollector.collect_plugin import ( Result_Messages, get_io_data, is_iocollect_valid, + get_disk_type ) diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/detector.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/detector.py index e710ddd..4ef7049 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/detector.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/detector.py @@ -40,7 +40,7 @@ class Detector: metric_value = get_metric_value_from_io_data_dict_by_metric_name(io_data_dict_with_disk_name, self._metric_name) if metric_value is None: logging.debug('not found metric value, so return None.') - return False, None, None + return False, None, None, None logging.debug(f'input metric value: {str(metric_value)}') self._threshold.push_latest_data_to_queue(metric_value) detection_result = self._slidingWindow.is_slow_io_event(metric_value) @@ -49,9 +49,9 @@ class Detector: return detection_result def __repr__(self): - return (f'disk_name: {self._metric_name.get_disk_name()}, stage_name: {self._metric_name.get_stage_name()},' - f' io_type_name: {self._metric_name.get_io_access_type_name()},' - f' metric_name: {self._metric_name.get_metric_name()}, threshold_type: {self._threshold},' + return (f'disk_name: {self._metric_name.disk_name}, stage_name: {self._metric_name.stage_name},' + f' io_type_name: {self._metric_name.io_access_type_name},' + f' metric_name: {self._metric_name.metric_name}, threshold_type: {self._threshold},' f' sliding_window_type: {self._slidingWindow}') @@ -69,9 +69,9 @@ class DiskDetector: # todo:根因诊断 for detector in self._detector_list: result = detector.is_slow_io_event(io_data_dict_with_disk_name) - if result[0] and detector.get_metric_name().get_stage_name() == 'bio': - return result[0], detector.get_metric_name(), result[1], result[2] - return False, None, None, None + if result[0] and detector.get_metric_name().stage_name == 'bio': + return result[0], detector.get_metric_name(), result[1], result[2], result[3] + return False, None, None, None, None def __repr__(self): msg = f'disk: {self._disk_name}, ' diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/io_data.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/io_data.py index 0e17051..d341b55 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/io_data.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/io_data.py @@ -45,30 +45,10 @@ class IOData: time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) +@dataclass(frozen=True) class MetricName: - _disk_name: str = None - _stage_name: str = None - _io_access_type_name: str = None - _metric_name: str = None - - def __init__(self, disk_name: str, stage_name: str, io_access_type_name: str, metric_name: str): - self._disk_name = disk_name - self._stage_name = stage_name - self._io_access_type_name = io_access_type_name - self._metric_name = metric_name - - def get_disk_name(self): - return self._disk_name - - def get_stage_name(self): - return self._stage_name - - def get_io_access_type_name(self): - return self._io_access_type_name - - def get_metric_name(self): - return self._metric_name - - def __repr__(self): - return (f'disk: {self._disk_name}, stage: {self._stage_name}, io_access_type: {self._io_access_type_name},' - f'metric: {self._metric_name}') + disk_name: str + disk_type: str + stage_name: str + io_access_type_name: str + metric_name: str diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/sliding_window.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/sliding_window.py index 89191e5..800441b 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/sliding_window.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/sliding_window.py @@ -21,15 +21,11 @@ class SlidingWindowType(Enum): class SlidingWindow: - _ai_threshold = None - _queue_length = None - _queue_threshold = None - _io_data_queue: list = None - _io_data_queue_abnormal_tag: list = None - - def __init__(self, queue_length: int, threshold: int): + def __init__(self, queue_length: int, threshold: int, abs_threshold: int = None): self._queue_length = queue_length self._queue_threshold = threshold + self._ai_threshold = None + self._abs_threshold = abs_threshold self._io_data_queue = [] self._io_data_queue_abnormal_tag = [] @@ -38,7 +34,15 @@ class SlidingWindow: self._io_data_queue.pop(0) self._io_data_queue_abnormal_tag.pop(0) self._io_data_queue.append(data) - self._io_data_queue_abnormal_tag.append(data >= self._ai_threshold if self._ai_threshold is not None else False) + self._io_data_queue_abnormal_tag.append( + ( + data >= self._ai_threshold + or self._abs_threshold is not None + and data >= self._abs_threshold + ) + if self._ai_threshold is not None + else False + ) def update(self, threshold): if self._ai_threshold == threshold: @@ -49,7 +53,7 @@ class SlidingWindow: self._io_data_queue_abnormal_tag.append(data >= self._ai_threshold) def is_slow_io_event(self, data): - return False, None, None + return False, None, None, None def __repr__(self): return "[SlidingWindow]" @@ -59,10 +63,10 @@ class NotContinuousSlidingWindow(SlidingWindow): def is_slow_io_event(self, data): super().push(data) if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None: - return False, self._io_data_queue, self._ai_threshold + return False, self._io_data_queue, self._ai_threshold, self._abs_threshold if self._io_data_queue_abnormal_tag.count(True) >= self._queue_threshold: - return True, self._io_data_queue, self._ai_threshold - return False, self._io_data_queue, self._ai_threshold + return True, self._io_data_queue, self._ai_threshold, self._abs_threshold + return False, self._io_data_queue, self._ai_threshold, self._abs_threshold def __repr__(self): return f"[NotContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]" @@ -72,16 +76,21 @@ class ContinuousSlidingWindow(SlidingWindow): def is_slow_io_event(self, data): super().push(data) if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None: - return False, self._io_data_queue, self._ai_threshold + return False, self._io_data_queue, self._ai_threshold, self._abs_threshold consecutive_count = 0 for tag in self._io_data_queue_abnormal_tag: if tag: consecutive_count += 1 if consecutive_count >= self._queue_threshold: - return True, self._io_data_queue, self._ai_threshold + return ( + True, + self._io_data_queue, + self._ai_threshold, + self._abs_threshold, + ) else: consecutive_count = 0 - return False, self._io_data_queue, self._ai_threshold + return False, self._io_data_queue, self._ai_threshold, self._abs_threshold def __repr__(self): return f"[ContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]" @@ -91,18 +100,20 @@ class MedianSlidingWindow(SlidingWindow): def is_slow_io_event(self, data): super().push(data) if len(self._io_data_queue) < self._queue_length or self._ai_threshold is None: - return False, self._io_data_queue, self._ai_threshold + return False, self._io_data_queue, self._ai_threshold, self._abs_threshold median = np.median(self._io_data_queue) if median >= self._ai_threshold: - return True, self._io_data_queue, self._ai_threshold - return False, self._io_data_queue, self._ai_threshold + return True, self._io_data_queue, self._ai_threshold, self._abs_threshold + return False, self._io_data_queue, self._ai_threshold, self._abs_threshold def __repr__(self): return f"[MedianSlidingWindow, window size: {self._queue_length}]" class SlidingWindowFactory: - def get_sliding_window(self, sliding_window_type: SlidingWindowType, *args, **kwargs): + def get_sliding_window( + self, sliding_window_type: SlidingWindowType, *args, **kwargs + ): if sliding_window_type == SlidingWindowType.NotContinuousSlidingWindow: return NotContinuousSlidingWindow(*args, **kwargs) elif sliding_window_type == SlidingWindowType.ContinuousSlidingWindow: diff --git a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/utils.py b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/utils.py index 0ed37b9..d6f4067 100644 --- a/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/utils.py +++ b/sysSentry-1.0.2/src/python/sentryPlugins/ai_block_io/utils.py @@ -19,53 +19,57 @@ from .io_data import MetricName, IOData def get_threshold_type_enum(algorithm_type: str): - if algorithm_type.lower() == 'absolute': + if algorithm_type.lower() == "absolute": return ThresholdType.AbsoluteThreshold - if algorithm_type.lower() == 'boxplot': + if algorithm_type.lower() == "boxplot": return ThresholdType.BoxplotThreshold - if algorithm_type.lower() == 'n_sigma': + if algorithm_type.lower() == "n_sigma": return ThresholdType.NSigmaThreshold return None def get_sliding_window_type_enum(sliding_window_type: str): - if sliding_window_type.lower() == 'not_continuous': + if sliding_window_type.lower() == "not_continuous": return SlidingWindowType.NotContinuousSlidingWindow - if sliding_window_type.lower() == 'continuous': + if sliding_window_type.lower() == "continuous": return SlidingWindowType.ContinuousSlidingWindow - if sliding_window_type.lower() == 'median': + if sliding_window_type.lower() == "median": return SlidingWindowType.MedianSlidingWindow - logging.warning(f"the sliding window type: {sliding_window_type} you set is invalid, use default value: not_continuous") - return SlidingWindowType.NotContinuousSlidingWindow + return None -def get_metric_value_from_io_data_dict_by_metric_name(io_data_dict: dict, metric_name: MetricName): +def get_metric_value_from_io_data_dict_by_metric_name( + io_data_dict: dict, metric_name: MetricName +): try: - io_data: IOData = io_data_dict[metric_name.get_disk_name()] - io_stage_data = asdict(io_data)[metric_name.get_stage_name()] - base_data = io_stage_data[metric_name.get_io_access_type_name()] - metric_value = base_data[metric_name.get_metric_name()] + io_data: IOData = io_data_dict[metric_name.disk_name] + io_stage_data = asdict(io_data)[metric_name.stage_name] + base_data = io_stage_data[metric_name.io_access_type_name] + metric_value = base_data[metric_name.metric_name] return metric_value except KeyError: return None -def get_data_queue_size_and_update_size(training_data_duration: float, train_update_duration: float, - slow_io_detect_frequency: int): +def get_data_queue_size_and_update_size( + training_data_duration: float, + train_update_duration: float, + slow_io_detect_frequency: int, +): data_queue_size = int(training_data_duration * 60 * 60 / slow_io_detect_frequency) update_size = int(train_update_duration * 60 * 60 / slow_io_detect_frequency) return data_queue_size, update_size def get_log_level(log_level: str): - if log_level.lower() == 'debug': + if log_level.lower() == "debug": return logging.DEBUG - elif log_level.lower() == 'info': + elif log_level.lower() == "info": return logging.INFO - elif log_level.lower() == 'warning': + elif log_level.lower() == "warning": return logging.WARNING - elif log_level.lower() == 'error': + elif log_level.lower() == "error": return logging.ERROR - elif log_level.lower() == 'critical': + elif log_level.lower() == "critical": return logging.CRITICAL return logging.INFO -- Gitee