量化交易之高频交易源码分享python源码

量化交易之高频交易源码分享python源码

之前看到一篇介绍高频交易的博文,就想想着用python实现一下。于是把博文

量化交易之高频交易源码分享python源码

把上面内容复制到豆包,让它根据上面内容编写一个运行于掘金量化平台的策略,可以使用talib和akshare。

然后把豆包生成的代码经过数轮优化和亲自调试,得到如下代码:

# -*- coding: utf-8 -*-
import numpy as np
import talib
from gm.api import *  # 使用掘金量化标准API

# === 策略参数配置 ===
def init_params(context):
    context.params = {
        # VWAP相关
        'vwap_window': 60 * 60,  # 使用多少秒计算VWAP(如60分钟=3600秒)
        'vwap_interval': 60,  # 每隔多少秒刷新一次VWAP计算

        # 乖离率交易参数
        'divergence_upper_threshold': 1.2,
        'divergence_lower_threshold': -1.2,

        # 四维共振模型参数
        'volume_multiplier': 1.3,
        'rsi_buy_threshold': 30,
        'macd_hist_trend_days': 2,

        # 动态止盈参数
        'atr_period': 30,
        'atr_take_profit_multiplier': 1.5,

        # 梯度止损参数
        'first_stop_loss_ratio': 0.997,
        'final_stop_loss_ratio': 0.988,

        # 仓位管理
        'max_position_ratio': 0.95,
        'max_holdings': 3,
    }

    # 交易标的和周期设置
    context.symbol = 'SHSE.600000'  # 示例股票,格式为交易所.证券代码
    context.frequency_seconds = '600s'  # K线周期:5分钟 = 300 秒
    context.bar_count = 300  # 获取历史K线条数

# === 计算当日 VWAP ===
def calculate_daily_vwap(context, bar):
    now = bar['eob']
    today = now.strftime('%Y-%m-%d')

    # 获取过去60分钟的1分钟K线 => 总共60条
    kline_1min = history_n(symbol=context.symbol, frequency='60s', count=int(context.params['vwap_window'] / 60),  # 从秒转为分钟级数量
        end_time=today, fields=['close', 'volume'],  df=True)

    if len(kline_1min) == 0:
        return None

    vwap = (kline_1min['close'] * kline_1min['volume']).cumsum() / kline_1min['volume'].cumsum()
    return vwap.iloc[-1]


# === 判断乖离率交易逻辑 ===
def check_divergence(context, bar, vwap):
    current_price = bar['close']
    divergence = (current_price - vwap) / vwap * 100

    pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
    buy_thres = context.params['divergence_lower_threshold']
    sell_thres = context.params['divergence_upper_threshold']

    signals = []
    if divergence > sell_thres and pos and pos.volume > 0:
        signals.append(('sell', 'divergence'))
    elif divergence < buy_thres:
        signals.append(('buy', 'divergence'))

    return signals


# === 量价时空四维共振模型判断逻辑 ===
def check_four_dimension_resonance(context, bar, kline_5min):
    volume = bar['volume']
    recent_vol = history_n(symbol=context.symbol, frequency='60s', count=20, end_time=context.now, fields='volume',
                           df=True)
    avg_volume_20 = recent_vol['volume'].mean()

    if volume > avg_volume_20 * context.params['volume_multiplier']:
        close = kline_5min['close']
        macd, signal, hist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
        rsi = talib.RSI(close, timeperiod=14).values
        if len(hist) >= context.params['macd_hist_trend_days']:
            recent_hist = hist[-context.params['macd_hist_trend_days']:].values
            if all(recent_hist[i] < recent_hist[i + 1] for i in range(len(recent_hist) - 1)) and \
                    rsi[-1] < context.params['rsi_buy_threshold']:
                return [('buy', 'four_dim')]
    return []


# === 动态止盈模型逻辑 ===
def check_dynamic_take_profit(context, bar, kline_5min):
    high = kline_5min['high']
    low = kline_5min['low']
    close = kline_5min['close']
    atr = talib.ATR(high, low, close, timeperiod=context.params['atr_period']).values
    if len(atr) < 2:
        return []
    current_atr = atr[-1]
    prev_atr = atr[-2]

    if current_atr > prev_atr * context.params['atr_take_profit_multiplier']:
        pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
        if pos and pos.volume > 0:
            return [('sell', 'take_profit')]
    return []


# === 梯度止损策略逻辑 ===
def check_gradient_stop_loss(context, bar):
    pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
    if not pos or pos.volume <= 0:
        return []

    cost_price = pos.vwap
    current_price = bar['close']

    signals = []

    # 第一止损位
    if current_price < cost_price * context.params['first_stop_loss_ratio']:
        signals.append(('sell', 'stop_loss'))

    # 支撑位止损
    lows = history_n(symbol=context.symbol, frequency='60s', count=20, end_time=context.now, fields='low', df=True)['low']
    support_level = np.min(lows)
    if current_price < support_level:
        signals.append(('sell', 'support_stop_loss'))

    # 终极止损位
    if current_price < cost_price * context.params['final_stop_loss_ratio']:
        signals.append(('sell', 'final_stop_loss'))

    return signals


# === 统一决策层 ===
def trade_engine(context, bar, vwap, kline_5min):
    signals = []

    signals.extend(check_divergence(context, bar, vwap))
    signals.extend(check_four_dimension_resonance(context, bar, kline_5min))
    signals.extend(check_dynamic_take_profit(context, bar, kline_5min))
    signals.extend(check_gradient_stop_loss(context, bar))
    priority_order = ['stop_loss', 'final_stop_loss', 'support_stop_loss', 'take_profit', 'four_dim', 'divergence']
    for action, signal_type in sorted(signals, key=lambda x: priority_order.index(x[1])):
        if action == 'buy':
            if len(context.account().positions()) >= context.params['max_holdings']:
                print("达到最大持仓数量限制,无法买入")
                continue

            cash = context.account().cash.available
            price = bar['close']
            max_shares = int(cash / price)
            if max_shares <= 0:
                print("资金不足,无法买入")
                continue

            target_value = cash * context.params['max_position_ratio']
            volume = int(target_value / price)
            if volume <= 0:
                print("目标成交量小于等于零,无法下单")
                continue

            order_target_percent(symbol=context.symbol, percent=context.params['max_position_ratio'],
                                 order_type=OrderType_Limit, price=price, position_side=1)
            break
        elif action == 'sell':
            pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
            vol = pos.volume - pos.volume_today
            if pos and vol > 0:
                order_target_volume(symbol=context.symbol, volume=pos.volume_today, order_type=OrderType_Limit, price=bar['close'], position_side=2)
            break


# === 策略初始化函数 ===
def init(context):
    init_params(context)
    subscribe(symbols=context.symbol, frequency=context.frequency_seconds, count=10)

# === 每根K线回调函数 ===
def on_bar(context, bars):
    # 获取当前时间
    if not is_trading_time(context.now):
        return
    bar = bars[0]
    current_time = bar['eob']

    try:
        # 获取5分钟K线数据
        kline_5min = history_n(symbol=context.symbol, frequency=context.frequency_seconds, count=context.bar_count,
                                end_time=current_time, fields="open, high, low, close, volume", df=True)
        # 计算VWAP
        vwap = calculate_daily_vwap(context, bar)
        if vwap is None:
            print(f"{current_time}: 无法获取VWAP数据,跳过本次处理")
            return

        # 执行统一决策层
        trade_engine(context, bar, vwap, kline_5min)

    except Exception as e:
        print(f"数据获取或计算出错: {e}")

def on_backtest_finished(context, indicator):
    print('累计收益率:', str(indicator.pnl_ratio * 100), ',年化收益率:', str(indicator.pnl_ratio_annual * 100), ',最大回撤:',
          indicator.max_drawdown * 100, ',夏普比率:', str(indicator.sharp_ratio), ',胜率:', indicator.win_ratio * 100,
        ',开仓次数:', indicator.open_count, ',平仓次数:', indicator.close_count)

if __name__ == '__main__':
    run(strategy_id='6c3dfc86-03fb-11ed-ba2d-c85b7639cf56',
        filename='main_test.py',
        mode=MODE_BACKTEST,
        token='2a73fd387f79b2c096806a91a6b13f915f981007',
        backtest_start_time='2024-04-03 09:30:00',
        backtest_end_time='2025-04-30 15:00:00',
        backtest_adjust=ADJUST_PREV,
        backtest_initial_cash=10000000,
        backtest_commission_ratio=0.00025,
        backtest_slippage_ratio=0.0001)

def on_order_status(context, order):
    # 标的代码
    symbol = order['symbol']
    # 委托价格
    price = order['price']
    # 委托数量
    volume = order['volume']
    # 查看下单后的委托状态,等于3代表委托全部成交
    status = order['status']
    # 买卖方向,1为买入,2为卖出
    side = order['side']
    # 开平仓类型,1为开仓,2为平仓
    effect = order['position_effect']
    # 委托类型,1为限价委托,2为市价委托
    order_type = order['order_type']
    if status == 3:
        if effect == 1:
            if side == 1:
                side_effect = '开多仓'
            else:
                side_effect = '开空仓'
        else:
            if side == 1:
                side_effect = '平空仓'
            else:
                side_effect = '平多仓'
        order_type_word = '限价' if order_type == 1 else '市价'
        print('{}:标的:{},操作:以{}{},委托价格:{},委托数量:{}'.format(context.now, symbol, order_type_word, side_effect, price, volume))

def is_trading_time(current_time):
    """
    判断当前时间是否为交易时间
    """
    trading_sessions = [
        ('09:30:00', '11:30:00'),
        ('13:00:00', '14:56:00')
    ]
    current_time_str = current_time.strftime('%H:%M:%S')
    for start, end in trading_sessions:
        if start <= current_time_str <= end:
            return True
    return False

该代码实现了一个基于VWAP、乖离率、四维共振模型、动态止盈和梯度止损的量化交易策略。主要功能包括:

init_params:初始化策略参数。

calculate_daily_vwap:计算当日VWAP。

check_divergence:判断乖离率交易信号。


check_four_dimension_resonance:判断四维共振模型信号。

check_dynamic_take_profit:判断动态止盈信号。

check_gradient_stop_loss:判断梯度止损信号。

trade_engine:统一决策层,根据信号执行买卖操作。

init:策略初始化。

on_bar:每根K线回调函数,执行交易逻辑。

on_backtest_finished:回测结束后的统计信息输出。

on_order_status:订单状态回调函数。

is_trading_time:判断当前是否为交易时间。

经过测试,数据订阅频率600s和3600s的收益相当,也就是说高频率并没有提升收益率。

此策略还只是单只股票的,虽然胜率很高,但是收益是根据股票的不同而变化,之后可以考虑通过选股程序选取优质股票,再加上判断股票是否强上升趋势。强上升趋势的票再执行此策略。估计收益还能进一步提高。

创作不易,对量有兴趣的朋友欢迎点赞,关注,转发,留言。我会不定期的分享自己编写的策略源码.

发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/1227575
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!

(0)
股市刺客的头像股市刺客
上一篇 2025 年 4 月 25 日
下一篇 7小时前

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注