vnpy_ctp源码下载后转变为python可用的处理过程

目录

写在前面 

下载源码并解压

创建python项目

环境

过程

编译vnpy_ctp源码

验证可用性


写在前面 

window系统中必须安装有Visual Studio ,后面源码安装时需要进行C++编译

下载源码并解压

GitHub - vnpy/vnpy_ctp: VeighNa框架的CTP交易接口

下载zip压缩包

解压

要在python中能执行,要有.pyd文件,解压后的文件夹内没有.pyd文件

创建python项目

新建一个python项目,项目在一个新的虚拟环境中执行。

环境

操作系统:window10 64位

开发工具:pycharm

python版本:python3.8.10

过程

编译vnpy_ctp源码

打开项目下方的terminal面板

cd 到解压后setup.py 所在文件夹  

执行 python setup.py build 

大概2到3分钟,编译完毕,在setup.py所在文件夹下多出一个build文件夹

在build下找到与操作系统和python版本对应的文件夹,以本文为例,操作系统是64位,那文件夹名称中就会有amd64,python版本3.8,文件夹名称中就会包含3.8,所以本文的文件夹名为lib.win-amd64-3.8

我们需要的是 build>lib.win-amd64-3.8>vnpy_ctp文件夹下的api文件夹,我们把api文件夹复制到项目目录下

验证可用性

创建Md调用的类和Td调用的类

import datetime,sys,os,time,pytz
from api import (
    TdApi,
    MdApi
)
class CtpMdApi(MdApi):
    def __init__(self)->None:
        super().__init__()

        self.reqid:int = 0
        self.connect_status: bool = False
        self.login_status: bool = False
        self.subscribed: set = set()

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        self.current_date: str = datetime.date.today().strftime('%Y%m%d')
        pass
    def connect(self, address: str, userid: str, password: str, brokerid: str)->None:
        self.userid = userid
        self.password = password
        self.brokerid = brokerid

        if not self.connect_status:
            con_body_path = CTP_MD_CON_DIR
            con_body_path = con_body_path.replace('/','\\')
            self.createFtdcMdApi(con_body_path)
            self.registerFront(address)
            self.init()

            self.connect_status = True
            pass
        pass

    def login(self)->None:
        ctp_req: dict = {
            'UserID':self.userid,
            'Password':self.password,
            'BrokerID':self.brokerid
        }
        self.reqid += 1
        self.reqUserLogin(ctp_req,self.reqid)
        pass
    def subscribe(self,req:dict):
        if self.login_status:
            self.subscribeMarketData(req['symbol'])
        self.subscribed.add(req['symbol'])
        pass
    def close(self)->None:
        if self.connect_status:
            self.exit()
        pass
    def update_date(self)->None:
        self.current_date = datetime.date.today().strftime('%Y%m%d')
        pass
    def onFrontConnected(self)->None:
        self.login()
        pass
    def onFrontDisconnected(self,reason:int)->None:
        self.login_status = False
        pass
    def onRspUserLogin(self,data:dict,error:dict,reqid:int,last:bool)->None:
        if not error['ErrorID']:
            self.login_status = True
            for symbol in self.subscribed:
                self.subscribeMarketData(symbol)
            pass
        else:
            print(f"行情服务器登录失败。{error['ErrorID']}.{error['ErrorMsg']}")
        pass
    def onRspError(self, error: dict, reqid: int, last: bool)->None:
        print('行情接口报错。', error['ErrorID'], error['ErrorMsg'])
        pass
    def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool):
        if not error or not error['ErrorID']:
            return
        print('行情订阅失败。',error['ErrorID'],error['ErrorMsg'])
        pass
    def onRtnDepthMarketData(self,data:dict)->None:
        if not data['UpdateTime']:
            return
        print('tick返回',data['InstrumentID'],data['LastPrice'])
        pass
    pass

class CtpTdApi(TdApi):
    def __init__(self)->None:
        super().__init__()
        self.reqid: int = 0
        self.order_ref: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.auth_status: bool = False
        self.login_failed: bool = False
        self.auth_failed: bool = False
        self.contract_inited: bool = False

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""
        self.auth_code: str = ""
        self.appid: str = ""

        self.frontid: int = 0
        self.sessionid: int = 0
        pass
    def connect(self,address:str,userid:str,password:str,brokerid:str,auth_code:str,appid:str)->None:
        self.userid = userid
        self.password = password
        self.brokerid = brokerid
        self.auth_code = auth_code
        self.appid = appid

        if not self.connect_status:
            con_body_path = CTP_TD_CON_DIR + self.userid + os.path.sep
            if not os.path.exists(con_body_path):
                os.mkdir(con_body_path)
            con_body_path = con_body_path.replace('/','\\')
            self.createFtdcTraderApi(con_body_path)

            self.subscribePrivateTopic(0)
            self.subscribePublicTopic(0)
            self.registerFront(address)
            self.init()

            self.connect_status = True
            pass
        else:
            self.authenticate()
        pass

    def authenticate(self)->None:
        if self.auth_failed:
            return
        ctp_req: dict = {
            "UserID": self.userid,
            "BrokerID": self.brokerid,
            "AuthCode": self.auth_code,
            "AppID": self.appid
        }

        self.reqid += 1
        self.reqAuthenticate(ctp_req, self.reqid)
        pass
    def login(self)->None:
        if self.login_failed:
            return
        ctp_req: dict = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid,
            "AppID": self.appid
        }

        self.reqid += 1
        self.reqUserLogin(ctp_req, self.reqid)
        pass
    def close(self)->None:
        if self.connect_status:
            self.exit()
        pass
    def onFrontConnected(self)->None:
        if self.auth_code:
            self.authenticate()
        else:
            self.login()
        pass
    def onFrontDisconnected(self,reason:int)->None:
        self.login_status = False
        pass
    def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool)->None:
        if not error['ErrorID']:
            self.auth_status = True
            self.login()
        else:
            self.auth_failed = True
            print('交易服务器验证失败。',error['ErrorID'],error['ErrorMsg'])
        pass
    def onRspUserLogin(self,data: dict, error: dict, reqid: int, last: bool)->None:
        if not error["ErrorID"]:
            self.frontid = data["FrontID"]
            self.sessionid = data["SessionID"]
            self.login_status = True

            # 自动确认结算单
            ctp_req: dict = {
                "BrokerID": self.brokerid,
                "InvestorID": self.userid
            }
            self.reqid += 1
            self.reqSettlementInfoConfirm(ctp_req, self.reqid)
        else:
            self.login_failed = True
            print("交易服务器登录失败", error['ErrorID'], error['ErrorMsg'])
        pass
    def onRspSettlementInfoConfirm(self,data: dict, error: dict, reqid: int, last: bool)->None:
        while True:
            self.reqid += 1
            n: int = self.reqQryInstrument({}, self.reqid)
            if not n:
                break
            else:
                time.sleep(1)
        pass
    def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        print(data['ProductClass'],data['InstrumentID'],data['ProductID'],reqid,last)
        if last:
            self.contract_inited = True
            print('合约信息查询完毕')
        pass
    pass

执行代码

if __name__ == '__main__':
    investorid = ""
    brokerid=""
    password= ""
    appid= ""
    auth_code= "0000000000000000"
    md_ip= "180.168.146.187:10211"
    trader_ip= "180.168.146.187:10201"

    temp_api = CtpTdApi()
    address = f"tcp://{trader_ip}"
    temp_api.connect(address,investorid,password,brokerid,auth_code,appid)

    import keyboard
    keyboard.wait('esc')
    sys.exit()
    pass

能登录td服务器,并正常查询合约,可用