backtrader通用策略模板与”积木式“策略开发

1000天行动计划之第101天,开启第2个100天计划进程!

我们回到主线——backtrader。

AI量化框架的主体是backtrader的事件驱动回测以及实盘能力。外加wxPython的GUI,支持csv、hdf5,Arctic(mongo),qlib的数据存储能力——这里也会把qlib框架整合进来,但不会用它的全流程——我把它的“因子表达式”能力独立拆分出来,不依赖它自有的数据格式,可以兼容csv,hdf5,mongo等。

01 backtrader策略

为了与“向量化回测”整合:qlib的AI导向的框架与backtrader的事件驱动与实盘做对比,使用backtrader来走一下“双均线”的策略流程。

  • 初始化大脑(默认是1万,可以通过broker的setcash把初始资金改为10万)

  • 取初始投资组合总市值

  • 运行大脑

  • 取投资组合总市值

cerebro = bt.Cerebro(maxcpus=1) # notebook里运作,maxcpus=1很重要
cerebro.broker.setcash(100000.0) #

print('初始总市值: %.2f' % cerebro.broker.getvalue())


cerebro.run()

print('期末总市值: %.2f' % cerebro.broker.getvalue())

添加数据源,使用pandasdata

  • 使用HDFStore从hdf5存储中读出数据

  • 初始化为bt的格式,只需要多一列openinterest
  • 使用bt.feeds.PandasData封装后,添加给大脑
import pandas as pd
from datetime import datetime

symbol = 'SPX'
start = datetime(2010,1,1)
end =  datetime(2022,9,23)

with pd.HDFStore('../../data/hdf5/index.h5') as store:
    df = store[symbol]

df['openinterest'] = 0
df = df[['open', 'high', 'low', 'close', 'volume', 'openinterest']]

data = bt.feeds.PandasData(dataname=df, name=symbol, fromdate=start, todate=end)
cerebro.adddata(data)

双均线策略:

class SMAStrategy(bt.Strategy):

    def __init__(self):
        self.sma_short = bt.indicators.SMA(period=42)
        self.sma_long = bt.indicators.SMA(period=252)
        #self.sma = btind.SimpleMovingAverage(period=15)

    def next(self):
        #print(self.data0[0], self.sma_short[0], self.sma_long[0])
        if self.sma_short[0] > self.sma_long[0]:
            self.buy()
        else:
            self.close()

图片

02 backtrader通用模板

# encoding:utf8
from datetime import datetime
import backtrader as bt
import pandas as pd
from loguru import logger


class BacktraderEngine:
    def __init__(self, init_cash=1000000.0, benchmark='000300.SH', start=datetime(2010, 1, 1),
                 end=datetime.now().date()):
        self.init_cash = init_cash
        self.start = start
        self.end = end
        self.benchmark = benchmark

        cerebro = bt.Cerebro()
        cerebro.broker.setcash(init_cash)

        # 设置手续费
        cerebro.broker.setcommission(0.0001)
        # 滑点:双边各 0.0001
        cerebro.broker.set_slippage_perc(perc=0.0001)

        self.cerebro = cerebro
        self.cerebro.addanalyzer(bt.analyzers.PyFolio, _name='_PyFolio')

添加数据:

def add_data(self, code):
    # 加载数据
    df = self.feed.get_df(code)
    df = to_backtrader_dataframe(df)
    data = bt.feeds.PandasData(dataname=df, name=code, fromdate=self.start, todate=self.end)

    self.cerebro.adddata(data)
    self.cerebro.addobserver(bt.observers.Benchmark,
                             data=data)
    self.cerebro.addobserver(bt.observers.TimeReturn)

策略模板:

策略里,notify_order, notify_trade两个函数,一个是订阅提交时触发,一个是交易产生时触发,这个在策略调试时特别有用,而且是通用的,直接写在基类里,另外就是基数实现log函数

# encoding: utf8
import backtrader as bt
from loguru import logger
import pandas as pd


class StrategyBase(bt.Strategy):
    def log(self, txt, dt=None):
        dt = dt or self.datas[0].datetime.date(0)
        logger.info('%s, %s' % (dt.isoformat(), txt))

    def get_current_dt(self):
        return self.datas[0].datetime.date(0)

    # 打印订单日志
    def notify_order(self, order):

        order_status = ['Created', 'Submitted', 'Accepted', 'Partial',
                        'Completed', 'Canceled', 'Expired', 'Margin', 'Rejected']
        # 未被处理的订单
        if order.status in [order.Submitted, order.Accepted]:
            self.log('未处理订单:订单号:%.0f, 标的: %s, 状态状态: %s' % (order.ref,
                                                        order.data._name,
                                                        order_status[order.status]))
            
        # 已经处理的订单
        if order.status in [order.Partial, order.Completed]:
            if order.isbuy():
                self.log(
                    'BUY EXECUTED, 状态: %s, 订单号:%.0f, 标的: %s, 数量: %.2f, 价格: %.2f, 成本: %.2f, 手续费 %.2f' %
                    (order_status[order.status],  # 订单状态
                     order.ref,  # 订单编号
                     order.data._name,  # 股票名称
                     order.executed.size,  # 成交量
                     order.executed.price,  # 成交价
                     order.executed.value,  # 成交额
                     order.executed.comm))  # 佣金
            else:  # Sell
                self.log(
                    'SELL EXECUTED, status: %s, ref:%.0f, name: %s, Size: %.2f, Price: %.2f, Cost: %.2f, Comm %.2f' %
                    (order_status[order.status],
                     order.ref,
                     order.data._name,
                     order.executed.size,
                     order.executed.price,
                     order.executed.value,
                     order.executed.comm))

        elif order.status in [order.Canceled, order.Margin, order.Rejected, order.Expired]:
            # order.Margin资金不足,订单无法成交
            # 订单未完成
            self.log('未完成订单,订单号:%.0f, 标的 : %s, 订单状态: %s' % (
                order.ref, order.data._name, order_status[order.status]))

        self.order = None

    def notify_trade(self, trade):
        logger.debug('trade......',trade.status)
        # 交易刚打开时
        if trade.justopened:
            self.log('开仓, 标的: %s, 股数: %.2f,价格: %.2f' % (
                trade.getdataname(), trade.size, trade.price))
        # 交易结束
        elif trade.isclosed:
            self.log('平仓, 标的: %s, GROSS %.2f, NET %.2f, 手续费 %.2f' % (
                trade.getdataname(), trade.pnl, trade.pnlcomm, trade.commission))
        # 更新交易状态
        else:
            self.log('交易更新, 标的: %s, 仓位: %.2f,价格: %.2f' % (
                trade.getdataname(), trade.size, trade.price))

这样我们在写一个新策略时,直接继承使用,代码就非常简洁:

class StrategyPickTime(StrategyBase):
    params = (
        ('long', 252),
        ('short', 42),
    )

    def __init__(self):
        # bt.ind.EMA(self.data, period=self.p.ema_period)
        self.long = bt.ind.SMA(period=self.p.long)
        self.short = bt.ind.SMA(period=self.p.short)

    def next(self):
        if self.short[0] > self.long[0]:
            self.buy()
        else:
            self.close()

运行时,可以打印出所有的日志:

图片

03 “模块化算子”

这个才是“积木式”策略开发的精华。

我们连上面的代码都不想写,想直接从“仓库”里复用。

我们把策略逻辑拆解成:选标的、按信号筛选,排序,仓位分配等等。

先从最简单的“等权重买入并持有,每个季度动态再平衡”,这个经典的资本配置策略开始。

这里拆分成“算子”会是:

1、每个季度运行一次(含首次)

2、选标的(选择所有)

3、仓位分配(等权)

我们在策略模板的基础上,再继承实现”算子策略模板“。

传入一个算子列表,在策略next函数里,逐个执行每个algo,若遇到某个algo返回True,则本次运行结果。

class StratgeyAlgoBase(StrategyBase):
    def __init__(self, algo_list=None):
        self.algos = algo_list

    def next(self):
        context = {
            'strategy': self
        }

        for algo in self.algos:
            if algo(context) is True:  # 如果algo返回True,直接不运行,本次不调仓
                return

有了这个模板,结合前面说的三个步骤:

我们不需要写代码就可以完成这个策略!

e.cerebro.addstrategy(StratgeyAlgoBase, algo_list=[
    RunQuarterly(),
    SelectAll(),
    WeightEqually
])

图片

我们只需要持续封装算子库,可以最大化减少开发策略所需要的时间,把时间多多用在策略的思想上吧。

后续我们再来介绍每个“算子”如何设计。

 

发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/104209
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!

(0)
股市刺客的头像股市刺客
上一篇 2024 年 7 月 29 日
下一篇 2024 年 7 月 29 日

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注