4 Star 8 Fork 6

panyunan/wt4elegantrl-doc

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

wt4elegantrl-doc

介绍

本项目为wt4elegantrl非官方文档。wt4elegantrl是基于wtpy开发的gym格式强化学习环境,用于支持交易场景下使用强化学习算法进行训练。
地址:https://github.com/drlgistics/Wt4ElegantRL

项目文件

envs.py: 环境类WtEnv
envs_simple_cta.py:定义了可用于训练的SimpleCTAEnv。实际上就是在WtEnv基础上给出了具体的strategy,stopper, feature,assessment.
features.py:做特征工程的组件,用来返回RL中每步的观测,支持多品种多周期。可直接继承Feature类写新的因子, 也可以引入外部因子数据。
strategies.py:具体策略实现,里面的SimpleCTA是个CTA策略,可以开发别的类型策略例如套利等。
assessments.py:定义了基类Assessment,以及一个简单的可以使用的奖励方案SimpleAssessment。
stoppers.py:定义了agent在何种情况下止盈止损的方案,待开发。
reprocess.py:主要为不懂强化学习的人准备,REPROCESS是预置的一些边界数据处理技巧。
analysts.py:待补充。
compare_elegantrl.py:使用elegantrl(小雅)训练的脚本。小雅必须使用subprocess(多进程)的env训练, 否则wt+小雅一起训练的结果是错的。
compare_rllib.py:使用ray-rllib训练的脚本。不需要多进程env,因为rllib自带了多进程。
compare_sb3.py:使用stable baselines3训练的脚本。需要多进程。
【注】推荐使用rllib或elegantrl,作者的经验是小雅的训练速度比rllib快,调试也比rllib方便。
dataset_from_storage.py:这块是从第三方接口获取行情文件并转换为了wt的内部格式dsb。用自己的数据源可以不用管这个文件。

组件

class WtEnv

该类重要,建议先阅读WtEnv搞清楚大致流程再看其他的类。
流程图(该图是从WtEnv的视角出发,梳理了该类是怎么在wtpy基础上实现了环境相关接口,并且如何驱动了Feature/Assessment组件的运行。 Feature组件的实例化和内部计算流程请参考下面class Feature部分,Assessment组件则相对比较简单,就不做赘述,可参考源码): avatar 解释:
1、WtEnv实例生成后,先执行reset(),在reset里会做个判断,是否有__engine__属性存在,如果不存在则创建WtBtEngine 实例,初始化engine并指定回测时间范围。然后创建新的策略实例加入该engine并启动run_backtest。该engine被保存在env 的__engine__中。

2、接下来执行env.step(action),在step内首先就调用了策略类里的setAction(),作用是保存了step步带过来的参数action 到策略实例属性中。

3、还是在env.step()中,调用engine的cta_step()(实际代码中是调用了env.__cb__step__(),它保存的就是engine.cta_step()), 注意,这里在图上就是对应的蓝字“ 第一次调用cta_step() ”,该cta_step会触发策略类里的on_calculate()(蓝字),在 on_calculate()里会做两个事情,一是调用Feature组件里的calculate(),根据传过来的context和Feature类里的 指标计算方法计算内部或外部特征保存到feature.__obs__。二是紧接着调用了Assessment组件中的calculate(),根据传来的 context计算reward等。然后回到cta_step()函数内。

4、从cta_step()中出来后又进入env.__step__()。在__step__()内第二次调用cta_step()(对应图中红字),第二次cta_step 触发了策略类中的on_calculate_done()。在on_calculate_done()中可以拿到前期由setAction保存到实例属性中的动作 action。根据action用context中stra_xxx方法下单。

5、第二次从cta_step()中出来后返回到env.step(),step()执行结束,并返回元组(obs,reward,done,info)。

这就是WtEnv如何通过wtpy实现了一个RL环境。(具体实现请参考代码,流程图不完全反映代码实现细节)

  • 类属性

    TRAINER:1; EVALUATOR:2; DEBUGGER:3
    (在该类初始化时用参数mode控制环境运行在何种模式下)

  • 属性

    _id_:(privte) 该环境的编号
    _iter_:(private) 内部记录环境进行到第几轮
    _run_:(private) 该环境是否在运行
    __strategy__:(private) 加载的策略,在策略对象中实现具体的买卖行为,也可定义不同的策略种类如投机或者套利等等。
    _et_:(private) wt的EngingType,只支持cta engine,也就是说本项目只支持K线级别的RL训练(分钟级),不支持tick训练,根据作者的说法,要支持tick训练,代码得大改。
    __stopper__:(private) 暂无具体功能
    __slippage__:(private) 设置滑点
    __feature__:(private) 关联的Feature组件,在这里做自己的指标或加载自定义的特征
    __assessment__:(private) 关联的奖励方案组件
    __time_range__:(private) 时间范围,为一个tuple, 这个tuple中可以定义很多不同的timerange(也是区间),每一个timerange为一个episode(这里是指强化学习中的术语 An episode is one complete play of the agent interacting with the environment in the general RL setting. Episodic tasks in RL means that the game of trying to solve the task ends at a terminal stage or after some amount of time.) 在这里,通过给定的tuple规定了episode, 是一个定量的时间区间。对于加载了分钟数据想训练日内agent的话,可以每个timerange设为一天。
    __cb_step__:(private) 根据策略引擎类型是CTA还是HFT,__cb_step__ 被赋值为WtBtEngine的cta_step方法或hft_step方法。
    observation_space:(public) 返回环境的状态空间
    action_space:(public) 返回环境的动作空间
    assets:(@property) 返回该时间步市值

  • 方法

    def __init__(self, strategy: StateTransfer, stopper: Stopper, feature: Feature, assessment: Assessment, time_range: tuple, slippage: int = 0, id: int = getpid(), mode=1, ):
    stopper接收自定义的Stopper组件,用来控制止盈止损逻辑。
    feature接收Feature组件,该组件为特征工程逻辑,用来提供环境执行step后返回的观测observations。
    assessment:自定义奖励方案,assessment的calculate()将输出奖励值(为一个标量)
    __init__里根据给的mode加载了log_xxx.json,这将决定wtpy引擎在环境运行过程的输出信息的级别。
    self._et_是传入的strategy的EngineType,然后下面的reset再根据这里的self._et_去决定初始化一个CTA还是HFT引擎等等

    def __step__(self):
    在这个方法中又调用一次finished = not self._cb_step_(), 并且做了环境是否结束的判断。
    def close(self): 如果实例的_run_标志为True,并且具有_engine_,则结束引擎的回测过程并重置_run_为False
    def reset(self):
    重置环境,并输出初始时的observations。
    time_start, time_end = self.__time_range__[self._iter_%len(self.__time_range__)] 这句根据传入的timerange(比如默认的是24个range)首先赋值time_start, time_end这两个变量为第一个range(episode) 的开始和结束时间,并在后面用self._engine_.configBacktest(time_start, time_end)告诉引擎运行的时间范围。后面 第二轮再reset的时候,_iter_相应+1,则time_start, time_end也相应的取到第二个range里的开始/结束时间,如此循环。

    目前仿真器只支持直接读dsb,没有支持用extloader导入自定义数据。如有需要,要将自己的行情文件转为dsb放入storage中对应目录,然后在引擎中载入: 这里WtBtEngine根据enginetype不同会相应加载cta.json或hft.json,然后json中指定了storage目录,引擎从storage中直接读取dsb行情文件。

    WtBtEngine回测引擎是在reset里被初始化的,引擎以及策略以及后面的self._engine_.set_cta_strategy 设置策略的代码要写在reset里而不是__init__是因为这些类底层是C++,如果写在__init__里,再在reset里deepcopy这些类会行不通。

    这些类的初始化全写在reset里的确会增加一些开销。但由于 (1)reset过程不会重载数据(除非你变更品种了,不然不会重载数据) (2) 整个训练过程中,reset的开销占比是微乎其微(step步才是经常被使用的) 两方面原因,所以reset里稍微复杂些完全可以接受。

    并根据self._et_决定建立CTA或者HFT引擎以及加载不同的json以及决定这个环境的一步是cta_step还是hft_step。
    最后启动回测引擎self._engine_.run_backtest(bAsync=True... ,这里的bAsync参数设为异步运行,是为了可以触发策略里的on_calculate_done方法,在on_calculate_done方法中我们会做动作空间中的一个动作到目标仓位的映射逻辑。 bNeedDump参数在大规模使用时需要设置为True,用来导出一份配置,这样遇到问题时可以查看,一般用不到。0.9wtpy已移除bNeedDump,新增参数persistData。

    【注】 1、训练中金所品种的话,要对其历史数据做处理。它改过交易时间,所以要自己修复,用正常的时间生成的dsb是会有问题的。另外尽量避免训练平今很贵的品种。2、如果加载多个标的和多个周期的行情:对于同一品种的不同周期,建议设置小周期为主K;对于相同周期的不同品种,建议设置成交量活跃的品种为主K。

    def step(self, action):
    智能体选择一个动作,并推进一步,返回新的观测,奖励值,是否结束标志和调试信息。
    self._strategy_.setAction(action)让策略把action保存到策略对象的属性中。

    self._cb_step_()触发回测引擎推动一步,这一步将触发策略里的on_xxx回调函数, 那么在on_calculate_done里可以拿到之前保存到的action,然后通过context参数里的下单方法执行买卖行为。 这样就将action和真实买卖行为联系在一起。

    【注意】开启异步回测后,回测引擎的一个ctastep将触发oncalc,第二个ctastep将触发同交易日的oncalcdone, 第三个ctastep才触发下一个交易日的oncalc,所以强化学习中的env.step实际上需要调用两次engine.cta_step。 在本项目中,self.step中self._cb_step_()调用一次,并且在self.__step__中的finished = not self._cb_step_() 语句又调用了一次。
    def analyst(self, iter: int):文档待写
    def analysts(self):文档待写

class SimpleCTAEnv(WtEnv)

SimpleCTAEnv直接继承WtEnv,并传入了具体的Indicator,SimpleAssessment,SimpleCTA组件, 定义了滑点和mode。

class Feature

这里以项目源码中实例化一个Feature类的过程以及在env中触发了feature里 的subscribe()calculate()后内部的计算流程来讲解。(先打开features.py)

1、首先在SimpleCTAEnv的__init__()里,我们看到feature是这样被初始化的,如下:

feature: Indicator = Indicator(
code='DCE.c.HOT', period=Indicator.M5, roll=1, assets=assets) 

这里传入的code和period将作为一个元组保存到feature对象的__main__属性,用于以后在订阅行情时判断某行情是否是主K线。

2、然后使用了feature.addSecurity()方法添加其他标的:

feature.addSecurity(code='DCE.cs.HOT')
feature.addSecurity(code='DCE.m.HOT')

addSecurity()在feature类内部做的事就是把code添加到__securities__属性中,便于后面使用。

3、对__securities__中的标的计算特征:

for period in (feature.M5,  feature.M10):  
    feature.macd(period)
    feature.kdj(period)

源码如上,这是使用了M5,M10周期对所有标的建立了macd,kdj等多因子。其中macd,kdj是用户实现的因子, 可以定义自己的因子。
以kdj方法为例,解释下因子的写法:

def kdj(self, period: str, fastk_period: int = 5, slowk_period: int = 3, reprocess: REPROCESS = MAXMIN):
    def kdj(context: CtaContext, code: str, period: str, args: dict):
        bars = context.stra_get_bars(
            stdCode=code, period=period, count=self.__subscribies__[period])
        k, d = ta.STOCH(high=bars.highs, low=bars.lows,
                        close=bars.closes, **args)
        return k, d, (3 * k - 2 * d)
        return k / 100, d / 100, (3 * k - 2 * d) / 100

    self._subscribe_(period=period, count=10 + 1 + reprocess.n())
    self._callback_(space=3, period=period, callback=kdj, reprocess=reprocess,
                    fastk_period=fastk_period, slowk_period=slowk_period)

可以看到有两个def kdj, 第一个是回调函数,第二个是该因子的计算函数(也就是计算逻辑定义在这个里面)。名字起的 一不一样都可以,python有命名空间的。第二个kdj内用context里的接口获取到行情数据,使用talib计算指标,然后 返回由三个序列构成的元组。
然后调用self._subscribe_()把需要订阅的信息保存到self.__subscribies__里,__subscribies__里保存了需要订阅的行情信息, 先保存在这,后面在wtpy层面策略里的on_init中就按照这些需要订阅的条件去一一订阅行情。
再调用self._callback_()_callback_的参数space是指定因子输出值会占用几列,在该例中输出3个序列占用3列; callback参数传入因子的计算函数,也就是第二个kdj函数(里面的def kdj), 该方法目的是把该因子作为了一个计算任务保存到了self.__cb__里。self.__cb__是一个字典套字典 的结构,形如:

{'m5': {'price': (1,
   <function features.Feature.price.<locals>.price(context: <module 'wtpy.CtaContext' from 'C:\\Users\\Y\\Documents\\code\\superduo\\demos\\Wt4ElegantRL\\wtpy\\CtaContext.py'>, code: str, period: str, args: dict)>,
   reprocess.MAXMIN,
   {}),
  'volume': (1,
   <function features.Feature.volume.<locals>.volume(context: <module 'wtpy.CtaContext' from 'C:\\Users\\Y\\Documents\\code\\superduo\\demos\\Wt4ElegantRL\\wtpy\\CtaContext.py'>, code: str, period: str, args: dict)>,
   reprocess.MAXMIN,
   {}),
  'bollinger': (3,
   <function features.Indicator.bollinger.<locals>.bollinger(context: <module 'wtpy.CtaContext' from 'C:\\Users\\Y\\Documents\\code\\superduo\\demos\\Wt4ElegantRL\\wtpy\\CtaContext.py'>, code: str, period: str, args: dict)>,
   reprocess.MAXMIN,
   {'timeperiod': 5, 'nbdevup': 2, 'nbdevdn': 2}),
  'macd': (3,
   <function features.Indicator.macd.<locals>.macd(context: <module 'wtpy.CtaContext' from 'C:\\Users\\Y\\Documents\\code\\superduo\\demos\\Wt4ElegantRL\\wtpy\\CtaContext.py'>, code: str, period: str, args: dict)>,
   reprocess.MAXMIN,
   {'fastperiod': 12, 'slowperiod': 26, 'signalperiod': 9}),
  'kdj': (3,
   <function features.Indicator.kdj.<locals>.kdj(context: <module 'wtpy.CtaContext' from 'C:\\Users\\Y\\Documents\\code\\superduo\\demos\\Wt4ElegantRL\\wtpy\\CtaContext.py'>, code: str, period: str, args: dict)>,
   reprocess.MAXMIN,
   {'fastk_period': 5, 'slowk_period': 3})}}

第一个key是周期,然后该key下又是一个字典,这个字典下的key是因子名,然后对于的值是一个元组,保存了因子相关信息。如: 该因子输出所占用的列数,因子计算函数,因子值标准化处理方式,等等。
正因为调用了self._callback_(),所以该因子的相关信息被保存到self.__cb__里, 而后面feature.calculate()就是从__cb__中拿到需要计算的因子的。
所以自己在定义了一个因子后,不仅要写计算过程,也不要忘了执行self._subscribe_()self._callback_()这两句。 再说一遍,这两句是用来告诉框架计算这个因子需要订阅哪些行情数据,以及将因子信息写到__cb__,从而在calculate() 中可以得到计算。

4、到此feature初始化完毕,将作为参数传入env,strategy中。

5、下面来看feature被调用的情景。第一个情景:engine.run_backtest()后触发策略的on_init()。在on_init()中, 执行了self._feature_.subscribe(context)

6、看看feature.subscribe()内发生了什么事。
遍历__subscribies__,订阅特定代码特定周期的行情,主K标志由self.__main__确定。 在这里订阅后,在之后的calculate()中通过context拿到行情的信息。

7、feature被调用的第二个情景:如一开始的流程图所述,第一次cta_step触发策略类中的on_calculate(), 在on_calculate()中执行了self._feature_.calculate(context=context)

8、看看feature.calculate()内发生了什么事。
首先取到当前step的时间戳,并赋值给self.__time__(格式形如202107010905)。然后在self.__obs__中查找该时间戳 对应的观测是否存在,如果不存在,则进入计算过程,计算该时间戳(步)obs观测,并保存到__obs__字典中,键值就是该步的时间戳。
在计算过程内,先生成规定形状(由self.__shape__确定)的ndarray,值全填充为np.nan。然后遍历self.securities所有标的 代码,对于每一个标的,从self.__cb__中拿到需要计算的因子信息。然后从因子信息中拿到函数对象callback(callback是计算因子的实现函数, 就是上面kdj例子中的内部的那个kdj函数),并调用,将因子值 保存下来,填充到obs内,再赋值给__obs__字典。然后框架内部在__obs__字典中会自动加入"开仓的最大浮盈、开仓的最大亏损、 开仓的浮动盈亏、当前持仓数"4列也作为观测的一部分。

9、环境的step中通过env._feature_.obs拿到该步观测。obs在feature类中是个计算属性,内部是在self.__obs__字典中拿到该时间步的观测, 并做flatten处理(将多维向量reshape为一维向量)后返回。

class Indicator(Feature)

内置的特征工程类,提供了kdj,rsi等传统技术指标因子(内部使用talib计算),并支持预先计算好的的因子的载入(参考weights方法中example()的伪代码)

  • 方法

    def obv(self, period: str, reprocess: REPROCESS = MAXMIN):在状态空间中加入obv因子。其他需要计算的因子仿照该方法写即可。
    def weights(self, period: str, timeperiod:int=1, index:str='000300', reprocess:REPROCESS =REPROCESS):
    引入外部因子数据。具体做法是:预先计算好的因子(如已经有一个因子dataframe)可通过下面的example()进行加载,example函数中写获取因子的逻辑并返回ndarray格式的特征序列构成的元组,同时指定下面的space参数为特征的数量。

class StateTransfer

一个纯粹的CTA策略是继承于wtpy的BaseCtaStrategy的,但BaseCtaStrategy可使用的信息就是CtaContext; 那我们的策略是要在强化学习中做决策的,所以要增加一些功能,这些功能定义在StateTransfer (具体的策略类通过继承StateTransfer增加了BaseCtaStrategy的功能,通过setAction方法保存了动作action, 并且在on_xxx回调中可以使用此action.)。 所以我们写的具体强化学习的策略类同时继承了StateTransfer和BaseCtaStrategy。

  • 方法

    def EngineType() -> int:
    定义继承此类的策略,属于CTA还是HFT,之后就可以通知WtBtEngine以何种eType去初始化
    def Action(size: int) -> dict:返回策略可做动作空间
    def setAction(self, action):
    1.setAction这里将action保存到子类(策略类对象)的属性中,为了策略类的on_calculate_done方法可以方便从属性中拿到动作,从而实现动作到目标仓位的映射。(因为on_calculate_done方法的签名无法将action带入,所以需要从setAction饶一道)
    2.实际使用时,StateTransfer的setAction()也将动作和买卖行为相隔离。

class REPROCESS

主要为不懂强化学习的人准备,REPROCESS是预置的一些边界数据处理技巧

wtpy相关 (为使用仿真器前需要了解的wtpy相关内容)

一、使用自定义的数据源进行回测

这一块不难,但自己也看了好多天才成功,在这里总结一下使用wtpy回测自定义数据。在wt4erl下大家会最终是要用自己的特征数据去训练agent,这篇文档介绍如何通过wtpy加载自己的数据。

1.下载demos

使用的版本为wtpy0.9,现已正式发布,直接pip install安装,并下载0.9 demos(去github wtpy)。demos里的storage文件夹内就是回测会使用的数据文件,在下一步我们会将自己的csv文件也存放在这个目录下。demos里的cta_fut_bt为期货cta回测示例。

2.准备自定义数据文件

我的csv文件格式如下:

datetime,open,close,high,low,volume,money
2019-12-02 09:01:00,5568,5565,5570,5565,3110,173040400
2019-12-02 09:02:00,5565,5564,5566,5562,1894,105382160
2019-12-02 09:03:00,5564,5562,5565,5561,1790,99595600
2019-12-02 09:04:00,5563,5563,5564,5562,1000,55640000
2019-12-02 09:05:00,5563,5564,5564,5561,1096,60981440

a)可以看到数据的格式跟/storage下原有的格式不同,包括列名不同和datetime列也不同(我的文件date/time作为一列,而示例的csv中为两列)。因为这里的差异所以后面我继承BaseExtDataLoader并实现一个自己的load_final_his_bars方法用来加载我这种格式的csv。
b)文件名要统一命名为标准格式,比如我的csv是SR的一分钟数据,那么我命名为CZCE.SR.HOT_m1.csv
c)将准备好的数据文件放入storage/csv下

3.定义自己的DataLoader

先打开test_dataexts/testDtLoader.py,这是个用extloader加载自定义数据的例子,我们将在这上面修改:
a)将engine.configBTStorage这句的参数改为(mode="csv", path="../"),这里的path改为的当前目录,否则原来程序会去storage/csv下读文件,而我们的文件之前说过里面的格式和标准的是不一样的,所以会读取加载失败,我们要让框架通过extloader加载自定义的数据。
b)继承BaseExtDataLoader并实现自己的load_final_his_bars方法(0.9之前版本此方法为load_his_bars)。我们直接在原有代码的基础上修改,主要是加载csv文件和转换格式的过程,如下:

class MyDataLoader(BaseExtDataLoader):
    def load_final_his_bars(self, stdCode: str, period: str, feeder) -> bool:
        '''
        加载历史K线(回测、实盘)
        @stdCode    合约代码,格式如CFFEX.IF.2106
        @period     周期,m1/m5/d1
        @feeder     回调函数,feed_raw_bars(bars:POINTER(WTSBarStruct), count:int, factor:double)
        '''
        print("loading %s bars of %s from extended loader" % (period, stdCode))
        # 首先读入自己的数据文件
        # df = pd.read_csv('./storage/csv/CFFEX.IF.HOT_m1.csv')
        df = pd.read_csv('../storage/csv/CZCE.SR.HOT_m1.csv')

        # 下面三句转换自定义数据中的datetime字段到标准的date和time两个字段,并替换年月日中的-为/
        df['date'] = df.apply(lambda x: x['datetime'][0:10], axis=1)
        df['date'] = df['date'].str.replace('-', '/')
        df['time'] = df.apply(lambda x: x['datetime'][-8:], axis=1)
        # 这里转换列名
        df = df.rename(columns={
            'date': 'date',
            'time': 'time',
            'open': 'open',
            'high': 'high',
            'low': 'low',
            'close': 'close',
            'volume': 'vol',
        })


c)最后,要让回测引擎通过extloader加载数据,要在引擎初始化后加上一句:

engine = WtBtEngine(EngineType.ET_CTA)
engine.set_extended_data_loader(loader=MyDataLoader(), bAutoTrans=True)

并且要在策略的on_init中订阅了这个品种,这样engine在当前目录找不到这个品种的csv时, 会自动调用MyDataLoader.load_final_his_bars()去加载数据。run运行,会发现成功执行回测。
【补充】关于extloader加载多品种多周期的用法。举例说明:比如现在需要载入两个品种各两个周期共4个自定义行情文件, 那么在策略的on_init里需要订阅四次,每执行一个订阅后,都会将标的代码和周期作为参数传到load_final_his_bars内, 然后可以在load_final_his_bars()内拿到标的代码和周期(都是str),然后读取对应的行情文件成df,再转换成标准格式的df, 接下来就是按原有demos里的代码生成了buffer,最后调用feeder(buffer,len(df))将该品种该周期的buffer feeder进去即可。 四次的订阅会触发四次load_final_his_bars,所以你应该根据传入的参数去读取相应品种和周期的行情文件。
注意:1、每个品种最多你只需要传进3个周期(m1, m5, d1),其他周期会自动resample,也就是一个标的最多读三次。 2、强化学习中智能体需要加载多个品种时,要确保多个品种都具有同一个session,例如IF和SR就不能放在一个智能体中训练。

二、wtpy跑simnow仿真

跑simnow仿真和wt4erl项目并无直接关系,这里作为记录文档。

首先需要更新common/hots.json,可以用test_hotpicker/testHots.py脚本自动更新, testHots.py中函数的sdate,edate自行设置一下,例如设为近三天这样一个范围。 其次要打开数据服务datakit_fut/runDT.py,最后运行策略文件(cta_fut/run.py)。

自己写的一个订阅了多品种行情的策略,没有下单过程,只是体会一下on_xxx方法的触发

值得注意的是on_bar在每个品种BAR闭合时都触发一次,on_calculate在所有BAR都闭合后只触发一次。

# -- coding: utf-8 --
from typing import List
from wtpy import WtEngine, EngineType
from Strategies.DualThrust import StraDualThrust
from ConsoleIdxWriter import ConsoleIdxWriter
from wtpy import BaseCtaStrategy
from wtpy import CtaContext
import numpy as np

class MultiSymbolStrategy(BaseCtaStrategy):
    def __init__(self, name: str, codes: List[str], period: str, barcnt=50):
        BaseCtaStrategy.__init__(self, name)
        self.codes = codes
        self.period = period
        self.barcnt = barcnt

    def on_init(self, context: CtaContext):
        for code in self.codes:
            # 订阅多品种行情,并将传入品种列表的第一个作为主K
            context.stra_get_bars(code, self.period, self.barcnt, isMain=True if code == self.codes[0] else False)
        context.stra_log_text("进入 oninit")

    def on_calculate(self, context: CtaContext):
        context.stra_log_text("进入 on calculate (所有标的闭合时触发oncalculate)")

    def on_calculate_done(self, context: CtaContext):
        context.stra_log_text("进入 on calculate done (必须在回测模式下开启异步才能触发calc done)")

    def on_bar(self, context: CtaContext, stdCode: str, period: str, newBar: dict):
        context.stra_log_text("进入 on bar (单个标的闭合时触发onbar)")
        print(stdCode, period, newBar)

    def on_tick(self, context: CtaContext, stdCode: str, newTick: dict):
        context.stra_log_text("进入 on tick")
        print("时间戳%s 当前品种%s 价格%s " % (newTick[0], newTick[2], context.stra_get_price(stdCode)))

if __name__ == "__main__":
    # 创建一个运行环境,并加入策略
    env = WtEngine(EngineType.ET_CTA)
    env.init('../common/', "config.yaml")
    straInfo = MultiSymbolStrategy(name='demo1', codes=['SHFE.rb.HOT', 'SHFE.cu.HOT'], period='m1')
    env.add_cta_strategy(straInfo)
    idxWriter = ConsoleIdxWriter()
    env.set_writer(idxWriter)
    env.run()
    kw = input('press any key to exit\n')

三、csv和dsb互相转换

利用了WtDataHelper类实现csv,dsb格式的互转,这里以bar数据为例。 WtDataHelper类里的trans_bars/trans_ticks已弃用,在0.9中使用store_bars/store_ticks转化csv到dsb, 使用dump_bars/dump_ticks将某目录内的所有dsb还原成csv。

我的csv读成pd.DataFrame后表结构如下(从别的数据源如数据库文件读成df后也按照这个例子转为dsb):

Unnamed: 0,open,close,high,low,volume,money
2019-12-02 09:01:00,5568,5565,5570,5565,3110,173040400
2019-12-02 09:02:00,5565,5564,5566,5562,1894,105382160

使用如下脚本将上述格式的df转化为dsb:

import sqlite3
from wtpy.wrapper import WtDataHelper
from wtpy.WtCoreDefs import WTSBarStruct
import pandas as pd

def strToDate(strDate: str) -> int:
    items = strDate.split("/")
    if len(items) == 1:
        items = strDate.split("-")

    if len(items) > 1:
        return int(items[0]) * 10000 + int(items[1]) * 100 + int(items[2])
    else:
        return int(strDate)

def sqlite2df(db_addr, table_name):
    conn = sqlite3.connect(db_addr)
    return pd.read_sql("select * from %s" % (table_name), conn)


if __name__ == '__main__':
    # 我这里使用sqlite文件读成df的,也可直接读一个csv文件进来成为df
    # 这里的df格式如上一个代码块所示
    df = sqlite2df('C:\\Users\\Y\\Documents\\code\\superduo\\data\\srif.db', 'ifd')
    # 调整格式
    # 分离出date
    df['date'] = df.apply(lambda x: x['Unnamed: 0'][0:10], axis=1)
    df['date'] = df['date'].str.replace('-', '/')
    # 分离出time
    df['time'] = df.apply(lambda x: x['Unnamed: 0'][-8:], axis=1)
    df['vol'] = df['volume']

    # 转化date列从2015-01-05格式转为20150105
    df['date'] = df.apply(lambda x: strToDate(x['date']), axis=1)

    # 这里给了一个长度len_df, 告诉之后的buffer有多少个WTSBarStruct
    len_df = len(df)
    helper = WtDataHelper()

    # 这种写法是预分配一个list的长度,这样直接用下标访问不会报越界的错误,如果不预分配就会报越界的错误
    # WTSBarStruct是一个内存块,BUFFER是声明一个内存段,这个内存段连续存储了很多BAR数据,python生成这个内存段直接扔给C++处理
    BUFFER = WTSBarStruct * len_df
    buffer = BUFFER()
    i = 0
    for _, row in df.iterrows():
        curbar = buffer[i]
        curbar.date = row["date"] 
        curbar.high = float(row["high"])
        curbar.low = float(row["low"])
        curbar.open = float(row["open"])
        curbar.close = float(row["close"])
        curbar.vol = int(row["vol"])
        # hold和diff 就是现有仓单和仓单变化 ,如果用不上就都放0
        curbar.hold = 0  # 原df没有该字段的值,赋值0
        curbar.diff = 0  # 原df没有该字段的值,赋值0
        i += 1
    # buffer是整个内存段,这里通过传参告诉底层这个buffer在哪,然后有多少长度,底层会自动处理。
    helper.store_bars('test2.dsb', buffer, len_df, 'd')
    # 将本文件夹内的dsb还原为csv,文件名不变
    helper.dump_bars(".", ".")

最后helper.store_bars('test2.dsb', buffer, len_df, 'd')这句生成了dsb文件。

可以看到脚本最后一行是用了dump_bars方法生成了test2.csv, 打开test2.csv可以看到还原为csv后数据是对的上的(还原为csv后date列都变为19900000,此问题等待wtpy更新)。虽然列的值对的上,但dsb的列名变成了: date,time,open,high,low,close,settle,volume,turnover,open_interest,diff_interest。

这是因为dsb内数据列只固定了顺序,dsb就是一个C内存块,里面只有值,按顺序放(dsb是zst压缩,dmb是不压缩)。 它的好处是dsb打开就是一块内存,读写效率高,缺点就是里面没有字段信息。数据列完全依赖顺序, 你看到的open/close,都是C定义的顺序映射,没保存在内存中。(如果想深入csv dsb这块技术的话, 可以搜一下代码更新的历史,里面很多很多python和c++的技巧)

空文件

简介

wt4elegantrl-doc 展开 收起
Python
取消

发行版

暂无发行版

贡献者

全部

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/panyunan/wt4elegantrl-doc.git
git@gitee.com:panyunan/wt4elegantrl-doc.git
panyunan
wt4elegantrl-doc
wt4elegantrl-doc
master

搜索帮助