2 Star 3 Fork 0

mikesheng/cyvn

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
test.py 2.54 KB
一键复制 编辑 原始数据 按行查看 历史
mikesheng 提交于 2017-09-22 17:51 . 更新 test.py
# -*- coding: utf-8 -*-
import hashlib, os, sys, tempfile, time
from ctp.futures import ApiStruct, MdApi
from event.vtEvent import *
class MyMdApi(MdApi):
    """Market-data client built on ``ctp.futures.MdApi``.

    The instance stores the login credentials and the instruments to
    subscribe to.  Once the front connection is reported it sends a login
    request, and after a successful login it subscribes to
    ``instrumentIDs``.  Every callback prints what it received.
    """

    def __init__(self, brokerID, userID, password, instrumentIDs):
        """Remember the session parameters and create the underlying API.

        ``brokerID`` and ``userID`` are joined with a bytes literal in
        ``Create``, so they must be ``bytes``.  ``instrumentIDs`` is passed
        unchanged to ``SubscribeMarketData`` after login.
        """
        self.requestID = 0  # incremented before each request that is sent
        self.brokerID = brokerID
        self.userID = userID
        self.password = password
        self.instrumentIDs = instrumentIDs
        self.Create()

    @staticmethod
    def _as_bytes(address):
        """Return ``address`` as bytes; text is UTF-8 encoded, others pass through."""
        if isinstance(address, type(u'')):
            return address.encode('utf-8')
        return address

    def Create(self):
        """Create the API with a per-account working directory.

        The directory is ``<tempdir>/<md5 of b'ctp.futures'+brokerID+userID>/Md/``
        and is created when missing.
        """
        account_key = b''.join((b'ctp.futures', self.brokerID, self.userID))
        digest = hashlib.md5(account_key).hexdigest()
        flow_dir = os.path.join(tempfile.gettempdir(), digest, 'Md') + os.sep
        # Attempt the creation and tolerate "already exists" instead of
        # checking first: a check-then-create sequence can race with another
        # process creating the same directory.
        try:
            os.makedirs(flow_dir)
        except OSError:
            if not os.path.isdir(flow_dir):
                raise
        # On Python 3 the path is handed over as bytes; on Python 2 the
        # native str is already a byte string.
        MdApi.Create(self, os.fsencode(flow_dir) if sys.version_info[0] >= 3 else flow_dir)

    def RegisterFront(self, front):
        """Register one front address or an iterable of front addresses.

        A single address may be ``bytes`` or a text string; text is encoded
        to bytes first so it is not mistaken for an iterable of addresses
        and registered character by character.  For a single address the
        result of ``MdApi.RegisterFront`` is returned; for an iterable the
        method returns ``None``.
        """
        if isinstance(front, (bytes, type(u''))):
            return MdApi.RegisterFront(self, self._as_bytes(front))
        for address in front:
            MdApi.RegisterFront(self, self._as_bytes(address))

    def OnFrontConnected(self):
        """Print a notice and send a login request with the stored credentials."""
        print('OnFrontConnected: Login...')
        req = ApiStruct.ReqUserLogin(
            BrokerID=self.brokerID, UserID=self.userID, Password=self.password)
        self.requestID += 1
        self.ReqUserLogin(req, self.requestID)

    def OnFrontDisconnected(self, nReason):
        """Print the disconnect reason passed to the callback."""
        print('OnFrontDisconnected:', nReason)

    def OnHeartBeatWarning(self, nTimeLapse):
        """Print the time lapse passed to the heartbeat-warning callback."""
        print('OnHeartBeatWarning:', nTimeLapse)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """Print the login response; on success subscribe to the instruments."""
        print('OnRspUserLogin:', pRspInfo)
        if pRspInfo.ErrorID == 0:  # Success
            print('GetTradingDay:', self.GetTradingDay())
            self.SubscribeMarketData(self.instrumentIDs)

    def OnRspSubMarketData(self, pSpecificInstrument, pRspInfo, nRequestID, bIsLast):
        """Print the response info of a subscribe request."""
        print('OnRspSubMarketData:', pRspInfo)

    def OnRspUnSubMarketData(self, pSpecificInstrument, pRspInfo, nRequestID, bIsLast):
        """Print the response info of an unsubscribe request."""
        print('OnRspUnSubMarketData:', pRspInfo)

    def OnRspError(self, pRspInfo, nRequestID, bIsLast):
        """Print the response info of an error response."""
        print('OnRspError:', pRspInfo)

    def OnRspUserLogout(self, pUserLogout, pRspInfo, nRequestID, bIsLast):
        """Print the response info of a logout response."""
        print('OnRspUserLogout:', pRspInfo)

    def OnRtnDepthMarketData(self, pDepthMarketData):
        """Print each depth market data record that is delivered."""
        print('OnRtnDepthMarketData:', pDepthMarketData)
if __name__ == '__main__':
    # Placeholder broker ID, user ID, password and instrument IDs; replace
    # them with real values before running.
    md_client = MyMdApi(b' ', b'', b'', b'')
    md_client.RegisterFront(b'tcp://180.168.146.187:10010')
    md_client.Init()
    # Sleep in the main thread until Ctrl-C. Presumably the API delivers its
    # callbacks on its own thread(s) in the meantime -- TODO confirm.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/mikesheng/cyvn.git
git@gitee.com:mikesheng/cyvn.git
mikesheng
cyvn
cyvn
master

搜索帮助