diff --git a/dbmind/common/algorithm/correlation.py b/dbmind/common/algorithm/correlation.py index a11b94ac6d928c08abfb9d043293cd4b8ee9dfdc..03442353395d7594834c0371b5c074168d14a188 100644 --- a/dbmind/common/algorithm/correlation.py +++ b/dbmind/common/algorithm/correlation.py @@ -11,111 +11,278 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. +from functools import partial + import numpy as np from scipy.stats import pearsonr, zscore -def dtw_distance(x, y, max_length=10): - """Compute Dynamic Time Warping (DTW) similarity measure between - time series and return it. - DTW is computed as the Euclidean distance between aligned time series, - i.e., if :math:`\\pi` is the optimal alignment path: +def pearson(x, y): + return pearsonr(x, y)[0] - .. math:: - DTW(X, Y) = \\sqrt{\\sum_{(i, j) \\in \\pi} \\|X_{i} - Y_{j}\\|^2} - Parameters - ---------- - x : array_like - y : array_like - max_length : int - Maximum warping path length. - ---------- - .. [1] H. Sakoe, S. Chiba, "Dynamic programming algorithm optimization for - spoken word recognition," IEEE Transactions on Acoustics, Speech and - Signal Processing, vol. 26(1), pp. 43--49, 1978. - """ - matrix = {} - length_delta = abs(len(x) - len(y)) - max_length = max_length if max_length > length_delta else length_delta - for i in range(-1, len(x)): - for j in range(-1, len(y)): - matrix[(i, j)] = float('inf') - matrix[(-1, -1)] = 0 - for i in range(len(x)): - for j in range(max(0, i - max_length), min(len(y), i + max_length)): - dist = (x[i] - y[j]) ** 2 - matrix[(i, j)] = dist + min(matrix[(i - 1, j)], matrix[(i, j - 1)], matrix[(i - 1, j - 1)]) - return np.sqrt(matrix[len(x) - 1, len(y) - 1]) +def iter_shift(arr, num, fill_value=0): + if num == 0: + return arr + if num > 0: + return np.concatenate((np.full(num, fill_value), arr[:-num])) + else: + return np.concatenate((arr[-num:], np.full(-num, fill_value))) + +def cross_correlation(data1, data2, shift_num): + return pearson(data1, iter_shift(data2, shift_num)) -def lb_keogh(x, y, radius=10): - """Compute LB_Keogh and return it. - LB_Keogh is to compute distance between the time series and the envelope of another time series. +class CorrelationAnalysis: + """A class to analyze the correlation between two time series, including correlation coefficient, + fluctuation direction and fluctuation order. Parameters ---------- - x : array_like - y : array_like - radius : int - Radius to be used for the envelope generation. 
-    References
+    (A) preprocess_method ---- algorithms that filter out the major trend of a time series and preserve its local fluctuations
+    Six preprocessing methods are available:
+    - diff: differencing to eliminate the impact of the major trend
+        - diff_times: an integer giving how many times the differencing is applied
+    - holt_winters: predicts values with three smoothing equations
+        - holt_winters_parameters: a 4-tuple of the form (holt_winters_alpha, holt_winters_beta,
+          holt_winters_gamma, holt_winters_period)
+    - historical_avg: predicts values from the mean of the historical data
+    - historical_med: predicts values from the median of the historical data
+    - wavelet: uses wavelet decomposition and keeps only the high-frequency part
+        - wavelet_window: the length of the wavelet filter
+    - none: returns the data directly after normalization and feature amplification
+    (B) analyze_method ---- algorithms that compute the correlation of two time series
+    Two analysis methods are available:
+    - pearson: computes the Pearson correlation coefficient
+    - coflux: computes a cross-correlation based on inner products
+        - sliding_length: the maximum shift applied while searching for the best alignment
+    Additionally, you may turn on normalization_switch to get a more accurate result with Pearson,
+    at the cost of extra computation. If the switch stays off, the result may be distorted when the
+    original data has a large range. Note that this switch only takes effect when "pearson" is chosen
+    as analyze_method; normalization is always performed for "coflux".
+
+    Inputs
+    ----------
+    x: an array-like time series
+    y: an array-like time series
+
+    Examples
+    ----------
+    my_correlation_analysis = CorrelationAnalysis(preprocess_method='diff', analyze_method='pearson', diff_times=1)
+    x, y = my_correlation_analysis.preprocess(x, y)
+    correlation_result = my_correlation_analysis.analyze(x, y)
+
+    Returns
     ----------
-    .. [1] Keogh, E. Exact indexing of dynamic time warping. In International
-       Conference on Very Large Data Bases, 2002. pp 406-417.
+    A tuple whose first element is the correlation coefficient and whose second element is the shift
+    coefficient representing the fluctuation order.
+    The closer the coefficient is to 1 or -1, the stronger the correlation between x and y.
+    A positive correlation coefficient means the two series move in the same direction, and vice versa.
+    A positive shift coefficient means x fluctuates later than y, and vice versa.
""" - lb_sum = 0 - for ind, i in enumerate(x): - lower_bound = min(y[(ind - radius if ind - radius >= 0 else 0):(ind + radius)]) - upper_bound = max(y[(ind - radius if ind - radius >= 0 else 0):(ind + radius)]) - if i > upper_bound: - lb_sum = lb_sum + (i - upper_bound) ** 2 - elif i < lower_bound: - lb_sum = lb_sum + (i - lower_bound) ** 2 - return np.sqrt(lb_sum) + def __init__(self, + preprocess_method='diff', + analyze_method='coflux', + normalization_switch=False, + diff_times=1, + wavelet_window=3, + holt_winters_parameters=(0.2, 0.2, 0.2, 7), + sliding_length=0): + self.preprocess_method = preprocess_method + self.analyze_method = analyze_method -def pearson(x, y): - return pearsonr(x, y)[0] + self.normalization_switch = normalization_switch + self.diff_times = diff_times + self.wavelet_window = wavelet_window -def amplify_feature(data): - alpha = 0.5 - beta = 10 - data_zscore = zscore(data) - res = [] - for x in list(data_zscore): - if x < 0: - res.append(-np.exp(min(abs(x), beta) * alpha) + 1) - else: - res.append(np.exp(min(x, beta) * alpha) - 1) - return res + (self.holt_winters_alpha, self.holt_winters_beta, self.holt_winters_gamma, + self.holt_winters_period) = holt_winters_parameters + self.sliding_length = int(abs(sliding_length)) + def _preprocess_execute(self, data, method): + if self.normalization_switch or self.analyze_method == 'coflux': + data = self.normalize(data) -def iter_shift(arr, num, fill_value=0): - if num == 0: - return arr - if num > 0: - return np.concatenate((np.full(num, fill_value), arr[:-num])) - else: - return np.concatenate((arr[-num:], np.full(-num, fill_value))) + method_map = {'diff': partial(self.diff, diff_times=self.diff_times), + 'holt_winters': partial(self.holt_winters, + alpha=self.holt_winters_alpha, + beta=self.holt_winters_beta, + gamma=self.holt_winters_gamma, + period=self.holt_winters_period), + 'historical_avg': self.historical_mean, + 'historical_med': self.historical_median, + 'wavelet': partial(self.wavelet, + window=self.wavelet_window), + 'none': lambda x: x} + return np.nan_to_num(method_map[method](data)) + @staticmethod + def diff(data, diff_times=1): + """Simply uses the value of last day + or last week separately to predict the current one + ------------------------------------------------------------------- + :param diff_times: the times doing differential + :param data: original data + :return: flux time series + """ + return np.diff(data, diff_times) -def cross_correlation(data1, data2, shift_num): - return pearson(data1, iter_shift(data2, shift_num)) + @staticmethod + def holt_winters(data, alpha=0.2, beta=0.2, gamma=0.2, period=7): + """Calculates forecast values using three smoothing equations (level, + trend and seasonal components) with three parameters ranged from + 0 to 1 + ------------------------------------------------------------------- + :param data: original data + :param period: flux period + :param alpha: 0.2, 0.4, 0.6, 0.8 + :param beta: 0.2, 0.4, 0.6, 0.8 + :param gamma: 0.2, 0.4, 0.6, 0.8 + :return: flux time series + """ + s, t, p = list(), list(), list() + s_last = data[0] + t_last = data[0] + x_prediction = list() + for i, x in enumerate(data): + p_last = 0 if i - period < 0 else p[i - period] + s_new = alpha * (x - p_last) + (1 - alpha) * (s_last + t_last) + t_new = beta * (s_new - s_last) + (1 - beta) * t_last + p_new = gamma * (x - s_new) + (1 - gamma) * p_last + s.append(s_new) + t.append(t_new) + p.append(p_new) + x_prediction.append(s_new + p_new) + s_last = s_new + t_last = t_new + return data 
- np.array(x_prediction) + + @staticmethod + def historical_mean(data): + """Uses the average value of historical data to predict the current + one + ------------------------------------------------------------------- + :param data: original data + :return: flux time series + """ + return data - np.mean(data) + + @staticmethod + def historical_median(data): + """Uses the median value of historical data to predict the current + one + ------------------------------------------------------------------- + :param data: original data + :return: flux time series + """ + return data - np.median(data) + + @staticmethod + def wavelet(data, window=3): + """Wavelet decomposition can cover the entire frequency domain of time + series, and we set the high-frequency part as predictions + ------------------------------------------------------------------- + :param data: original data + :param window: length of wavelet filter + :return: flux time series + """ + data_wavelet = np.convolve(data, data[:window], mode='valid') + origin = len(data) + diff = window - 1 + return data[diff // 2: origin - diff + diff // 2] - data_wavelet + + def _analyze_execute(self, x, y, method): + method_map = {'pearson': self._pearson_correlation_analysis, + 'coflux': self._coflux_correlation_analysis} + return method_map[method](x, y) + + def _inner_product(self, x, y): + """Compute the inner product of two time series + + parameters + ---------- + x : array_like time series + y : array_like time series + """ + return np.inner(x, y) * 2 - x[0] * y[0] - x[-1] * y[-1] + + def _coflux_correlation_analysis(self, x, y): + """Compute Cross-correlation based on CoFlux and return a tuple representing + the correlation analysis of the 2 given time series. + + Parameters + ---------- + x : array_like time series + y : array_like time series + + References + ---------- + .. [1] Y. Su et al., CoFlux: Robustly Correlating KPIs by Fluctuations for + Service Troubleshooting, 2019 IEEE/ACM 27th International Symposium + on Quality of Service (IWQoS), 2019, pp. 1-10. 
+ """ + data_length = len(x) + inner_product_xx = self._inner_product(x, x) + inner_product_yy = self._inner_product(y, y) + max_correlation_coefficient = self._inner_product(x, y) / ( + (inner_product_xx * inner_product_yy) ** 0.5) + + self.sliding_length = min(self.sliding_length, data_length - 1) + + shift = 0 + for s in range(-self.sliding_length, self.sliding_length + 1): + x_slide = iter_shift(x, s, 0) + correlation_coefficient = self._inner_product(x_slide, y) / ( + (inner_product_xx * inner_product_yy) ** 0.5) + if abs(correlation_coefficient) > abs(max_correlation_coefficient): + max_correlation_coefficient = correlation_coefficient + shift = s + return (max_correlation_coefficient, shift) + + def _pearson_correlation_analysis(self, x, y): + x, y = np.nan_to_num(x), np.nan_to_num(y) + if np.max(x) == np.min(x) or np.max(y) == np.min(y): + return 0, 0 + + left, right = min(self.sliding_length, len(y)), min(self.sliding_length, len(y)) + max_correlation, final_shift = 0, 0 + for shift in range(-left, right + 1): + correlation = cross_correlation(x, y, shift) + if abs(correlation) > abs(max_correlation): + max_correlation = correlation + final_shift = shift + return (max_correlation, final_shift) + + @staticmethod + def normalize(data): + """Normalizes the data using z-score and then amplifies the feature + ------------------------------------------------------------------- + :param data: original data + :return: preprocessed data + """ + data = zscore(data) + alpha = 0.5 + beta = 10 + for index, value in enumerate(data): + temp = np.exp(min(abs(value), beta) * alpha) - 1 + data[index] = temp if value >= 0 else -temp + return data + + def preprocess(self, x, y): + x = np.asarray(x) + y = np.asarray(y) + if np.all(x == x[0]) or np.all(y == y[0]): + return x, y + + return (self._preprocess_execute(x, self.preprocess_method), + self._preprocess_execute(y, self.preprocess_method)) + def analyze(self, x, y): + if np.all(x == x[0]) or np.all(y == y[0]): + return (0, 0) -def max_cross_correlation(data1, data2, left=0, right=0): - data1, data2 = np.nan_to_num(data1), np.nan_to_num(data2) - if np.max(data1) == np.min(data1) or np.max(data2) == np.min(data2): - return 0, 0 + return self._analyze_execute(x, y, self.analyze_method) - left, right = min(left, len(data2)), min(right, len(data2)) - max_correlation, final_shift = 0, 0 - for shift in range(-left, right + 1): - correlation = cross_correlation(data1, data2, shift) - if abs(correlation) > abs(max_correlation): - max_correlation = correlation - final_shift = shift - return max_correlation, final_shift diff --git a/dbmind/components/anomaly_analysis.py b/dbmind/components/anomaly_analysis.py index a290cb1433109a24a7d021340aaa0c7395600474..37addb900db8e6e2279c12299035ba3e1fa45d7e 100644 --- a/dbmind/components/anomaly_analysis.py +++ b/dbmind/components/anomaly_analysis.py @@ -25,18 +25,17 @@ from datetime import datetime from scipy.interpolate import interp1d from dbmind import global_vars -from dbmind.common.algorithm.correlation import max_cross_correlation +from dbmind.cmd.edbmind import init_global_configs +from dbmind.common.algorithm.correlation import CorrelationAnalysis from dbmind.common.tsdb import TsdbClientFactory from dbmind.common.utils.checking import ( check_ip_valid, check_port_valid, date_type, path_type ) from dbmind.common.utils.cli import write_to_terminal -from dbmind.cmd.edbmind import init_global_configs from dbmind.common.utils.component import initialize_tsdb_param +from dbmind.constants import 
DISTINGUISHING_INSTANCE_LABEL from dbmind.service import dai from dbmind.service.utils import SequenceUtils -from dbmind.constants import DISTINGUISHING_INSTANCE_LABEL - LEAST_WINDOW = int(7.2e3) * 1000 LOOK_BACK = 0 @@ -111,7 +110,10 @@ def get_correlations(arg): fill_value=(sequence.values[0], sequence.values[-1]) ) y = f(this_sequence.timestamps) - corr, delay = max_cross_correlation(this_sequence.values, y, LOOK_BACK, LOOK_FORWARD) + correlation_calculation = CorrelationAnalysis(preprocess_method='diff', + analyze_method='pearson') + x, y = correlation_calculation.preprocess(this_sequence.values, y) + corr, delay = correlation_calculation.analyze(x, y) return name, corr, delay, sequence.values, sequence.timestamps @@ -138,7 +140,7 @@ def multi_process_correlation_calculation(metric, sequence_args, corr_threshold= for name in correlation_results: correlation_results[name].sort(key=lambda item: item[1], reverse=True) - del(correlation_results[name][topk:]) + del (correlation_results[name][topk:]) return correlation_results @@ -195,7 +197,7 @@ def main(argv): init_global_configs(args.conf) if not initialize_tsdb_param(): parser.exit(1, "TSDB service does not exist, exiting...") - + client = TsdbClientFactory.get_tsdb_client() all_metrics = client.all_metrics actual_start_time = min(start_time, end_time - LEAST_WINDOW) @@ -222,7 +224,8 @@ def main(argv): csv_path = os.path.join(args.csv_dump_path, new_name + ".csv") with open(csv_path, 'w+', newline='') as f: writer = csv.writer(f) - for _, name, corr, delay, values, timestamps in sorted(result[this_name].values(), key=lambda t: (t[3], -t[0])): + for _, name, corr, delay, values, timestamps in sorted(result[this_name].values(), + key=lambda t: (-abs(t[2]), t[3])): writer.writerow((name, corr, delay) + values) # Discard the first element abs(corr) after sorting. 
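Note (not part of the patch): the rewritten sort key above orders candidate metrics by descending absolute correlation and breaks ties by smaller delay. A minimal sketch with made-up result tuples of the form (abs_corr, name, corr, delay, values, timestamps):

    # Hypothetical tuples for illustration only.
    results = [
        (0.42, 'metric_a', 0.42, 3, (), ()),
        (0.93, 'metric_b', -0.93, 5, (), ()),
        (0.93, 'metric_c', 0.93, 1, (), ()),
    ]
    # Same key as in anomaly_analysis.py: strongest |corr| first, then smaller delay.
    ordered = sorted(results, key=lambda t: (-abs(t[2]), t[3]))
    assert [t[1] for t in ordered] == ['metric_c', 'metric_b', 'metric_a']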
diff --git a/tests/test_correlation_analysis.py b/tests/test_correlation_analysis.py new file mode 100644 index 0000000000000000000000000000000000000000..4993ec5dc70e0888d15c945ca7a3e4d4cd2f4d86 --- /dev/null +++ b/tests/test_correlation_analysis.py @@ -0,0 +1,63 @@ +import numpy as np + +from dbmind.common.algorithm.correlation import CorrelationAnalysis + + +def test_correlation_analysis(): + period = 20 + mean1 = 5000 + std1 = 40 + mean2 = 50000 + std2 = 3000 + start = 47000 + gradient = 10 + constant = 35000 + multiple = 0.03 + data_size = 300 + data_start = 500 + x = np.arange(data_start, data_start + data_size, 1) + random = np.random.random(data_size) + noise1 = np.random.normal(mean1, std1, data_size) + noise2 = np.random.normal(mean2, std2, data_size) + linear = start + gradient * x + condition = np.array([0 if i % period else 1 for i in range(data_start, data_start + data_size)]) + y1 = linear + multiple * random - noise1 * condition + y2 = constant + multiple * random + noise2 * condition + + preprocess_method_list = ['diff', 'historical_avg', 'historical_med', 'none'] + + for p in preprocess_method_list: + my_correlation_analysis = CorrelationAnalysis(preprocess_method=p, + analyze_method='coflux') + y1_preprocessed, y2_preprocessed = my_correlation_analysis.preprocess(y1, y2) + correlation_analysis_result = my_correlation_analysis.analyze(y1_preprocessed, y2_preprocessed) + assert 0.7 < abs(correlation_analysis_result[0]) < 1 + + holt_winters_smoothing_parameter_list = [0.2, 0.4, 0.6] + holt_winters_period_list = [2, 4, 6, 8, 10] + + for a in holt_winters_smoothing_parameter_list: + for b in holt_winters_smoothing_parameter_list: + for c in holt_winters_smoothing_parameter_list: + for p in holt_winters_period_list: + my_correlation_analysis = CorrelationAnalysis(preprocess_method='holt_winters', + analyze_method='pearson', + normalization_switch=True, + holt_winters_parameters=(a, b, c, p)) + y1_preprocessed, y2_preprocessed = my_correlation_analysis.preprocess(y1, y2) + correlation_analysis_result = my_correlation_analysis.analyze(y1_preprocessed, y2_preprocessed) + assert 0.7 < abs(correlation_analysis_result[0]) < 1 + + wavelet_window_list = [1, 2, 3] + sliding_length_list = [0, 200, 400] + + for w in wavelet_window_list: + for s in sliding_length_list: + my_correlation_analysis = CorrelationAnalysis(preprocess_method='wavelet', + analyze_method='coflux', + wavelet_window=w, + sliding_length=s) + y1_preprocessed, y2_preprocessed = my_correlation_analysis.preprocess(y1, y2) + correlation_analysis_result = my_correlation_analysis.analyze(y1_preprocessed, y2_preprocessed) + assert 0.7 < abs(correlation_analysis_result[0]) < 1 +
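Reviewer note: a minimal usage sketch of the new CorrelationAnalysis API, assuming synthetic data; the series, noise levels and sliding_length below are illustrative and not taken from the patch:

    import numpy as np

    from dbmind.common.algorithm.correlation import CorrelationAnalysis

    # Two hypothetical series: y roughly follows x with a small lag and extra noise.
    x = np.sin(np.linspace(0, 20, 200)) + np.random.normal(0, 0.05, 200)
    y = np.roll(x, 3) + np.random.normal(0, 0.05, 200)

    analysis = CorrelationAnalysis(preprocess_method='diff',
                                   analyze_method='pearson',
                                   sliding_length=10)
    # preprocess() strips the major trend; analyze() returns (coefficient, shift).
    x_flux, y_flux = analysis.preprocess(x, y)
    coefficient, shift = analysis.analyze(x_flux, y_flux)
    # A coefficient near +/-1 indicates a strong correlation; per the class docstring,
    # a positive shift means x fluctuates later than y.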