上次我们说到vnpy的源码安装。使用源码安装完vnpy后,我们就可以用python编写我们的量化交易程序啦。我们先从最简单的一个量化交易程序入手,一看就会!
pycharm安装
我们上pycharm官网,下载并安装python专用开发工具-pycharm。
点击download直接进行下载,如果要下载稳定版本,建议选择other versions,到里面选择稳定版本,我下载的版本为2019.2.3。
下载后,直接点击exe安装,安装后,可以在桌面上找到pycharm图标,既安装完成。

pycharm
导入vnpy项目
打开pycharm,界面选择Open,打开源码解压的vnpy-2.10.0项目,即可打开项目,重要要选择2.10.0的根目录。

导入vnpy2.10.0完成。

vnpy导入成功
了解no-ui模块
vnpy提供了no_ui的运行方式,该方式顾名思义就是“无界面”的意思,即在后台运行程序,我们部署在云服务器linux上,一般使用这种方式。因此,我们在这模块下开发。
首先,我们打开》examples》no_ui,在该目录下,我们新建一个文件夹,名称为:[.vntrader],注意有一个点。然后在该目录下创建一个algo_trading_setting.json文件,在配置文件中写入如下配置:
{
"20200315": {
"template_name": "ExampleTestMethod",
"vt_symbol": "ETH200626.HUOBI",
"interval": 20
}
}
这个是一个策略的启动项配置,关于配置的知识,我们以后再深入了解。
在no_ui下,源码已经提供了一个启动文件run.py,默认这个是连接ctp交易所的,因为ctp交易还需要向期货公司申请ctp连接,我们这边先暂时用HUOBI数字货币交易所进行测试。大家都知道这种交易所一天24小时不间断交易,也方便我们进行测试。所以,我们要将ctp连接方式改成hdbm方式,hdbm为这个交易所的期货合约行情对接接口。
修改这个run.py信息如下:
# encoding: utf-8
from datetime import time
from logging import INFO
from time import sleep
from vnpy.app.algo_trading import AlgoTradingApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy.event import EventEngine
from vnpy.gateway.hbdm import HbdmGateway
from vnpy.trader.engine import MainEngine
from vnpy.trader.setting import SETTINGS
SETTINGS["log.active"] = TrueSETTINGS["log.level"] = INFO
SETTINGS["log.console"] = Truedefault_setting = {
"API Key": "huobi官网上获得你账号的ak",
"Secret Key": "huobi官网上获得你账号的sk",
"会话数": 5,
"代理地址": "",
"代理端口": ""
}
def run_child():
""" Running in the child process. """
SETTINGS["log.file"] = True
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
# 使用火币交易所 main_engine.add_gateway(HbdmGateway)
algo_engine = main_engine.add_app(AlgoTradingApp)
main_engine.write_log("主引擎创建成功")
log_engine = main_engine.get_engine("log")
event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
main_engine.write_log("注册日志事件监听")
main_engine.connect(default_setting, "HBDM")
main_engine.write_log("连接HBDM接口, 请稍等!")
sleep(30)
# 算法引擎 algo_engine.init_engine()
main_engine.write_log("算法策略初始化完成")
# 对应json文件配置的key
algo_engine.start_one_algo("测试")
while True:
sleep(1)
def run_parent():
""" Running in the parent process. """ print("启动CTA策略守护父进程")
# Chinese futures market trading period (day/night) DAY_START = time(8, 45)
DAY_END = time(15, 30)
NIGHT_START = time(20, 45)
NIGHT_END = time(2, 45)
child_process = None while True:
trading = True # Start child process in trading period if trading and child_process is None:
print("启动子进程")
child_process = multiprocessing.Process(target=run_child)
child_process.start()
print("子进程启动成功")
# 非记录时间则退出子进程 if not trading and child_process is not None:
print("关闭子进程")
child_process.terminate()
child_process.join()
child_process = None print("子进程关闭成功")
sleep(5)
if __name__ == "__main__":
run_parent()
有需要源文件的,可以私信我。
前置工作准备好后,我们可以安心写代码啦。下面我们开始交易逻辑编写。
编写策略逻辑
我们开发一个最简单的逻辑:
以当前价格买入一手ETH0626期货合约,买入后开始监控tick价格变化。当买一价大于该买入价0.5个点时,以买一价止盈卖出平仓。当前卖一价小于买入价0.8个点时,平仓止损。平仓时,就停止交易。
新建文件夹》examples》no_ui》algos,在该目录下创建py文件,在该文件中写以上的交易逻辑,该逻辑比较简单,下面我们直接给出源代码:
# encoding: utf-8
"""
@Author: 大操手量化投资
@time: 2020-03-08 下午 15:22
@description:量化交易委托成交示例
"""
import time
from vnpy.app.algo_trading import AlgoTemplate
from vnpy.trader.constant import Offset, Direction, OrderType, Status
from vnpy.trader.engine import BaseEngine
from vnpy.trader.object import TradeData, OrderData, TickData
class ExampleTestMethod(AlgoTemplate):
strategy_name = "网格交易实战"
""""""
# 设置,到界面
default_setting = {
}
# 和参数类似,这一步操作是为了让系统内的策略引擎,得以知道该策略有哪些变量,
# 并在GUI图形界面上更新策略状态时将这些变量的最新数值显示出来,
# 同时在保存策略运行状态到缓存文件中时将这些变量写入进去(实盘中每天关闭策略时会自动缓存)。
variables = [
]
def __init__(self, algo_engine: BaseEngine, algo_name: str, setting: dict):
""""""
super().__init__(algo_engine, algo_name, setting)
# Variables
self.timer_count = 0
self.vt_order_id = ""
self.step_volume = 1
self.vt_symbol = setting["vt_symbol"]
self.interval = setting["interval"]
# 如果已经有成交了,不希望再多成交几手,可以初始赋值
self.traded = 0
self.order_status = ""
self.last_tick = None
self.new_price = 0.0
# 直接redis读取数据
# 买开的手数
self.pos_buy = 0
# 卖开的手数
self.pos_sell = 0
self.open_price = 0
# 委托标记
self.is_open = 0
# 价格成交标记
self.is_order_buy = 0
self.is_order_sell = 0
self.buy_price = 0
self.log(f"启动完毕,目标合约:EHT0626")
self.subscribe(self.vt_symbol)
self.put_parameters_event()
self.put_variables_event()
def on_tick(self, tick: TickData):
""""""
self.last_tick = tick
self.put_variables_event()
def on_timer(self):
# 每秒调用一次
if not self.last_tick:
self.log("没有数据!")
return
self.timer_count += 1
if self.timer_count < self.interval:
return
self.timer_count = 0
# 策略逻辑部分:
# 1 以当前卖一价价格买入 open_price。持续监听价格变化
# 2 当买一价大于买入价1个点时,以买一价卖出平仓。
# 3 当前价格小于买入价1.5个点时,平仓止损
# 以当前卖一价格买入
if self.is_open == 0:
self.buy(
self.vt_symbol,
self.last_tick.ask_price_1,
self.step_volume,
OrderType.LIMIT,
Offset.OPEN
)
self.log(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}【委托做多开仓一笔已提交】")
self.is_open = 1
return
if self.is_open == 1 and self.is_order_sell == 0 and self.last_tick.bid_price_1 - self.buy_price > 0.5:
self.vt_order_id = self.sell(
self.vt_symbol,
self.last_tick.bid_price_1,
self.step_volume,
OrderType.LIMIT,
Offset.CLOSE
)
self.log(
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}【委托止盈平仓一笔已提交,委托数量: {self.step_volume}】")
self.is_order_sell = 1
return
if self.is_open == 1 and self.is_order_sell == 0 and self.buy_price - self.last_tick.ask_price_1 > 0.8:
self.vt_order_id = self.sell(
self.vt_symbol,
self.last_tick.ask_price_1,
self.step_volume,
OrderType.LIMIT,
Offset.CLOSE
)
self.log(
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}【委托止损平仓一笔已提交,委托数量: {self.step_volume}】")
self.is_order_sell = 1
return
def on_order(self, order: OrderData):
if order.status == Status.ALLTRADED:
# 多开,多的手数加1,并且记录成交价格
if order.direction == Direction.LONG and order.offset == Offset.OPEN:
self.log(
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}【开多,成交价: {order.price},成交数量: {order.volume}】")
self.is_order_buy = 1
self.buy_price = order.price
# 卖平
if order.direction == Direction.SHORT and order.offset == Offset.CLOSE:
self.log(
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}【平仓,成交价: {order.price},成交数量: {order.volume}】")
self.is_order_sell = 1
self.stop()
def on_trade(self, trade: TradeData):
""""""
pass
def log(self, log: str):
self.algo_engine.main_engine.write_log(log)
# self.write_log(log)
# print(log)
核心代码逻辑在on_timer,这个是每秒执行一次的函数,如果我们希望10秒扫描一次,我们在json文件中配置参数,然后在代码逻辑累加到对应的设置时间参数即可。在这个方法里面,我们实现我们以上将的交易逻辑。
关于里面的函数和对应的方法,我们后续会仔细拆开来进一步讲解。
执行程序运行
在run.py文件中,右键debug,开始执行该自动化交易程序。

run.py文件右键执行
控制台上可以看到程序执行打印输出的日志:

委托单成交日志:
1 开仓记录

2 平仓记录

委托平仓
3 交易所成交单验证

交易所成交单验证
以上,我们可以看到程序正常执行指令,程序的成交单跟交易所给的成交单一致。程序执行成功,逻辑正确。
发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/78160
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!