Ai
1 Star 0 Fork 0

红云缭乱/GetMLData
暂停

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
Get_ML_Dataset.py 6.51 KB
一键复制 编辑 原始数据 按行查看 历史
红云缭乱 提交于 2022-09-05 11:45 +08:00 . Update
import configparser as cp
import os
import time
import pandas as pd
def time_dec(func):
'''
用于计算运行时间的装饰器
:param func: 目标函数
:return: wrapper
'''
def wrapper(*args, **kwargs):
t = time.perf_counter()
res = func(*args, **kwargs)
print(func.__name__, '操作执行用时:', time.perf_counter() - t)
return res
return wrapper
def exceptions_dec(func):
'''
用于捕获异常的装饰器,仅用于单元测试
:param func: 目标函数
:return: wrapper
'''
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
raise Exception(e)
return wrapper
def tradedt_pegging(date, df):
'''
根据时间反查数据在parquet轴的位置
:param date: 时间,一般为trade_dt(int)
:param df:并表后的Data
:return:位置所对应的index
'''
obj = df.iloc[(df['trade_dt'] - date).abs().argsort()[0], :].tolist()
t = obj[1]
return t
def stockcode_pegging(stock, df):
'''
根据股票反查数据在parquet轴的位置
:param stock: 股票,一般为stockcode(str)
:param df: 并表后的Data
:return: 位置所对应的index
'''
obj = df.loc[(df['stockcode'] == stock), :]
s = obj.iloc[0, 1]
return s
@time_dec
def load_data(root_dir):
'''
加载parquet并横向拼接(并表)
:param root_dir: parquet文件所在目录
:return:并表后的Data
'''
Data = pd.DataFrame()
count = 0
for file in os.listdir(root_dir):
file_name = root_dir + file
with open(file=file_name):
df = pd.read_parquet(file_name)
if count == 0:
Data = df
count = count + 1
print('第', count, '个因子已载入')
else:
Data = pd.concat([Data, df.iloc[:, 2]], axis=1)
count = count + 1
print('第', count, '个因子已载入')
return Data
@time_dec
def select_data_random(data, stocklist, starttime, endtime, iternum=500):
'''
按照stocklist、starttime、endtime过滤数据,该方法通过扫描(随机读写)方式取得index,性能不佳
:param data: 并表后的Data
:param stocklist: 目标股票清单
:param starttime:起始时间
:param endtime:结束时间
:param iternum: 循环次数,用于控制stocklist中的股票数量,如等于2时仅扫描stocklist中的前2只股票
:return:过滤后的Data
'''
dataset = pd.DataFrame()
for i in range(iternum):
if i < 1:
data1 = data[(data['trade_dt'] > starttime) & (data['trade_dt'] < endtime)].sort_values(by='stockcode',
ascending=True)
data2 = data1[data1['stockcode'] == stocklist[i]].sort_values(by='trade_dt', ascending=True)
dataset = data2
print(dataset.count(), '第', i + 1, '只股票已扫描')
elif i >= 1:
data1 = data[(data['trade_dt'] > starttime) & (data['trade_dt'] < endtime)].sort_values(by='stockcode',
ascending=True)
data2 = data1[data1['stockcode'] == stocklist[i]].sort_values(by='trade_dt', ascending=True)
dataset = dataset.append(data2)
print(dataset.count(), '第', i + 1, '只股票已扫描')
return dataset
@time_dec
def select_data(data, starttime, endtime, iternum=500, cnfig='./config.ini'):
'''
按照stocklist、starttime、endtime过滤数据,该方法通过反查表计算index的位置,性能较好
:param data: 并表后的Data
:param stocklist: 目标股票清单
:param starttime:起始时间
:param endtime:结束时间
:param iternum: 循环次数,用于控制stocklist中的股票数量,如等于2时仅扫描stocklist中的前2只股票
:return:过滤后的Data
'''
# 加载配置文件
ini_file = cp.ConfigParser()
ini_file.read('config.ini')
zz500list = ini_file['Path']['zz500list']
trade_dt_table_path = ini_file['Path']['trade_dt_table_path']
stockcode_table_path = ini_file['Path']['stockcode_table_path']
# 从配置文件路径读取股票列表
with open(zz500list):
stocklist = pd.read_csv(zz500list)
stocklist = stocklist['stockcode'].tolist()
# 从配置文件路径读取股票/时间反查表
with open(trade_dt_table_path):
trade_dt_table = pd.read_csv(trade_dt_table_path)
with open(stockcode_table_path):
stockcode_table = pd.read_csv(stockcode_table_path)
dataset = pd.DataFrame()
for i in range(iternum):
starindex = stockcode_pegging(stocklist[i], stockcode_table) + tradedt_pegging(starttime, trade_dt_table)
endindex = stockcode_pegging(stocklist[i], stockcode_table) + tradedt_pegging(endtime, trade_dt_table)
data1 = data.iloc[starindex:endindex, :].sort_values(by='stockcode', ascending=True)
if i < 1:
dataset = data1
print(dataset.count(), '第', i + 1, '只股票已扫描')
elif i >= 1:
dataset = dataset.append(data1)
print(dataset.count(), '第', i + 1, '只股票已扫描')
return dataset
@time_dec
def dropNa_of_data(df):
'''
删除含有缺失值的数据
:param df: 并表后的Data
:return: 无缺失值的Data
'''
num1 = len(df)
df.dropna(inplace=True)
num2 = len(df)
print('原有数据', num1, '条,现有数据', num2, '条,其中', num1 - num2, '条含有缺失值的数据被丢弃')
return df
if __name__ == '__main__':
# 加载配置文件
ini_file = cp.ConfigParser()
ini_file.read('config.ini')
inputpath = ini_file['Path']['inputpath']
outputpath = ini_file['Path']['outputpath']
# 从配置文件读取起止时间
starttime = int(ini_file['time']['starttime'])
endtime = int(ini_file['time']['endtime'])
# 加载parquet并横向拼接
Data = load_data(root_dir=inputpath)
# 按照股票和起止时间过滤数据
Data = select_data(data=Data, starttime=starttime, endtime=endtime)
# 剔除含有缺失值的数据
Data = dropNa_of_data(Data)
# Dataset输出为feather文件
# Data = Data.reset_index(drop=True)
# Data.to_feather(path=outputpath + 'ML_Dataset.f')
# Dataset输出为csv文件
Data = Data.reset_index(drop=True)
Data.to_csv(outputpath + 'ML_Dataset.csv',index=False)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/redcloudRC/GetMLData.git
git@gitee.com:redcloudRC/GetMLData.git
redcloudRC
GetMLData
GetMLData
master

搜索帮助