1 Star 2 Fork 1

张墨轩 / sustecher_org

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
datafetch.py 12.09 KB
一键复制 编辑 原始数据 按行查看 历史
张墨轩 提交于 2021-09-06 17:17 . update
# -*- coding: utf-8 -*-
"""
阿尔法研究平台
Project: sustecher
Author: Moses
E-mail: 8342537@qq.com
"""
import os
import numpy as np
import pandas as pd
import tushare as ts
from retrying import retry
from dataservice import DataService
try:
basestring
except NameError:
basestring = str
#打印能完整显示
#pd.set_option('display.max_columns', None)
#pd.set_option('display.max_rows', None)
#pd.set_option('display.width', 50000)
#pd.set_option('max_colwidth', 1000)
class DataFetcher(DataService):
@retry(stop_max_attempt_number=500, wait_random_min=1000, wait_random_max=2000)
def ensure_data(self, func, save_dir, start_dt='20090101', end_dt='20210804'):
""" 确保按交易日获取数据
"""
tmp_dir = os.path.join(self.root, "stock", save_dir)
dl = [pd.to_datetime(name.split(".")[0]) for name in os.listdir(tmp_dir)]
dl = sorted(dl)
s = pd.to_datetime(start_dt)
e = pd.to_datetime(end_dt)
tdays = pd.Series(self.tradedays, index=self.tradedays)
tdays = tdays[(tdays>=s)&(tdays<=e)]
tdays = tdays.index.tolist()
for tday in tdays:
if tday in dl: continue
t = tday.strftime("%Y%m%d")
datdf = func(t)
path = os.path.join(tmp_dir, t+".csv")
datdf.to_csv(path, encoding='gbk')
print(t+".csv write ok !!!!!")
def create_indicator(self, raw_data_dir, raw_data_field, indicator_name):
''' 主要用于通过日频数据创建日频指标
'''
tmp_dir = os.path.join(self.root, "stock", raw_data_dir)
tdays = [pd.to_datetime(f.split(".")[0]) for f in os.listdir(tmp_dir)]
tdays = sorted(tdays)
all_stocks_info = self.meta
df = pd.DataFrame(index=all_stocks_info.index, columns=tdays)
for f in os.listdir(tmp_dir):
tday = pd.to_datetime(f.split(".")[0])
dat = pd.read_csv(os.path.join(tmp_dir, f), index_col=['ts_code'], engine='python', encoding='gbk')
df[tday] = dat[raw_data_field]
print(tday)
df = df.dropna(how='all') #删掉全为空的一行
diff = df.index.difference(all_stocks_info.index) #删除没在股票基础列表中多余的股票行
df = df.drop(labels=diff)
self.close_file(df, indicator_name)
def _align_element(self, df1, df2):
''' 对齐股票和时间
'''
row_index = sorted(df1.index.intersection(df2.index))
col_index = sorted(df1.columns.intersection(df2.columns))
return df1.loc[row_index, col_index], df2.loc[row_index, col_index]
def preprocess(self, datdf, suspend_days_process=False, val=np.nan):
''' 数据预处理
'''
datdf = datdf.copy()
datdf = datdf.fillna(method='ffill', axis=1).fillna(method='bfill', axis=1)
row_index, col_index = datdf.index, datdf.columns
liststatus = self.listday_matrix.loc[row_index, col_index]
cond = (liststatus==1)
datdf = datdf.where(cond) #将不是上市日的数值替换为nan
if suspend_days_process:
tradestatus = self.trade_status.loc[row_index, col_index]
cond = (liststatus==1) & (tradestatus==0)
datdf = datdf.where(~cond, val) #将上市但停牌的数值设为指定值
return datdf
class TushareFetcher(DataFetcher):
def __init__(self):
self.pro = ts.pro_api('70ebdf9cf1d553d72c74eca9e9f3bc8ad7addea54be96eba559a71f3')
super().__init__()
def fetch_meta_data(self):
""" 股票基础信息
"""
df_list = []
df = self.pro.stock_basic(exchange='', fields='ts_code,name,list_date,delist_date')
df_list.append(df)
df = self.pro.stock_basic(exchange='', fields='ts_code,name,list_date,delist_date', list_status='D')
df_list.append(df)
df = self.pro.stock_basic(exchange='', fields='ts_code,name,list_date,delist_date', list_status='P')
df_list.append(df)
df = pd.concat(df_list)
df = df.rename(columns={"list_date":"ipo_date"})
df = df.rename(columns={'name':'sec_name'})
df = df.rename(columns={"ts_code":"code"})
df.drop_duplicates(subset=['code'], keep='first', inplace=True)
df.sort_values(by=['ipo_date'], inplace=True)
#print(pd.to_datetime(df['ipo_date']))
#df.reset_index(drop=True, inplace=True)
df.set_index(['code'], inplace=True)
self.close_file(df, 'meta')
def fetch_trade_day(self):
""" 交易日列表
"""
df = self.pro.trade_cal(is_open='1')
df = df[['cal_date','is_open']]
df = df.rename(columns={"cal_date":"tradedays"})
df.set_index(['tradedays'], inplace=True)
self.close_file(df, 'tradedays')
#------------------------------------------------------------------------------------
#日数据
def daily(self, t):
return self.pro.daily(trade_date=t)
def suspend_d(self, t):
return self.pro.suspend_d(trade_date=t)
def adj_factor(self, t):
return self.pro.adj_factor(trade_date=t)
#------------------------------------------------------------------------------------
#指数日行情
def index_daily(self):
index_list = ['000001.SH', '000300.SH', '000905.SH']
tmp_dir = os.path.join(self.root, "stock", "__temp_index_daily__")
for i in index_list:
df = self.pro.index_daily(ts_code=i)
path = os.path.join(tmp_dir, i+".csv")
df.to_csv(path, encoding='gbk')
print(i+".csv write ok !!!!!")
#------------------------------------------------------------------------------------
#创建各种指标
def create_listday_matrix(self):
''' 股票上市存续周期日矩阵
'''
all_stocks_info = self.meta
trade_days = self.tradedays
def if_listed(series):
nonlocal all_stocks_info
code = series.name
ipo_date = all_stocks_info.at[code, 'ipo_date']
delist_date = all_stocks_info.at[code, 'delist_date']
daterange = series.index
if delist_date is pd.NaT:
res = np.where(daterange >= ipo_date, 1, 0)
else:
res = np.where(daterange < ipo_date, 0, np.where(daterange <= delist_date, 1, 0))
return pd.Series(res, index=series.index)
listday_dat = pd.DataFrame(index=all_stocks_info.index, columns=trade_days)
listday_dat = listday_dat.apply(if_listed, axis=1)
self.close_file(listday_dat, 'listday_matrix')
def create_trade_status(self):
''' 股票停复牌状态
'''
tmp_dir = os.path.join(self.root, "stock", "__temp_suspend_d__")
tdays = [pd.to_datetime(f.split(".")[0]) for f in os.listdir(tmp_dir)]
tdays = sorted(tdays)
all_stocks_info = self.meta
df = pd.DataFrame(index=all_stocks_info.index, columns=tdays)
df.loc[:, :] = 1 #默认都是正常状态
for f in os.listdir(tmp_dir):
tday = pd.to_datetime(f.split(".")[0])
dat = pd.read_csv(os.path.join(tmp_dir, f), index_col=['ts_code'], engine='python', encoding='gbk')
df.loc[dat.index, tday] = 0 #停牌的设置为0
print(tday)
self.close_file(df, "trade_status")
def create_daily_quote_indicators(self):
'''
'''
#-------------------------------------------------------------
#创建一些行情指标
self.create_indicator("__temp_daily__", "open", "open")
open = self.preprocess(self.open)
self.close_file(open, 'open')
self.create_indicator("__temp_daily__", "close", "close")
close = self.preprocess(self.close)
self.close_file(close, 'close')
self.create_indicator("__temp_daily__", "high", "high")
high = self.preprocess(self.high)
self.close_file(high, 'high')
self.create_indicator("__temp_daily__", "low", "low")
low = self.preprocess(self.low)
self.close_file(low, 'low')
self.create_indicator("__temp_daily__", "vol", "vol")
vol = self.preprocess(self.vol)
self.close_file(vol, 'vol')
self.create_indicator("__temp_daily__", "amount", "amount")
amount = self.preprocess(self.amount)
self.close_file(amount, 'amount')
self.create_indicator("__temp_adj_factor__", "adj_factor", "adjfactor")
adjfactor = self.preprocess(self.adjfactor)
self.close_file(adjfactor, 'adjfactor')
close, adjfactor = self._align_element(self.close, self.adjfactor)
hfq_close = close * adjfactor
self.close_file(hfq_close, 'hfq_close') #后复权收盘价
def create_index_quote_indicators(self, indicator_name, start_dt='20090101', end_dt='20210804'):
'''
'''
benchmarks = ['000001.SH', '000300.SH', '000905.SH'] #上证综指,沪深300,中证500
s = pd.to_datetime(start_dt)
e = pd.to_datetime(end_dt)
tdays = pd.Series(self.tradedays, index=self.tradedays)
tdays = tdays[(tdays>=s)&(tdays<=e)]
tdays = tdays.index.tolist()
df = pd.DataFrame(index=benchmarks, columns=tdays)
tmp_dir = os.path.join(self.root, "stock", "__temp_index_daily__")
for bm in benchmarks:
dat = pd.read_csv(os.path.join(tmp_dir, bm+".csv"), index_col=['trade_date'], engine='python', encoding='gbk', parse_dates=['trade_date'])
df.loc[bm] = dat[indicator_name][df.columns]
self.close_file(df, 'bm_'+indicator_name)
def create_index_members(self, index_name, indicator_name, start_dt='20090101', end_dt='20210804'):
'''
'''
tmp_dir = os.path.join(self.root, "stock", "__temp_index_members__")
df_im = pd.read_parquet(os.path.join(tmp_dir, 'AINDEXMEMBERS.parquet'), engine='pyarrow')
df_im = df_im[df_im['S_INFO_WINDCODE']==index_name]
df_im = df_im.drop('S_INFO_WINDCODE', axis=1)
df_im = df_im.set_index('S_CON_WINDCODE')
df_im['S_CON_INDATE'] = pd.to_datetime(df_im['S_CON_INDATE'])
df_im['S_CON_OUTDATE'] = pd.to_datetime(df_im['S_CON_OUTDATE'])
#
all_stocks_info = self.meta
s = pd.to_datetime(start_dt)
e = pd.to_datetime(end_dt)
tdays = pd.Series(self.tradedays, index=self.tradedays)
tdays = tdays[(tdays>=s)&(tdays<=e)]
tdays = tdays.index.tolist()
#
def if_index_member(series):
nonlocal df_im
dt = series.name
isin = ((df_im['S_CON_INDATE'] <= dt) & ((df_im['S_CON_OUTDATE'] > dt) | (pd.isnull(df_im['S_CON_OUTDATE']))))
isin = isin[isin]
#print(len(isin))
series[isin.index] = 1 #dt时间段是指数成员的话就设置为1
return series
#
dat = pd.DataFrame(index=all_stocks_info.index, columns=tdays, data=0)
dat = dat.apply(if_index_member, axis=0) #每次处理一列
self.close_file(dat, indicator_name)
def TushareFetch():
fetcher = TushareFetcher()
#---------------------------------------------------------------
# 先下载数据到本地
#---------------------------------------------------------------
#fetcher.fetch_meta_data()
#fetcher.fetch_trade_day()
#fetcher.ensure_data(fetcher.daily, "__temp_daily__") #日行情表
#fetcher.ensure_data(fetcher.suspend_d, "__temp_suspend_d__") #停牌表
#fetcher.ensure_data(fetcher.adj_factor, "__temp_adj_factor__") #复权因子表
#fetcher.index_daily()
#---------------------------------------------------------------
# 然后从本地数据生成指标
#---------------------------------------------------------------
#fetcher.create_listday_matrix()
#fetcher.create_trade_status()
#fetcher.create_daily_quote_indicators()
#fetcher.create_index_members('000300.SH', 'hs300_member')
#fetcher.create_index_members('000905.SH', 'zz500_member')
#fetcher.create_index_quote_indicators('open')
#fetcher.create_index_quote_indicators('close')
#fetcher.create_index_quote_indicators('high')
#fetcher.create_index_quote_indicators('low')
if __name__ == '__main__':
TushareFetch()
1
https://gitee.com/phonegapx/sustecher_org.git
git@gitee.com:phonegapx/sustecher_org.git
phonegapx
sustecher_org
sustecher_org
master

搜索帮助