
之前看到一篇介绍高频交易的博文,就想想着用python实现一下。于是把博文

把上面内容复制到豆包,让它根据上面内容编写一个运行于掘金量化平台的策略,可以使用talib和akshare。
然后把豆包生成的代码经过数轮优化和亲自调试,得到如下代码:
# -*- coding: utf-8 -*-
import numpy as np
import talib
from gm.api import * # 使用掘金量化标准API
# === 策略参数配置 ===
def init_params(context):
context.params = {
# VWAP相关
'vwap_window': 60 * 60, # 使用多少秒计算VWAP(如60分钟=3600秒)
'vwap_interval': 60, # 每隔多少秒刷新一次VWAP计算
# 乖离率交易参数
'divergence_upper_threshold': 1.2,
'divergence_lower_threshold': -1.2,
# 四维共振模型参数
'volume_multiplier': 1.3,
'rsi_buy_threshold': 30,
'macd_hist_trend_days': 2,
# 动态止盈参数
'atr_period': 30,
'atr_take_profit_multiplier': 1.5,
# 梯度止损参数
'first_stop_loss_ratio': 0.997,
'final_stop_loss_ratio': 0.988,
# 仓位管理
'max_position_ratio': 0.95,
'max_holdings': 3,
}
# 交易标的和周期设置
context.symbol = 'SHSE.600000' # 示例股票,格式为交易所.证券代码
context.frequency_seconds = '600s' # K线周期:5分钟 = 300 秒
context.bar_count = 300 # 获取历史K线条数
# === 计算当日 VWAP ===
def calculate_daily_vwap(context, bar):
now = bar['eob']
today = now.strftime('%Y-%m-%d')
# 获取过去60分钟的1分钟K线 => 总共60条
kline_1min = history_n(symbol=context.symbol, frequency='60s', count=int(context.params['vwap_window'] / 60), # 从秒转为分钟级数量
end_time=today, fields=['close', 'volume'], df=True)
if len(kline_1min) == 0:
return None
vwap = (kline_1min['close'] * kline_1min['volume']).cumsum() / kline_1min['volume'].cumsum()
return vwap.iloc[-1]
# === 判断乖离率交易逻辑 ===
def check_divergence(context, bar, vwap):
current_price = bar['close']
divergence = (current_price - vwap) / vwap * 100
pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
buy_thres = context.params['divergence_lower_threshold']
sell_thres = context.params['divergence_upper_threshold']
signals = []
if divergence > sell_thres and pos and pos.volume > 0:
signals.append(('sell', 'divergence'))
elif divergence < buy_thres:
signals.append(('buy', 'divergence'))
return signals
# === 量价时空四维共振模型判断逻辑 ===
def check_four_dimension_resonance(context, bar, kline_5min):
volume = bar['volume']
recent_vol = history_n(symbol=context.symbol, frequency='60s', count=20, end_time=context.now, fields='volume',
df=True)
avg_volume_20 = recent_vol['volume'].mean()
if volume > avg_volume_20 * context.params['volume_multiplier']:
close = kline_5min['close']
macd, signal, hist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
rsi = talib.RSI(close, timeperiod=14).values
if len(hist) >= context.params['macd_hist_trend_days']:
recent_hist = hist[-context.params['macd_hist_trend_days']:].values
if all(recent_hist[i] < recent_hist[i + 1] for i in range(len(recent_hist) - 1)) and \
rsi[-1] < context.params['rsi_buy_threshold']:
return [('buy', 'four_dim')]
return []
# === 动态止盈模型逻辑 ===
def check_dynamic_take_profit(context, bar, kline_5min):
high = kline_5min['high']
low = kline_5min['low']
close = kline_5min['close']
atr = talib.ATR(high, low, close, timeperiod=context.params['atr_period']).values
if len(atr) < 2:
return []
current_atr = atr[-1]
prev_atr = atr[-2]
if current_atr > prev_atr * context.params['atr_take_profit_multiplier']:
pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
if pos and pos.volume > 0:
return [('sell', 'take_profit')]
return []
# === 梯度止损策略逻辑 ===
def check_gradient_stop_loss(context, bar):
pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
if not pos or pos.volume <= 0:
return []
cost_price = pos.vwap
current_price = bar['close']
signals = []
# 第一止损位
if current_price < cost_price * context.params['first_stop_loss_ratio']:
signals.append(('sell', 'stop_loss'))
# 支撑位止损
lows = history_n(symbol=context.symbol, frequency='60s', count=20, end_time=context.now, fields='low', df=True)['low']
support_level = np.min(lows)
if current_price < support_level:
signals.append(('sell', 'support_stop_loss'))
# 终极止损位
if current_price < cost_price * context.params['final_stop_loss_ratio']:
signals.append(('sell', 'final_stop_loss'))
return signals
# === 统一决策层 ===
def trade_engine(context, bar, vwap, kline_5min):
signals = []
signals.extend(check_divergence(context, bar, vwap))
signals.extend(check_four_dimension_resonance(context, bar, kline_5min))
signals.extend(check_dynamic_take_profit(context, bar, kline_5min))
signals.extend(check_gradient_stop_loss(context, bar))
priority_order = ['stop_loss', 'final_stop_loss', 'support_stop_loss', 'take_profit', 'four_dim', 'divergence']
for action, signal_type in sorted(signals, key=lambda x: priority_order.index(x[1])):
if action == 'buy':
if len(context.account().positions()) >= context.params['max_holdings']:
print("达到最大持仓数量限制,无法买入")
continue
cash = context.account().cash.available
price = bar['close']
max_shares = int(cash / price)
if max_shares <= 0:
print("资金不足,无法买入")
continue
target_value = cash * context.params['max_position_ratio']
volume = int(target_value / price)
if volume <= 0:
print("目标成交量小于等于零,无法下单")
continue
order_target_percent(symbol=context.symbol, percent=context.params['max_position_ratio'],
order_type=OrderType_Limit, price=price, position_side=1)
break
elif action == 'sell':
pos = context.account().position(symbol=context.symbol, side=PositionSide_Long)
vol = pos.volume - pos.volume_today
if pos and vol > 0:
order_target_volume(symbol=context.symbol, volume=pos.volume_today, order_type=OrderType_Limit, price=bar['close'], position_side=2)
break
# === 策略初始化函数 ===
def init(context):
init_params(context)
subscribe(symbols=context.symbol, frequency=context.frequency_seconds, count=10)
# === 每根K线回调函数 ===
def on_bar(context, bars):
# 获取当前时间
if not is_trading_time(context.now):
return
bar = bars[0]
current_time = bar['eob']
try:
# 获取5分钟K线数据
kline_5min = history_n(symbol=context.symbol, frequency=context.frequency_seconds, count=context.bar_count,
end_time=current_time, fields="open, high, low, close, volume", df=True)
# 计算VWAP
vwap = calculate_daily_vwap(context, bar)
if vwap is None:
print(f"{current_time}: 无法获取VWAP数据,跳过本次处理")
return
# 执行统一决策层
trade_engine(context, bar, vwap, kline_5min)
except Exception as e:
print(f"数据获取或计算出错: {e}")
def on_backtest_finished(context, indicator):
print('累计收益率:', str(indicator.pnl_ratio * 100), ',年化收益率:', str(indicator.pnl_ratio_annual * 100), ',最大回撤:',
indicator.max_drawdown * 100, ',夏普比率:', str(indicator.sharp_ratio), ',胜率:', indicator.win_ratio * 100,
',开仓次数:', indicator.open_count, ',平仓次数:', indicator.close_count)
if __name__ == '__main__':
run(strategy_id='6c3dfc86-03fb-11ed-ba2d-c85b7639cf56',
filename='main_test.py',
mode=MODE_BACKTEST,
token='2a73fd387f79b2c096806a91a6b13f915f981007',
backtest_start_time='2024-04-03 09:30:00',
backtest_end_time='2025-04-30 15:00:00',
backtest_adjust=ADJUST_PREV,
backtest_initial_cash=10000000,
backtest_commission_ratio=0.00025,
backtest_slippage_ratio=0.0001)
def on_order_status(context, order):
# 标的代码
symbol = order['symbol']
# 委托价格
price = order['price']
# 委托数量
volume = order['volume']
# 查看下单后的委托状态,等于3代表委托全部成交
status = order['status']
# 买卖方向,1为买入,2为卖出
side = order['side']
# 开平仓类型,1为开仓,2为平仓
effect = order['position_effect']
# 委托类型,1为限价委托,2为市价委托
order_type = order['order_type']
if status == 3:
if effect == 1:
if side == 1:
side_effect = '开多仓'
else:
side_effect = '开空仓'
else:
if side == 1:
side_effect = '平空仓'
else:
side_effect = '平多仓'
order_type_word = '限价' if order_type == 1 else '市价'
print('{}:标的:{},操作:以{}{},委托价格:{},委托数量:{}'.format(context.now, symbol, order_type_word, side_effect, price, volume))
def is_trading_time(current_time):
"""
判断当前时间是否为交易时间
"""
trading_sessions = [
('09:30:00', '11:30:00'),
('13:00:00', '14:56:00')
]
current_time_str = current_time.strftime('%H:%M:%S')
for start, end in trading_sessions:
if start <= current_time_str <= end:
return True
return False
该代码实现了一个基于VWAP、乖离率、四维共振模型、动态止盈和梯度止损的量化交易策略。主要功能包括:
init_params:初始化策略参数。
calculate_daily_vwap:计算当日VWAP。
check_divergence:判断乖离率交易信号。
check_four_dimension_resonance:判断四维共振模型信号。
check_dynamic_take_profit:判断动态止盈信号。
check_gradient_stop_loss:判断梯度止损信号。
trade_engine:统一决策层,根据信号执行买卖操作。
init:策略初始化。
on_bar:每根K线回调函数,执行交易逻辑。
on_backtest_finished:回测结束后的统计信息输出。
on_order_status:订单状态回调函数。
is_trading_time:判断当前是否为交易时间。
经过测试,数据订阅频率600s和3600s的收益相当,也就是说高频率并没有提升收益率。
此策略还只是单只股票的,虽然胜率很高,但是收益是根据股票的不同而变化,之后可以考虑通过选股程序选取优质股票,再加上判断股票是否强上升趋势。强上升趋势的票再执行此策略。估计收益还能进一步提高。
创作不易,对量有兴趣的朋友欢迎点赞,关注,转发,留言。我会不定期的分享自己编写的策略源码.
发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/1227575
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!