416 Star 2.3K Fork 1K

GVPvn.py官方 / vn.py

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
run.py 2.95 KB
Copy Edit Raw Blame History
noranhe authored 2021-09-24 18:22 . update
import multiprocessing
import sys
from time import sleep
from datetime import datetime, time
from logging import INFO
from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy_ctp import CtpGateway
from vnpy_ctastrategy import CtaStrategyApp
from vnpy_ctastrategy.base import EVENT_CTA_LOG
SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
ctp_setting = {
"用户名": "",
"密码": "",
"经纪商代码": "",
"交易服务器": "",
"行情服务器": "",
"产品名称": "",
"授权编码": "",
"产品信息": ""
}
# Chinese futures market trading period (day/night)
DAY_START = time(8, 45)
DAY_END = time(15, 0)
NIGHT_START = time(20, 45)
NIGHT_END = time(2, 45)
def check_trading_period():
""""""
current_time = datetime.now().time()
trading = False
if (
(current_time >= DAY_START and current_time <= DAY_END)
or (current_time >= NIGHT_START)
or (current_time <= NIGHT_END)
):
trading = True
return trading
def run_child():
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
cta_engine = main_engine.add_app(CtaStrategyApp)
main_engine.write_log("主引擎创建成功")
log_engine = main_engine.get_engine("log")
event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
main_engine.write_log("注册日志事件监听")
main_engine.connect(ctp_setting, "CTP")
main_engine.write_log("连接CTP接口")
sleep(10)
cta_engine.init_engine()
main_engine.write_log("CTA策略初始化完成")
cta_engine.init_all_strategies()
sleep(60) # Leave enough time to complete strategy initialization
main_engine.write_log("CTA策略全部初始化")
cta_engine.start_all_strategies()
main_engine.write_log("CTA策略全部启动")
while True:
sleep(10)
trading = check_trading_period()
if not trading:
print("关闭子进程")
main_engine.close()
sys.exit(0)
def run_parent():
"""
Running in the parent process.
"""
print("启动CTA策略守护父进程")
child_process = None
while True:
trading = check_trading_period()
# Start child process in trading period
if trading and child_process is None:
print("启动子进程")
child_process = multiprocessing.Process(target=run_child)
child_process.start()
print("子进程启动成功")
# 非记录时间则退出子进程
if not trading and child_process is not None:
if not child_process.is_alive():
child_process = None
print("子进程关闭成功")
sleep(5)
if __name__ == "__main__":
run_parent()
Python
1
https://gitee.com/vnpy/vnpy.git
git@gitee.com:vnpy/vnpy.git
vnpy
vnpy
vn.py
master

Search

53164aa7 5694891 3bd8fe86 5694891