7 Star 1 Fork 0

ls/ISCM

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
rolling_forecast.py 8.11 KB
一键复制 编辑 原始数据 按行查看 历史
Hush 提交于 2021-12-21 12:53 . 加权平均-时序模型
import pandas as pd
from statsmodels.tsa.ar_model import AutoReg as AR
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from statsmodels.tsa.api import ExponentialSmoothing
from statsmodels.tsa.api import SimpleExpSmoothing
# 计算误差
# 绝对值误差
def get_error(a, b):
error = abs(a - b)
return error
# 平方项误差
def get_error2(a, b):
error = (a - b) ** 2
return error
# 自回归(AR)模型,lag为模型阶数
def ar_predictor(data, target, lag=7):
days = len(data)
data_y = data['y'].to_list()
# 预测
model = AR(data_y, lag)
results = model.fit()
p = results.predict(days, days)[0]
p = p if p > 0 else 0
f = target['y'].to_list()[0]
e = get_error2(p, f)
return p, e
# Holt-Winters模型(优先使用乘法模型)
def holt_winters_predictor(data, target):
days = len(data)
data_y = data['y'].to_list()
try:
hw = ExponentialSmoothing(data_y, trend='add', seasonal='mul', seasonal_periods=7)
except ValueError:
hw = ExponentialSmoothing(data_y, trend='add', seasonal='add', seasonal_periods=7)
r = hw.fit()
p = r.predict(start=days, end=days)[0]
p = p if p > 0 else 0
f = target['y'].to_list()[0]
e = get_error2(p, f)
return p, e
# 滑动平均(MA)模型(t为滑动窗口长度,天)
def moving_average_predictor(data, target, t=7):
p = data[-t:]['y'].mean()
p = p if p > 0 else 0
f = target['y'].to_list()[0]
e = get_error2(p, f)
return p, e
# 简单指数平滑模型(自动参数寻优)
def simple_expsmoothing_predictor(data, target):
data_y = data['y'].to_list()
m = SimpleExpSmoothing(data_y)
results = m.fit()
p = results.forecast(1)[0]
p = p if p > 0 else 0
f = target['y'].to_list()[0]
e = get_error2(p, f)
return p, e
# 根据累计误差计算相应权重
def get_weight(e1, e2, e3, e4):
"""
e1, e2, e3, e4: 相应模型累计历史误差总和
权重计算的三种情况:
1,累计误差全部相等,权重均为0.25;
2,累计误差全部非0,以累积误差的倒数计算相应权重;
3,累计误差包含0,则累计误差为0的模型权重相等,剩余模型权重为0。
"""
e = [e1, e2, e3, e4]
w = [0] * 4
if e1 == e2 and e2 == e3 and e3 == e4:
w = [1/4] * 4
elif not e.__contains__(0):
w1 = (1 / e1) / (1 / e1 + 1 / e2 + 1 / e3 + 1 / e4)
w2 = (1 / e2) / (1 / e1 + 1 / e2 + 1 / e3 + 1 / e4)
w3 = (1 / e3) / (1 / e1 + 1 / e2 + 1 / e3 + 1 / e4)
w4 = (1 / e4) / (1 / e1 + 1 / e2 + 1 / e3 + 1 / e4)
w = [w1, w2, w3, w4]
elif e.count(0) == 1:
w[e.index(0)] = 1
elif e.count(0) == 2:
w[e.index(0)] = 1/2
w[e.index(0, e.index(0)+1)] = 1/2
else:
w = [1/3] * 4
w[e.index(sum(e))] = 0
return w
# 多模型加权平均组合主函数(只考虑四个单模型的有限历史误差总和,回溯期限为periods天)
def goods_run_1(gid, data, periods):
end = data['ds'].max()
num = len(data)
# 保证训练集不小于14天(Holt-Winters模型季节性周期为7,必须保证训练集长度超过7*2)
if num < 60:
p_date_list = pd.date_range(end=end, periods=num - 14)
else:
p_date_list = pd.date_range(end=end, periods=42)
# 记录periods天的模型历史误差
ee1, ee2, ee3, ee4 = [0] * periods, [0] * periods, [0] * periods, [0] * periods
res = []
for d in p_date_list:
# 判断是否为真实值(true_data为真实数据,如果d日期对应的真实销量不存在,则跳过,只考虑真实销量数据)
if not pd.isnull(data[data['ds'] == d].reset_index(drop=True).loc[0, 'true_data']):
gs_data_r = data[data['ds'] < d][['ds', 'y']]
target = data[data['ds'] == d][['ds', 'y']]
p1, e1 = ar_predictor(gs_data_r, target)
p2, e2 = holt_winters_predictor(gs_data_r, target)
p3, e3 = moving_average_predictor(gs_data_r, target)
p4, e4 = simple_expsmoothing_predictor(gs_data_r, target)
w1, w2, w3, w4 = get_weight(sum(ee1), sum(ee2), sum(ee3), sum(ee4))
p = p1 * w1 + p2 * w2 + p3 * w3 + p4 * w4
# 更新记录的历史权重值
ee1 = ee1[1:] + [e1]
ee2 = ee2[1:] + [e2]
ee3 = ee3[1:] + [e3]
ee4 = ee3[1:] + [e4]
res.append((gid, d, p, target['y'].to_list()[0], w1, w2, w3, w4, p1, p2, p3, p4, e1, e2, e3, e4))
return res
# 多模型加权平均组合主函数(考虑四个单模型的全历史误差总和)
def goods_run_2(gid, data):
end = data['ds'].max()
num = len(data)
if num < 60:
p_date_list = pd.date_range(end=end, periods=num - 14)
else:
p_date_list = pd.date_range(end=end, periods=42)
ee1, ee2, ee3, ee4 = 0, 0, 0, 0
res = []
for d in p_date_list:
if not pd.isnull(data[data['ds'] == d].reset_index(drop=True).loc[0, 'true_data']):
gs_data_r = data[data['ds'] < d][['ds', 'y']]
target = data[data['ds'] == d][['ds', 'y']]
p1, e1 = ar_predictor(gs_data_r, target)
p2, e2 = holt_winters_predictor(gs_data_r, target)
p3, e3 = moving_average_predictor(gs_data_r, target)
p4, e4 = simple_expsmoothing_predictor(gs_data_r, target)
w1, w2, w3, w4 = get_weight(ee1, ee2, ee3, ee4)
p = p1 * w1 + p2 * w2 + p3 * w3 + p4 * w4
ee1 += e1
ee2 += e2
ee3 += e3
ee4 += e4
res.append((gid, d, p, target['y'].to_list()[0], w1, w2, w3, w4, p1, p2, p3, p4, e1, e2, e3, e4))
return res
# 主要执行函数
def run(data, periods, file_name):
# 筛选有效sku
df = pd.read_excel(r'C:\Users\cd542\Desktop\wmape.xlsx')
goods_list = list(set(df['goodsid']) & set(data['goodsid']))
# goods_list = data['goodsid'].drop_duplicates().to_list()
# goods_list = goods_list[:2]
res = []
t1 = time.time()
# 利用多进程对每个商品进行趋势预测
with ProcessPoolExecutor(max_workers=7) as executor:
futures = {}
for gid in goods_list:
gs_data = data[data['goodsid'] == gid][['ds', 'y', 'true_data']].reset_index(drop=True)
# periods为None代表考虑单模型全历史预测误差
if periods is None:
job = executor.submit(goods_run_2, gid, gs_data)
else:
job = executor.submit(goods_run_1, gid, gs_data, periods)
futures[job] = gid
count = 0
for job in as_completed(futures):
gs_res = job.result()
gid = futures[job]
res.extend(gs_res)
count += 1
print('完成: {}, {}个'.format(gid, count))
print('总耗时:', time.time() - t1)
res_df = pd.DataFrame({
'goodsid': [i[0] for i in res],
'sdt': [i[1] for i in res],
'yhat': [i[2] for i in res],
'y': [i[3] for i in res],
'w1': [i[4] for i in res],
'w2': [i[5] for i in res],
'w3': [i[6] for i in res],
'w4': [i[7] for i in res],
'p1': [i[8] for i in res],
'p2': [i[9] for i in res],
'p3': [i[10] for i in res],
'p4': [i[11] for i in res],
'e1': [i[12] for i in res],
'e2': [i[13] for i in res],
'e3': [i[14] for i in res],
'e4': [i[15] for i in res]
})
res_df.to_excel(file_name, index=False)
# 主函数,数据集导入
def main():
data = pd.read_csv('9139_20211217.csv', encoding='gbk')
data = data.drop(data.iloc[:, 1:3], axis=1)
data.columns = ['goodsid', 'ds', 'y', 'true_data']
data['ds'] = pd.to_datetime(data['ds'], format='%Y/%m/%d')
# 依次执行考虑不同长度历史预测误差的情况
for i in [None, 3, 7, 10, 14]:
run(data, i, 'res_info_(' + str(i) + ')2.xlsx')
if __name__ == '__main__':
main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ls55k/iscm.git
git@gitee.com:ls55k/iscm.git
ls55k
iscm
ISCM
master

搜索帮助