量化炒股史上最牛算法强化学习保姆级教程打爆主力和游资

构建一个基于PPO(Proximal Policy Optimization)算法的股票预测系统涉及到多个步骤,包括环境设置、数据准备、特征工程、模型训练等。下面是一个详细的指南,下面给出20个可能影响股票价格的因素作为输入特征。

1、特征因子计算:

  1. 简单移动平均线 (SMA): 过去n天的收盘价平均值。
  2. 指数移动平均线 (EMA): 加权移动平均,更重视近期的数据点。
  3. 相对强弱指数 (RSI): 衡量股票超买或超卖的程度。
  4. 布林带 (Bollinger Bands): 由一条中轨和两条上下轨组成,用于衡量价格波动。
  5. MACD (Moving Average Convergence Divergence): 显示长期趋势中的短期动量变化。
  6. 成交量加权平均价格 (VWAP): 成交量加权的平均价格。
  7. ATR (Average True Range): 测量市场的波动性。
  8. 威廉姆斯%R: 另一种超买/超卖指标。
  9. 随机震荡指标 (Stochastic Oscillator): 比较某一时间段内的收盘价与该时间段内的价格范围。
  10. OBV (On Balance Volume): 结合了成交量和价格变动的趋势指标。
  11. 累积/派发线 (Accumulation/Distribution Line): 结合价格和成交量来评估资金流向。
  12. Chaikin Money Flow (CMF): 通过成交量加权的价格压力来衡量资金流入或流出的程度。
  13. ADX (Average Directional Index): 衡量趋势强度而不考虑方向。
  14. MFI (Money Flow Index): 类似于RSI但结合了成交量。
  15. CCI (Commodity Channel Index): 用于识别周期性的高点和低点。
  16. Donchian通道: 基于最高价和最低价的移动窗口。
  17. Parabolic SAR: 跟踪趋势的反转点。
  18. Price Rate of Change (ROC): 价格变化率。
  19. Ultimate Oscillator: 结合短、中、长周期的振荡器。
  20. Daily Return: 日收益率。

具体代码如下:

import pandas as pd
import numpy as np

def calculate_factors(df):
    df['SMA'] = df['Close'].rolling(window=14).mean()
    df['EMA'] = df['Close'].ewm(span=14, adjust=False).mean()
    df['RSI'] = compute_RSI(df['Close'], window=14)
    upper_band, lower_band = compute_bollinger_bands(df['Close'])
    df['Upper_Band'] = upper_band
    df['Lower_Band'] = lower_band
    df['MACD'], df['Signal'] = compute_MACD(df['Close'])
    df['VWAP'] = compute_VWAP(df)
    df['ATR'] = compute_ATR(df, window=14)
    df['Williams_%R'] = compute_Williams_R(df['High'], df['Low'], df['Close'])
    df['Stochastic_Oscillator'] = compute_stochastic_oscillator(df['Close'], df['High'], df['Low'])
    df['OBV'] = compute_OBV(df['Close'], df['Volume'])
    df['A/D_Line'] = compute_accumulation_distribution_line(df['High'], df['Low'], df['Close'], df['Volume'])
    df['CMF'] = compute_chaikin_money_flow(df['High'], df['Low'], df['Close'], df['Volume'])
    df['ADX'] = compute_ADX(df['High'], df['Low'], df['Close'])
    df['MFI'] = compute_MFI(df['High'], df['Low'], df['Close'], df['Volume'])
    df['CCI'] = compute_CCI(df['High'], df['Low'], df['Close'])
    df['Donchian_High'], df['Donchian_Low'] = compute_donchian_channels(df['High'], df['Low'])
    df['Parabolic_SAR'] = compute_parabolic_SAR(df['High'], df['Low'], df['Close'])
    df['ROC'] = compute_rate_of_change(df['Close'])
    df['Ultimate_Oscillator'] = compute_ultimate_oscillator(df['Close'], df['High'], df['Low'])
    df['Daily_Return'] = df['Close'].pct_change()

    return df[['SMA', 'EMA', 'RSI', 'Upper_Band', 'Lower_Band', 'MACD', 'Signal', 'VWAP', 'ATR', 
               'Williams_%R', 'Stochastic_Oscillator', 'OBV', 'A/D_Line', 'CMF', 'ADX', 'MFI', 
               'CCI', 'Donchian_High', 'Donchian_Low', 'Parabolic_SAR', 'ROC', 'Ultimate_Oscillator', 'Daily_Return']].fillna(method='ffill')

# 各种指标的具体实现函数需要根据上述定义自行编写

强化学习代码

import numpy as np
import torch
import torch.nn as nn
from torch.distributions import Categorical
import pandas as pd

# 环境定义
class StockTradingEnv:
    def __init__(self, data, initial_balance=10000):
        self.data = data  # 包含20个因子的时间序列数据
        self.current_step = 0
        self.initial_balance = initial_balance
        self.balance = initial_balance
        self.position = 0  # 持仓数量
        self.max_steps = len(data) - 1

    def reset(self):
        self.current_step = 0
        self.balance = self.initial_balance
        self.position = 0
        return self._get_state()

    def _get_state(self):
        # 获取当前状态:账户信息 + 20个因子
        state = [
            self.balance / self.initial_balance,
            self.position / 1000  # 假设最大持仓1000股
        ]
        state.extend(self.data.iloc[self.current_step].values)
        return np.array(state)

    def step(self, action):
        # 0: 卖出, 1: 持有, 2: 买入
        current_price = self.data.iloc[self.current_step]['close']  # 假设close是其中一个因子

        if action == 0 and self.position > 0:  # 卖出
            self.balance += self.position * current_price
            self.position = 0

        elif action == 2 and self.balance > 0:  # 买入
            self.position = self.balance // current_price
            self.balance -= self.position * current_price

        # 计算收益
        portfolio_value = self.balance + self.position * current_price
        reward = portfolio_value - self.initial_balance

        self.current_step += 1
        done = self.current_step >= self.max_steps

        return self._get_state(), reward, done, {}

# PPO模型定义
class ActorCritic(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(ActorCritic, self).__init__()
        
        # 共享网络层
        self.shared = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU()
        )
        
        # Actor网络
        self.actor = nn.Sequential(
            nn.Linear(128, output_dim),
            nn.Softmax(dim=-1)
        
        # Critic网络
        self.critic = nn.Linear(128, 1)
        
    def forward(self, x):
        x = self.shared(x)
        action_probs = self.actor(x)
        state_value = self.critic(x)
        return action_probs, state_value

# PPO算法实现
class PPO:
    def __init__(self, input_dim, output_dim, lr=3e-4, gamma=0.99, clip_epsilon=0.2):
        self.policy = ActorCritic(input_dim, output_dim)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = gamma
        self.clip_epsilon = clip_epsilon

    def update(self, states, actions, old_probs, rewards, dones):
        # 转换数据为Tensor
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        old_probs = torch.FloatTensor(old_probs)
        
        # 计算折扣回报
        discounted_rewards = []
        running_reward = 0
        for r in reversed(rewards):
            running_reward = r + self.gamma * running_reward
            discounted_rewards.insert(0, running_reward)
        
        # 标准化回报
        discounted_rewards = torch.FloatTensor(discounted_rewards)
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-8)
        
        # 获取新策略的概率和状态价值
        new_probs, state_values = self.policy(states)
        new_probs = new_probs.gather(1, actions.unsqueeze(1)).squeeze()
        
        # 计算优势函数
        advantages = discounted_rewards - state_values.squeeze().detach()
        
        # 计算策略损失(带clip)
        ratio = new_probs / old_probs
        clipped_ratio = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon)
        policy_loss = -torch.min(ratio * advantages, clipped_ratio * advantages).mean()
        
        # 价值函数损失
        value_loss = nn.MSELoss()(state_values.squeeze(), discounted_rewards)
        
        # 总损失
        total_loss = policy_loss + 0.5 * value_loss
        
        # 优化步骤
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

# 训练流程
def main():
    # 参数设置
    num_episodes = 1000
    input_dim = 22  # 20个因子 + 2个账户状态
    output_dim = 3  # 三个动作
    
    # 加载数据(示例数据,需替换为真实数据)
    data = pd.read_csv('stock_data.csv')  # 应包含20个因子列
    
    # 初始化环境和算法
    env = StockTradingEnv(data)
    ppo = PPO(input_dim, output_dim)
    
    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        
        states = []
        actions = []
        old_probs = []
        rewards = []
        
        while True:
            # 获取动作概率
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state)
                action_probs, _ = ppo.policy(state_tensor)
                dist = Categorical(action_probs)
                action = dist.sample().item()
                old_prob = action_probs[action].item()
            
            # 执行动作
            next_state, reward, done, _ = env.step(action)
            
            # 存储经验
            states.append(state)
            actions.append(action)
            old_probs.append(old_prob)
            rewards.append(reward)
            
            state = next_state
            episode_reward += reward
            
            if done:
                # 更新策略
                ppo.update(states, actions, old_probs, rewards, [done]*len(states))
                print(f"Episode {episode+1}, Total Reward: {episode_reward:.2f}")
                break

if __name__ == "__main__":
    main()

关注我,每天分享打爆主力和游资的高效量化方法

发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/949268
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!

(0)
股市刺客的头像股市刺客
上一篇 16分钟前
下一篇 15分钟前

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注