Building a stock prediction system based on the PPO (Proximal Policy Optimization) algorithm involves several steps, including environment setup, data preparation, feature engineering, and model training. The detailed guide below uses 20 factors that may influence the stock price as input features.
1. Factor calculation:
- Simple Moving Average (SMA): the average closing price over the past n days.
- Exponential Moving Average (EMA): a weighted moving average that gives more weight to recent data points.
- Relative Strength Index (RSI): measures how overbought or oversold a stock is.
- Bollinger Bands: a middle band plus upper and lower bands, used to gauge price volatility.
- MACD (Moving Average Convergence Divergence): shows short-term momentum changes within the longer-term trend.
- Volume Weighted Average Price (VWAP): the average price weighted by traded volume.
- ATR (Average True Range): measures market volatility.
- Williams %R: another overbought/oversold indicator.
- Stochastic Oscillator: compares the closing price over a period with the price range of that period.
- OBV (On Balance Volume): a trend indicator that combines volume with price movement.
- Accumulation/Distribution Line: combines price and volume to assess money flow.
- Chaikin Money Flow (CMF): measures buying or selling pressure via volume-weighted price pressure.
- ADX (Average Directional Index): measures trend strength regardless of direction.
- MFI (Money Flow Index): similar to RSI but incorporates volume.
- CCI (Commodity Channel Index): used to identify cyclical highs and lows.
- Donchian Channels: based on a moving window of the highest high and lowest low.
- Parabolic SAR: tracks trend reversal points.
- Price Rate of Change (ROC): the rate of price change.
- Ultimate Oscillator: combines short-, medium-, and long-period oscillators.
- Daily Return: the daily percentage return.
The code is as follows:
import pandas as pd
import numpy as np
def calculate_factors(df):
    # Compute the factor columns from an OHLCV DataFrame with 'High', 'Low', 'Close', 'Volume' columns
    df['SMA'] = df['Close'].rolling(window=14).mean()
    df['EMA'] = df['Close'].ewm(span=14, adjust=False).mean()
    df['RSI'] = compute_RSI(df['Close'], window=14)
    upper_band, lower_band = compute_bollinger_bands(df['Close'])
    df['Upper_Band'] = upper_band
    df['Lower_Band'] = lower_band
    df['MACD'], df['Signal'] = compute_MACD(df['Close'])
    df['VWAP'] = compute_VWAP(df)
    df['ATR'] = compute_ATR(df, window=14)
    df['Williams_%R'] = compute_Williams_R(df['High'], df['Low'], df['Close'])
    df['Stochastic_Oscillator'] = compute_stochastic_oscillator(df['Close'], df['High'], df['Low'])
    df['OBV'] = compute_OBV(df['Close'], df['Volume'])
    df['A/D_Line'] = compute_accumulation_distribution_line(df['High'], df['Low'], df['Close'], df['Volume'])
    df['CMF'] = compute_chaikin_money_flow(df['High'], df['Low'], df['Close'], df['Volume'])
    df['ADX'] = compute_ADX(df['High'], df['Low'], df['Close'])
    df['MFI'] = compute_MFI(df['High'], df['Low'], df['Close'], df['Volume'])
    df['CCI'] = compute_CCI(df['High'], df['Low'], df['Close'])
    df['Donchian_High'], df['Donchian_Low'] = compute_donchian_channels(df['High'], df['Low'])
    df['Parabolic_SAR'] = compute_parabolic_SAR(df['High'], df['Low'], df['Close'])
    df['ROC'] = compute_rate_of_change(df['Close'])
    df['Ultimate_Oscillator'] = compute_ultimate_oscillator(df['Close'], df['High'], df['Low'])
    df['Daily_Return'] = df['Close'].pct_change()
    return df[['SMA', 'EMA', 'RSI', 'Upper_Band', 'Lower_Band', 'MACD', 'Signal', 'VWAP', 'ATR',
               'Williams_%R', 'Stochastic_Oscillator', 'OBV', 'A/D_Line', 'CMF', 'ADX', 'MFI',
               'CCI', 'Donchian_High', 'Donchian_Low', 'Parabolic_SAR', 'ROC', 'Ultimate_Oscillator', 'Daily_Return']].ffill()
# The compute_* helper functions referenced above must be implemented from the definitions
# in the factor list; a sketch of a few of them is given below.
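As a minimal sketch, assuming the standard textbook definitions of these indicators (the window lengths here are conventional defaults chosen for illustration), a few of the helpers could be written like this:
import pandas as pd

def compute_RSI(close, window=14):
    # RSI from the rolling average gain/loss of closing-price changes
    delta = close.diff()
    gain = delta.clip(lower=0).rolling(window=window).mean()
    loss = (-delta.clip(upper=0)).rolling(window=window).mean()
    rs = gain / (loss + 1e-8)
    return 100 - 100 / (1 + rs)

def compute_bollinger_bands(close, window=20, num_std=2):
    # Middle band = SMA; upper/lower bands = middle +/- num_std rolling standard deviations
    mid = close.rolling(window=window).mean()
    std = close.rolling(window=window).std()
    return mid + num_std * std, mid - num_std * std

def compute_MACD(close, fast=12, slow=26, signal=9):
    # MACD line = fast EMA - slow EMA; signal line = EMA of the MACD line
    macd = close.ewm(span=fast, adjust=False).mean() - close.ewm(span=slow, adjust=False).mean()
    return macd, macd.ewm(span=signal, adjust=False).mean()
The remaining helpers (compute_VWAP, compute_ATR, compute_ADX, and so on) follow the same pattern from their definitions in the list above.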
2. Reinforcement learning code:
import numpy as np
import torch
import torch.nn as nn
from torch.distributions import Categorical
import pandas as pd
# Environment definition
class StockTradingEnv:
    def __init__(self, data, initial_balance=10000):
        self.data = data  # time series containing the 20 factor columns
        self.current_step = 0
        self.initial_balance = initial_balance
        self.balance = initial_balance
        self.position = 0  # number of shares held
        self.max_steps = len(data) - 1

    def reset(self):
        self.current_step = 0
        self.balance = self.initial_balance
        self.position = 0
        return self._get_state()

    def _get_state(self):
        # Current state: account info + the 20 factors
        state = [
            self.balance / self.initial_balance,
            self.position / 1000  # assume a maximum position of 1,000 shares
        ]
        state.extend(self.data.iloc[self.current_step].values)
        return np.array(state)

    def step(self, action):
        # 0: sell, 1: hold, 2: buy
        current_price = self.data.iloc[self.current_step]['close']  # assumes 'close' is one of the data columns
        if action == 0 and self.position > 0:  # sell the entire position
            self.balance += self.position * current_price
            self.position = 0
        elif action == 2 and self.balance > 0:  # buy as many shares as the balance allows
            shares_to_buy = int(self.balance // current_price)
            self.position += shares_to_buy
            self.balance -= shares_to_buy * current_price
        # Reward: current portfolio value minus the initial balance (cumulative P&L)
        portfolio_value = self.balance + self.position * current_price
        reward = portfolio_value - self.initial_balance
        self.current_step += 1
        done = self.current_step >= self.max_steps
        return self._get_state(), reward, done, {}
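Before wiring the environment into PPO, it can be sanity-checked with random actions. The snippet below is a sketch that assumes a hypothetical DataFrame factor_df holding the factor columns plus the 'close' price column the environment reads:
import numpy as np

# Smoke test: run one episode with random actions (factor_df is a hypothetical placeholder)
env = StockTradingEnv(factor_df)
state = env.reset()
done = False
while not done:
    action = np.random.randint(3)  # 0: sell, 1: hold, 2: buy
    state, reward, done, _ = env.step(action)
print("Final cumulative reward:", reward)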
# PPO model definition
class ActorCritic(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(ActorCritic, self).__init__()
        # Shared layers
        self.shared = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU()
        )
        # Actor head
        self.actor = nn.Sequential(
            nn.Linear(128, output_dim),
            nn.Softmax(dim=-1)
        )
        # Critic head
        self.critic = nn.Linear(128, 1)

    def forward(self, x):
        x = self.shared(x)
        action_probs = self.actor(x)
        state_value = self.critic(x)
        return action_probs, state_value
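A quick forward pass with dummy data, assuming for illustration 22 state features (2 account values + 20 factors) and 3 actions, can confirm the output shapes before training:
import torch

net = ActorCritic(input_dim=22, output_dim=3)
probs, value = net(torch.randn(4, 22))  # batch of 4 dummy states
print(probs.shape, value.shape)         # torch.Size([4, 3]) torch.Size([4, 1])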
# PPO algorithm implementation
class PPO:
    def __init__(self, input_dim, output_dim, lr=3e-4, gamma=0.99, clip_epsilon=0.2):
        self.policy = ActorCritic(input_dim, output_dim)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = gamma
        self.clip_epsilon = clip_epsilon

    def update(self, states, actions, old_probs, rewards, dones):
        # Convert the collected experience to tensors
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        old_probs = torch.FloatTensor(old_probs)
        # Compute discounted returns
        discounted_rewards = []
        running_reward = 0
        for r in reversed(rewards):
            running_reward = r + self.gamma * running_reward
            discounted_rewards.insert(0, running_reward)
        # Normalize the returns
        discounted_rewards = torch.FloatTensor(discounted_rewards)
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-8)
        # Action probabilities and state values under the current policy
        new_probs, state_values = self.policy(states)
        new_probs = new_probs.gather(1, actions.unsqueeze(1)).squeeze()
        # Advantage estimates
        advantages = discounted_rewards - state_values.squeeze().detach()
        # Clipped policy (surrogate) loss
        ratio = new_probs / old_probs
        clipped_ratio = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon)
        policy_loss = -torch.min(ratio * advantages, clipped_ratio * advantages).mean()
        # Value function loss
        value_loss = nn.MSELoss()(state_values.squeeze(), discounted_rewards)
        # Total loss
        total_loss = policy_loss + 0.5 * value_loss
        # Optimization step
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
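For reference, the policy loss computed in update is the negative of PPO's clipped surrogate objective:

L^{\mathrm{CLIP}}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}\big(r_t(\theta),\,1-\epsilon,\,1+\epsilon\big)\,\hat{A}_t\right)\right],
\qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t \mid s_t)}

where \hat{A}_t is the advantage estimate and \epsilon is clip_epsilon; the value head is trained with an MSE loss against the normalized discounted returns, weighted by 0.5 in the total loss.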
# Training loop
def main():
    # Hyperparameters
    num_episodes = 1000
    output_dim = 3  # three actions: sell, hold, buy

    # Load data (example file; replace with real data containing the factor columns)
    data = pd.read_csv('stock_data.csv')

    # State dimension = 2 account features + one value per factor column
    input_dim = 2 + data.shape[1]

    # Initialize the environment and the algorithm
    env = StockTradingEnv(data)
    ppo = PPO(input_dim, output_dim)

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        states = []
        actions = []
        old_probs = []
        rewards = []
        while True:
            # Sample an action from the current policy
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state)
                action_probs, _ = ppo.policy(state_tensor)
                dist = Categorical(action_probs)
                action = dist.sample().item()
                old_prob = action_probs[action].item()
            # Execute the action
            next_state, reward, done, _ = env.step(action)
            # Store the experience
            states.append(state)
            actions.append(action)
            old_probs.append(old_prob)
            rewards.append(reward)
            state = next_state
            episode_reward += reward
            if done:
                # Update the policy on the collected episode
                ppo.update(states, actions, old_probs, rewards, [done] * len(states))
                print(f"Episode {episode+1}, Total Reward: {episode_reward:.2f}")
                break

if __name__ == "__main__":
    main()