mutual_information.py
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 8 14:32:52 2018

@author: luolei

Mutual-information-based correlation analysis
"""
import json
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.ndimage import shift  # scipy.ndimage.interpolation is deprecated in recent SciPy

sys.path.append('../')

from mods.config_loader import config
from mods.data_filtering import savitzky_golay_filter, stl_detrend
def load_data_and_detrend():
    """Load the data, filter it and remove the long-term trend."""
    data = pd.read_csv('../tmp/total_implemented_normalized_data.csv')
    sg_filtered_data = savitzky_golay_filter(data)
    data_detrend, _ = stl_detrend(sg_filtered_data.copy(), save=True, show=False)
    return data_detrend
def data_binning(data, col, **kwargs):
    """Bin a single variable; returns per-bin counts and bin labels (right edges)."""
    data = data.copy()
    freq_nums, intervals = np.histogram(data[col], **kwargs)
    labels = intervals[1:]
    return freq_nums, labels
def joint_freq_nums(data, col_a, col_b, **kwargs):
    """Joint frequency counts of two variables on a 2D histogram grid."""
    data_cols = np.array(data.copy()[[col_a, col_b]])
    series_a, series_b = data_cols[:, 0], data_cols[:, 1]
    H, x_edges, y_edges = np.histogram2d(
        series_a,
        series_b,
        bins=[kwargs['bins'], kwargs['bins']],
        range=[kwargs['range'], kwargs['range']])
    x_edges, y_edges = x_edges[1:], y_edges[1:]
    return H, x_edges, y_edges
def probability(freq_nums):
    """Normalize frequency counts into probabilities."""
    freq_sum = np.sum(freq_nums)
    probs = freq_nums.copy() / freq_sum
    return probs
def univariate_entropy(freq_nums):
    """Entropy of a single variable, H(X) = -sum(p * log(p))."""
    eps = 1e-6  # avoid log(0) on empty bins
    probs = probability(freq_nums.copy())
    log_probs = np.log(probs + eps)
    entropy = -np.dot(probs, log_probs)
    return entropy
def joint_entropy(H):
    """Entropy of the joint distribution, H(X, Y)."""
    eps = 1e-6
    probs = probability(H.copy())
    log_probs = np.log(probs + eps)
    ie = -np.sum(np.multiply(probs, log_probs))
    return ie
def info_entropy(data, col_a, col_b, **kwargs):
    """Mutual information MI(A, B) = H(A) + H(B) - H(A, B)."""
    freq_nums_a, labels_a = data_binning(data, col_a, **kwargs)
    freq_nums_b, labels_b = data_binning(data, col_b, **kwargs)
    H, x_edges, y_edges = joint_freq_nums(data, col_a, col_b, **kwargs)

    # entropies of the marginals and of the joint distribution
    ie_a, ie_b = univariate_entropy(freq_nums_a), univariate_entropy(freq_nums_b)
    ie_ab = joint_entropy(H)
    ie = ie_a + ie_b - ie_ab
    return ie
def info_entropy_test(data, col_a, col_b, max_lag, **kwargs):
    """Mutual-information correlation test over a range of time lags."""
    test_results = {}
    for lag in range(-max_lag, max_lag + 1):
        data_lagged = data.copy()
        if col_a == col_b:
            # self-correlation: duplicate the column so it can be lagged against itself
            data_lagged.rename(columns={col_a: col_a + '_x'}, inplace=True)
            data_lagged[col_a + '_y'] = data_lagged[col_a + '_x']
            col_a += '_x'
            col_b += '_y'
            data_lagged[col_b] = shift(data_lagged[col_b], lag)
            ie = info_entropy(data_lagged, col_a, col_b, **kwargs)
            col_a, col_b = col_a[:-2], col_b[:-2]  # restore the names for the next iteration
        else:
            data_lagged[col_b] = shift(data_lagged[col_b], lag)
            ie = info_entropy(data_lagged, col_a, col_b, **kwargs)
        test_results[lag] = ie
    return test_results
if __name__ == '__main__':
    # parameters
    columns = config.conf['model_params']['selected_columns']
    target_columns = config.conf['model_params']['target_columns']
    default_bins = 200
    default_range = [-1, 1]
    max_lag = 1000

    # load the data, filter and detrend
    data_detrend = load_data_and_detrend()

    # mutual-information test
    plt.figure(figsize=[10, 12])
    total_time_delay_ie = {}
    for col_x in columns:
        total_time_delay_ie[col_x] = {}
        for col_y in target_columns:
            print('processing col_x: {}, col_y: {}'.format(col_x, col_y))
            time_delay_ie = info_entropy_test(
                data_detrend, col_x, col_y, max_lag=max_lag, bins=default_bins, range=default_range)
            total_time_delay_ie[col_x][col_y] = time_delay_ie

            plt.subplot(
                len(columns), len(target_columns),
                len(target_columns) * columns.index(col_x) + target_columns.index(col_y) + 1)
            lags, ies = list(time_delay_ie.keys()), np.abs(list(time_delay_ie.values()))
            plt.plot(lags, ies)
            plt.fill_between(lags, ies)
            plt.plot([-max_lag, max_lag], [0, 0], 'k--', linewidth=0.3)
            plt.xlim([-max_lag, max_lag])

            # scale the y axis to the strongest signal in this panel
            max_ie = max(time_delay_ie.values())
            if max_ie >= 1.0:
                plt.ylim([0.0, max_ie // 1 + 1])
            elif 0.5 <= max_ie < 1.0:
                plt.ylim([0.0, 1.0])
            elif 0.2 <= max_ie < 0.5:
                plt.ylim([0.0, 0.5])
            elif 0.0 <= max_ie < 0.2:
                plt.ylim([0.0, 0.2])

            plt.xticks(fontsize=4)
            plt.yticks(fontsize=4)
            if col_x == columns[0]:
                plt.title(col_y, fontsize=5)
            if col_y == target_columns[0]:
                plt.ylabel(col_x, fontsize=5)
            plt.tight_layout()

    # save the figure before plt.show(); with a blocking backend, saving afterwards can write an empty canvas
    plt.savefig('../graphs/ie_analysis_on_continuous.png', dpi=600)
    plt.show()
    plt.pause(1.0)

    with open('../tmp/ie_time_delay_correlation_results.json', 'w') as f:
        json.dump(total_time_delay_ie, f)
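
For intuition: info_entropy above estimates the mutual information MI(A, B) = H(A) + H(B) - H(A, B) from histogram bin counts. The self-contained sketch below (hist_mi is an illustrative stand-in, not part of the module) condenses the same estimator and sanity-checks it: a series shares a lot of information with itself and almost none with independent noise.

import numpy as np

def hist_mi(x, y, bins=50, rng=(-1, 1)):
    """Histogram-based mutual information estimate, mirroring info_entropy above."""
    eps = 1e-6
    px, _ = np.histogram(x, bins=bins, range=rng)
    py, _ = np.histogram(y, bins=bins, range=rng)
    pxy, _, _ = np.histogram2d(x, y, bins=bins, range=[rng, rng])
    px, py, pxy = px / px.sum(), py / py.sum(), pxy / pxy.sum()
    h = lambda p: -np.sum(p * np.log(p + eps))
    return h(px) + h(py) - h(pxy)

gen = np.random.default_rng(0)
x = gen.uniform(-1, 1, 10000)
print(hist_mi(x, x))                          # large: MI(X, X) = H(X)
print(hist_mi(x, gen.uniform(-1, 1, 10000)))  # near zero: independent series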
config_loader.py
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 8 14:32:52 2018

@author: lichen

Load the project configuration
"""
import os
import sys
import logging
import logging.config

import yaml
import lake
import lake.decorator
import lake.data
import lake.dir

sys.path.append(os.path.join(os.path.dirname(__file__), '../config/'))

if len(logging.getLogger().handlers) == 0:
    logging.basicConfig(level=logging.DEBUG)


@lake.decorator.singleton
class ConfigLoader(object):
    def __init__(self, config_path=None):
        self._config_path = config_path or self._absolute_path('../config/config.yml')
        self._load()

    def _absolute_path(self, path):
        return os.path.join(os.path.dirname(__file__), path)

    def _load(self):
        with open(self._config_path, 'r') as f:
            self._conf = yaml.load(f, Loader=yaml.Loader)  # yaml.FullLoader is the safer choice on PyYAML >= 5.1

    @property
    def conf(self):
        return self._conf

    def set_logging(self):
        """Configure logging from the 'logging' section of the config file."""
        log_dir = self._absolute_path('../logs/')
        lake.dir.mk(log_dir)
        log_config = self.conf['logging']
        update_filename(log_config)
        logging.config.dictConfig(log_config)


def update_filename(log_config):
    """
    Rewrite any 'filename' entries in the logging config as absolute paths.

    :param log_config: dict, logging configuration
    """
    to_log_path = lambda x: os.path.abspath(os.path.join(os.path.dirname(__file__), '../', x))
    if 'filename' in log_config:
        log_config['filename'] = to_log_path(log_config['filename'])
    for key, value in log_config.items():
        if isinstance(value, dict):
            update_filename(value)


config = ConfigLoader()
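
Usage elsewhere in the repo is then a plain import of the module-level singleton; thanks to @lake.decorator.singleton, every importer shares one parsed config.yml. A minimal sketch (the model_params keys shown are the ones the scripts in this post read):

from mods.config_loader import config

# all importers see the same ConfigLoader instance and the same parsed YAML dict
selected_columns = config.conf['model_params']['selected_columns']
target_columns = config.conf['model_params']['target_columns']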
data_filtering.py
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 8 14:32:52 2018

@author: luolei

Data filtering
"""
import copy
import sys
from math import factorial

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm

sys.path.append('../')

from mods.config_loader import config
def savitzky_golay(y, window_size, order, deriv=0, rate=1):
    """
    Savitzky-Golay smoothing filter.
    """
    try:
        window_size = abs(int(window_size))  # plain int: np.int was removed in NumPy 1.24
        order = abs(int(order))
    except Exception:
        raise ValueError("window_size and order have to be of type int")
    if window_size % 2 != 1 or window_size < 1:
        raise TypeError("window_size must be a positive odd number")
    if window_size < order + 2:
        raise TypeError("window_size is too small for the polynomial order")
    order_range = range(order + 1)
    half_window = (window_size - 1) // 2

    # precompute the least-squares filter coefficients
    b = np.array([[k ** i for i in order_range] for k in range(-half_window, half_window + 1)])
    m = np.linalg.pinv(b)[deriv] * rate ** deriv * factorial(deriv)

    # pad the signal at the extremes with values taken from the signal itself
    firstvals = y[0] - np.abs(y[1:half_window + 1][::-1] - y[0])
    lastvals = y[-1] + np.abs(y[-half_window - 1:-1][::-1] - y[-1])
    y = np.concatenate((firstvals, y, lastvals))
    return np.convolve(m[::-1], y, mode='valid')
def savitzky_golay_filter(data, window_size=11, order=2, save=False):
    """
    Apply Savitzky-Golay filtering to the continuous columns.

    :param data: pd.DataFrame, table to be filtered, columns = [target_column, selected_columns_0, selected_column_1, ...]
    :param window_size: int, length of the moving window
    :param order: int, order of the polynomial used for the fit
    :return:
        data_filtered: pd.DataFrame, filtered table with the same column layout
    """
    data_copy = copy.deepcopy(data)
    columns = list(set(config.conf['model_params']['target_columns'] + config.conf['model_params']['continuous_columns']))
    data_copy = data_copy[columns]

    filtered_results = []
    for column in columns:
        y = np.array(data_copy.loc[:, column]).flatten()
        y_filtered = savitzky_golay(y, window_size=window_size, order=order)
        filtered_results.append(y_filtered)
    filtered_results = np.array(filtered_results).T
    filtered_results = pd.DataFrame(filtered_results, columns=columns)

    data_filtered = pd.concat(
        [data[['city', 'ptime', 'time_stamp']], filtered_results], axis=1, sort=False).reset_index(drop=True)
    data_filtered = pd.concat([data_filtered, data[config.conf['model_params']['discrete_columns']]], axis=1, sort=False)

    if save:
        data_filtered.to_csv('../tmp/total_sg_filtered_data.csv', index=False)

    return data_filtered
def stl_detrend(data, save=False, show=False):
    """Remove the long-term trend via STL-style seasonal decomposition, so the model only predicts the short-term component."""
    target_columns = config.conf['model_params']['target_columns']
    continuous_columns = config.conf['model_params']['continuous_columns']
    stl_freq = config.conf['model_params']['stl_freq']

    data_detrend, data_trend = data.copy(), data.copy()
    columns = list(set(target_columns + continuous_columns))
    for column in columns:
        time_series = data.copy()[column]
        decomposition = sm.tsa.seasonal_decompose(
            time_series.values, period=stl_freq, two_sided=False, extrapolate_trend='freq')  # 'period' was named 'freq' in statsmodels < 0.11
        detrend_series = pd.Series(decomposition.seasonal + decomposition.resid)
        trend_series = pd.Series(decomposition.trend)
        data_detrend[column] = detrend_series
        data_trend[column] = trend_series

        if show:
            decomposition.plot()
            plt.title('{} decomposition'.format(column))

    data_detrend.fillna(0.0, inplace=True)
    data_trend.fillna(0.0, inplace=True)

    if save:
        data_detrend.to_csv('../tmp/total_stl_detrend_data.csv', index=False)
        data_trend.to_csv('../tmp/total_stl_trend_data.csv', index=False)

    return data_detrend, data_trend
if __name__ == '__main__':
    # load the data
    data = pd.read_csv('../tmp/total_implemented_normalized_data.csv')

    # Savitzky-Golay filtering
    sg_filtered_data = savitzky_golay_filter(data)

    # remove the trend component
    data_detrend, data_trend = stl_detrend(data, save=True, show=True)
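
On a recent SciPy, the hand-rolled savitzky_golay can be cross-checked against scipy.signal.savgol_filter, a maintained implementation of the same filter. A minimal sketch; expect close agreement in the interior but small differences near the boundaries, since the version above mirror-pads the signal while savgol_filter fits edge polynomials by default:

import numpy as np
from scipy.signal import savgol_filter

t = np.linspace(0, 4 * np.pi, 500)
y = np.sin(t) + np.random.default_rng(0).normal(0, 0.2, t.shape)
y_ref = savgol_filter(y, window_length=11, polyorder=2)
# interior points should closely match savitzky_golay(y, window_size=11, order=2)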

Comments (2)

Dreisteine  2019-08-19 10:44
There is no need to distinguish between numerical and categorical variables.

Dreisteine  2019-08-19 10:42
The code file names need to be replaced.
