跳转至

强化学习

📖 阅读信息

阅读时间:17 分钟 | 中文字符:6625 | 有效代码行数:1548

学习的内容

  • RL 核心概念:智能体(Agent)、环境(Environment)、动作(Action)、状态(State)、奖励函数(Reward Function)、马尔可夫决策过程(MDP)。
  • 经典算法:先掌握 DQN、PPO(最适合车辆控制的入门算法),理解策略梯度、价值函数的基本原理。
  • Python RL 框架:重点学 Stable Baselines3PyTorch Lightning,前者封装度高适合快速上手,后者灵活性强适合深度定制。

基本概念

找到最好的 Function 使得动作得到的收益(reward)最大


例子

  1. 一个小游戏

    • 输入就是 observation(观察),在这里就是游戏此刻的画面
    • 动作 action(输出)为左右移动或者开火
    • 最后使得得到的收益最大
    • 围棋游戏:
    • 环境给一个棋盘图像(或者说棋局的情况),作为输入,动作就是所有未下的地方
    • 动作(输出)会使得环境变化,产生新的输入
    • 这里的最终的奖励不会随着每一步的动作改变,而是最终游戏结束时才会产生:

强化学习和机器学习

机器学习的一般步骤:

  1. 定义一个带有未知数的 function
  2. 定义一个损失 function
  3. 找出使得 loss 最小化的未知数

我们的强化学习也是一样的

  1. 首先是 function with unknown
    1. 输入:例像素画面
    2. actor(policy network)(网络,在加入了深度强化学习之后)(这种网络的结构需要设计)
    3. 输出行动(采用几率的方式,引入随机性(sample),是非常重要的)
  2. 定义 loss
    1. 从开始到结束的过程,每一步可能都会得到 reward,最后得到的总的奖励为(return——reward 的总和):\(R=\sum_{t=1}^Tr_t\)
    2. 在 RL 的情景下,将 -R 看做我们的 loss,使得这个 loss 最小,即 reward 最大
  3. optimization:

    1. 这个互动的过程的集合 wield 为:\(\tau=\{s_1,a_1,s_2,a_2,\cdots\}\)
    2. 找出一个 network (actor)的参数,使得 \(R(\tau)=\sum_{t=1}^Tr_t\)
      有几个点,首先对于相同的输入,产生的输出是随机的
      网络 network 是黑盒,环境也是黑盒
      环境和 reward 有随机性

常用的算法 policy gradient

我们怎么操控 actor 的输出
类似于一个分类的问题

\[ L=e_1-e_2\quad\theta^*=arg\min_\theta L \]

学习得到网络的参数,使得采取的动作与标签\(\widehat{a}\))(训练的资料)产生的差距最小


推广到很多的训练集上

也就是加权一下:\(L=\sum{A}_ne_n\)

如何控制 actor

所以参数的选择就是要:\(\theta^*=arg\min_\theta L\),使得 loss 最小,使得得到的奖励最大

版本 0


收集一些训练资料

  1. 一个 actor 与环境进行互动(可以是一个随机的 actor,也可以是一些已经有的数据),将其得到的结果记录下来:
  2. 那么某个动作对应的加权是什么呢:
    1. 可以先将每一步得到的收益作为系数:
      但是,这一一个局部最优的做法,因为每一个行为都会影响到接下来发生的事情(可能会需要牺牲短期的利益,达到长期的目标)

版本 1

所以我们需要真正学习如何去确定某个动作的参数:

将执行了 \(a_1\) 之后所有的增益加起来就是其参数
类推

\[ \begin{aligned}&G_{1}=r_1+r_2+r_3+......+r_N\\&G_{2}=r_2+r_3+......+r_N\\&G_{3}=r_3+......+r_N\end{aligned} \]

版本 3


进一步优化:(由于随着时间的推移,一步操作对后续操作的影响越来越小)
优化后的公式为:

\[ G_{1}^{\prime}=r_{1}+\gamma r_{2}+\gamma^{2}r_{3}+......\text{Discount factor }\gamma<1 \]

版本 4

对奖赏要进行标准化:

步骤

  1. 初始化 actor network 的 \(\theta^{0}\)
  2. 进行训练 \(i=1->T\)
    1. 使用 \(\theta^{i-1}\) 去进行训练
    2. 训练得到数据为: \(\{s_1,a_1\},\{s_2,a_2\},...,\{s_N,a_N\}\)
    3. 计算参数 \(A_1,A_2,...,A_N\)
    4. 计算损失 \(L\)
    5. 进行参数的更新 \(\theta^i\leftarrow\theta^{i-1}-\eta\nabla L\) (只能更新一次)
    6. 重复上面的步骤
  3. 此时每次更新一次参数就需要收集一次资料
  4. 我们可以使用 PPO 来实现不用每次都重新训练:
    训练的 actor 必须知道它的得到数据的 actor 是不一样的,具体怎么实现不知道
  5. actor 动作必须要有一定的随机性,尽量全面一点

actor-critic

  • critic:评估一个 actor 的好坏的性能(当一个 actor 输入一个 s 时(或者加上做一个动作 a)的条件下),获得的收益
  • 我们使用的 value function:\(V^{\theta}(s)\)
    当使用一个 actor \(\theta\) 时,当输入一个 s 时,接下来得到的 reward 是多少:\(G_1^{\prime}=r_1+\gamma r_2+\gamma^2r_3+......\)
    而这里的 \(V^{\theta}(s)\),就是一个预测的作用(未卜先知),就是看到某个游戏画面之后,预测得到的 reward(因为上面的公式是游戏结束后才能完全知道的)
    \(V^{\theta}(s)\) 的数值与 actor 有关

为了得到上面的那些值,有哪些方法:

  • MC
    actor 与环境互动,互动到游戏结束

    也就是直接使用训练的资料来得到对应的 \(V^{\theta}(s)\)
  • TD:
    不需要完全结束,只需要下面的资料:\(\cdots s_t,a_t,r_t,s_{t+1}\cdots\)

    \[ \begin{aligned}&V^{\theta}(s_{t})=r_{t}+\gamma r_{t+1}+\gamma^{2}r_{t+2}\ldots\\&V^{\theta}(s_{t+1})=r_{t+1}+\gamma r_{t+2}+\cdots\\&V^{\theta}(s_{t})=\gamma V^{\theta}(s_{t+1})+r_{t}\end{aligned} \]

上面就是一个递推的公式,我们需要训练得到的两个数值的差为:

差和 \(r_t\) 接近即可

上述的方法实际上就类似 Q-learning 的价值更新规则

\[ Q(s,a)\leftarrow Q(s,a)+\alpha\left[r+\gamma\max_{a^{\prime}\in\mathcal{A}}Q(s^{\prime},a^{\prime})-Q(s,a)\right] \]

上述两种方法的比较:(还有如何使用上述的方法)

其中的 \(V^\theta(s_b)=3/4\),两种方法都是 0.75(八次的和除以八)
但是 \(V^\theta(s_a)=?\),两种方法得到的数值是不一样的
MC 的话,值为 0 (只看第一次游戏,认为 a, b 两者是相关的,a 之后会影响 b)
TD 的话,值为 \(V^\theta(s_a)=V^\theta(s_b)+r\quad3/4\quad3/4\quad0\)(认为 a, b 之间是不相关的)

进一步的 A 值

\(A\) 值就是训练的依据

要减去对应的 \(V\)
为什么值是这样的:\(\{s_t,a_t\}\quad A_t=G_t^{\prime}-V^\theta(s_t)\) ,这里计算的就是 TD 误差,或者说单步的误差函数

\(V^\theta(s_t)\) 的含义为:(状态价值函数)(实际上就是对当前状态下最终奖励的预测)

是一个期望值(在看到某个游戏画面时),但是此时不一定会执行 \(a_t\)(因为是随机的),所以最后得到的奖励会有很多种,将所有可能的结果平均一下就是:\(V^\theta(s_t)\)

\(G_t^{'}\) 是一定会执行 \(a_t\),得到的 reward 就是 \(G_t^{'}\)动作价值函数)(这个东西的计算方法就是执行动作之后的实际奖赏加上执行之后的新状态的状态价值函数

  • \(\mathrm{A}_{t}>0\):说明执行 \(a_t\) 比平均好
  • 反之则没有平均好

但是上述的后一种只是一种可能性,会存在很大的误差:
所以我们最终的版本为:

最后得到的 \(A_t\) 的表达式为:

\[ r_t+V^\theta(s_{t+1})-V^\theta(s_t) \]
  • \(\mathrm{A}_{t}>0:说明执行 a_t 比平均好\)
  • 反之则没有平均好

这就是我们的actor-critic


设计的一个小技巧:

还有其他的方法,比如说 DQN

reward Shaping

我们之前学到的是:

也就是 actor 与环境互动,得到收益,最后得到一个分数 \(A_t\),作为训练的依据

但是,当大多数情况下的 reward 都是 0 时,怎么解决呢(比如下围棋的时候),甚至像训练机械手臂打螺丝的时候,那就更加没有 reward
所以我们需要加入一些 reward(reward shaping)(比如望梅止渴的故事)


实际使用的例子

怎么定义这个 reward
需要人为添加的


一个很有趣的 reward shaping 的做法——添加好奇心(机器在动作的时候得到有意义的新的东西就加分)——有意义的新
但是有时候会面对一些杂讯(那就是无意义的新了)

No reward(完全没有奖励)

我们找一些人类,也与环境进行互动

我们凭借这些示范(比如自动驾驶),与环境互动进行训练


这是不是一个 supervised learning
比如自动驾驶中复制人类的行为,但是存在一个问题(人类和机器观察到的输入可能是不一样的)(人类的示范不会很全面)(同样也不是所有的人类行为都需要模仿


所以引入接下来的 inverse reinforcement learning(IRL)
我们需要机器从 expert 中学习得到 reward

  • 原则:老师的行为是最棒的(不代表要完全模仿老师的行为)
  • 基本的方法:
    • 初始化 actor
      • actor 与环境互动
      • 学习定义 reward function,这个 function 要使得老师的得分高于 actor:\(\sum_{n=1}^KR(\hat{\tau}_n)>\sum_{n=1}^KR(\tau)\)
      • 更新 actor,使得 reward 最大化(使用新的 reward function)
    • 两种的比较(GAN 和 IRL)
      • 两者有异曲同工之妙

人为的演示
给一个图像进行模仿

算法深入学习:

算法的分类:

算法类别 核心思想 依赖模型 动作空间 代表算法 本质区别
动态规划 基于完美模型的迭代求解 离散/连续 价值迭代、策略迭代 理论最优,依赖完美模型
时序差分 无模型采样 + 自举 离散 Q-Learning、SARSA 用 TD 误差更新价值,样本效率高
Dyna-Q 无模型 + 基于模型混合 否(学模型) 离散 Dyna-Q 用模拟经验补充真实经验
价值 -based 深度 RL 用 NN 拟合 Q 函数 离散 DQN、Double DQN 从“值多少”推导策略
策略 -based 深度 RL 直接优化策略函数 离散/连续 策略梯度 直接学习“怎么做”
Actor-Critic 策略 + 价值混合 离散/连续 AC、TRPO、PPO、DDPG、SAC 用 Critic 稳定 Actor 训练

依赖模型的意思是是否有状态转移概率以及对应的奖励函数:

  • 状态转移概率 \(P(s′∣s,a)\):在状态 s 下执行动作 a,转移到下一个状态 s′ 的概率。
  • 奖励函数 \(R(s,a,s′)\):在状态 s 执行动作 a 并转移到 s′ 后,获得的即时奖励。

符号 名称 核心含义 作用
\(Q(s,a)\) 动作价值函数 在状态 s 做动作 a 的期望总奖励 指导智能体选择动作(选 Q 值最大的动作)
\(V(s)\) 状态价值函数 在状态 s 的期望总奖励 评估策略的好坏,或衡量状态的优劣
\(G_t​\) 回报 从时间步 t 开始的实际累积奖励 作为 V 和 Q 的 “真实标签”,用于更新价值估计

DQN 算法

简介

当状态或者动作数量非常大的时候,这种做法(将每个状态下所有动作的价值的表格列出来——Q-Learning)就不适用了。例如,当状态是一张 RGB 图像时,假设图像大小是,此时一共有 \(256^{(210\times60\times3)}\) 种状态,此时把所有的动作价值 \(Q(s,a)\) 都列出来是不现实


所以采用函数拟合的方式估计 Q 值
我们将用于拟合函数 \(Q\)神经网络称为 Q 网络:

输入为状态与动作(\(s, a\)
输出为对应获得的价值(也可以只输入一个状态 \(s\),输出各个离散动作下的价值)


Q 网络的损失函数构造为均方误差的形式:

\[ \omega^*=\arg\min_\omega\frac{1}{2N}\sum_{i=1}^N\left[Q_\omega\left(s_i,a_i\right)-\left(r_i+\gamma\max_{a^{\prime}}Q_\omega\left(s_i^{\prime},a^{\prime}\right)\right)\right]^2 \]

DQN 中还有两个重要的模块:
经验回放目标网络,它们能够帮助 DQN 取得稳定、出色的性能。

经验回放

DQN 算法采用了经验回放(experience replay)方法,具体做法为维护一个回放缓冲区,将每次从环境中采样得到的四元组数据(状态、动作、奖励、下一状态)存储到回放缓冲区中,训练 Q 网络的时候再从回放缓冲区中随机采样若干数据来进行训练。这么做可以起到以下两个作用。

  1. 使样本满足独立假设。在 MDP 中交互采样得到的数据本身不满足独立假设,因为这一时刻的状态和上一时刻的状态有关。非独立同分布的数据对训练神经网络有很大的影响,会使神经网络拟合到最近训练的数据上。采用经验回放可以打破样本之间的相关性,让其满足独立假设
  2. 提高样本效率。每一个样本可以被使用多次,十分适合深度神经网络的梯度学习。

目标网络

DQN 算法最终更新的目标是让 \(Q_{\omega}\left(s,a\right)\) 逼近 \(r+\gamma\operatorname*{max}_{a^{\prime}}Q_{\omega}\left(s^{\prime},a^{\prime}\right)\),由于 TD 误差目标本身就包含神经网络的输出,因此在更新网络参数的同时目标也在不断地改变,这非常容易造成神经网络训练的不稳定性。
由于 TD 误差目标本身就包含神经网络的输出,因此在更新网络参数的同时目标也在不断地改变,这非常容易造成神经网络训练的不稳定性。为了实现这一思想,我们需要利用两套 Q 网络

流程
  • 用随机的网络参数 \(\omega\) 初始化网络 \(Q_\omega(s, a)\)
  • 复制相同的参数 \(\omega^- \leftarrow \omega\) 来初始化目标网络 \(Q_{\omega^-}\)
  • 初始化经验回放池 \(R\)
  • for 序列 \(e = 1 \to E\) do
    • 获取环境初始状态 \(s_1\)
    • for 时间步 \(t = 1 \to T\) do
      • 根据当前网络 \(Q_\omega(s, a)\)\(\varepsilon\)
      • 贪婪策略选择动作 \(a_t\)
      • 执行动作 \(a_t\),获得回报 \(r_t\),环境状态变为 \(s_{t+1}\)
      • \((s_t, a_t, r_t, s_{t+1})\) 存储进回放池 \(R\)
      • \(R\) 中数据足够,从 \(R\) 中采样 \(N\) 个数据 \(\{(s_i, a_i, r_i, s_{i+1})\}_{i=1,\dots,N}\)
      • 对每个数据,用目标网络计算 \(y_i = r_i + \gamma \max_a Q_{\omega^-}(s_{i+1}, a)\)
      • 最小化目标损失 \(L = \frac{1}{N} \sum_i (y_i - Q_\omega(s_i, a_i))^2\),以此更新当前网络 \(Q_\omega\)
      • 更新目标网络
    • end for
  • end for

代码实践环节

我们采用的测试环境是 CartPole-v0,其状态空间相对简单,只有 4 个变量,因此网络结构的设计也相对简单(也就是 Q 网络):采用一层 128 个神经元的全连接并以 ReLU 作为激活函数。当遇到更复杂的诸如以图像作为输入的环境时,我们可以考虑采用深度卷积神经网络


Python
import random  
import gymnasium as gym  # 彻底替换为gymnasium,不再用旧gym  
import numpy as np  
import collections  
from tqdm import tqdm  
import torch  
import torch.nn.functional as F  
import matplotlib.pyplot as plt  

# ===== 经验回放池=====  
class ReplayBuffer:  
    ''' 经验回放池 '''    def __init__(self, capacity):  
        self.buffer = collections.deque(maxlen=capacity)  # 队列,先进先出  

    def add(self, state, action, reward, next_state, done):  # 将数据加入buffer  
        self.buffer.append((state, action, reward, next_state, done))  

    def sample(self, batch_size):  # 从buffer中采样数据,数量为batch_size  
        transitions = random.sample(self.buffer, batch_size)  
        state, action, reward, next_state, done = zip(*transitions)  
        return np.array(state), action, reward, np.array(next_state), done  

    def size(self):  # 目前buffer中数据的数量  
        return len(self.buffer)  

# ===== Q网络=====  
class Qnet(torch.nn.Module):  
    ''' 只有一层隐藏层的Q网络 '''    def __init__(self, state_dim, hidden_dim, action_dim):  
        super(Qnet, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  # 隐藏层使用ReLU激活函数  
        return self.fc2(x)  

# ===== DQN算法=====  
class DQN:  
    ''' DQN算法 '''    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,  
                 epsilon, target_update, device):  
        self.action_dim = action_dim  
        self.q_net = Qnet(state_dim, hidden_dim,  
                          self.action_dim).to(device)  # Q网络  
        # 目标网络  
        self.target_q_net = Qnet(state_dim, hidden_dim,  
                                 self.action_dim).to(device)  
        # 使用Adam优化器  
        self.optimizer = torch.optim.Adam(self.q_net.parameters(),  
                                          lr=learning_rate)  
        self.gamma = gamma  # 折扣因子  
        self.epsilon = epsilon  # epsilon-贪婪策略  
        self.target_update = target_update  # 目标网络更新频率  
        self.count = 0  # 计数器,记录更新次数  
        self.device = device  

    def take_action(self, state):  # epsilon-贪婪策略采取动作  
        if np.random.random() < self.epsilon:  
            action = np.random.randint(self.action_dim)  
        else:  
            state = torch.tensor([state], dtype=torch.float).to(self.device)  
            action = self.q_net(state).argmax().item()  
        return action  

    def update(self, transition_dict):  
        states = torch.tensor(transition_dict['states'],  
                              dtype=torch.float).to(self.device)  
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(  
            self.device)  
        rewards = torch.tensor(transition_dict['rewards'],  
                               dtype=torch.float).view(-1, 1).to(self.device)  
        next_states = torch.tensor(transition_dict['next_states'],  
                                   dtype=torch.float).to(self.device)  
        dones = torch.tensor(transition_dict['dones'],  
                             dtype=torch.float).view(-1, 1).to(self.device)  

        q_values = self.q_net(states).gather(1, actions)  # Q值  
        # 下个状态的最大Q值  
        max_next_q_values = self.target_q_net(next_states).max(1)[0].view(  
            -1, 1)  
        q_targets = rewards + self.gamma * max_next_q_values * (1 - dones  
                                                                )  # TD误差目标  
        dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))  # 均方误差损失函数  
        self.optimizer.zero_grad()  # PyTorch中默认梯度会累积,这里需要显式将梯度置为0  
        dqn_loss.backward()  # 反向传播更新参数  
        self.optimizer.step()  

        if self.count % self.target_update == 0:  
            self.target_q_net.load_state_dict(  
                self.q_net.state_dict())  # 更新目标网络  
        self.count += 1  

# ===== 主程序(小车悬臂的问题)=====  
if __name__ == "__main__":  
    # 1. 配置参数  
    lr = 2e-3               # 学习率  
    num_episodes = 500      # 训练总回合数  
    hidden_dim = 128        # Q网络隐藏层维度  
    gamma = 0.98            # 折扣因子  
    epsilon = 0.01          # epsilon-贪婪策略  
    target_update = 10      # 目标网络更新频率  
    buffer_size = 10000     # 经验回放池容量  
    batch_size = 64         # 批次大小  
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")  

    # 2. 创建环境(修正1:用CartPole-v1,移除seed(),改用reset设置种子)  
    env = gym.make("CartPole-v1", render_mode="human")  # render_mode="human" 方便测试时可视化  
    state_dim = env.observation_space.shape[0]  
    action_dim = env.action_space.n  

    # 3. 初始化组件  
    replay_buffer = ReplayBuffer(buffer_size)  
    agent = DQN(state_dim, hidden_dim, action_dim, lr, gamma, epsilon, target_update, device)  

    # 4. 训练循环  
    return_list = []  
    for i in range(10):  
        with tqdm(total=int(num_episodes/10), desc='Iteration %d' % i) as pbar:  
            for i_episode in range(int(num_episodes/10)):  
                episode_return = 0  
                # 修正2:用reset(seed=0)设置随机种子,替代旧的env.seed(0)  
                state, _ = env.reset(seed=0)  # Gymnasium的reset返回(state, info),并支持seed参数  
                done = False  
                while not done:  
                    action = agent.take_action(state)  
                    # 修正3:Gymnasium的step返回(next_state, reward, terminated, truncated, info)  
                    next_state, reward, terminated, truncated, _ = env.step(action)  
                    # 合并终止条件(terminated是任务完成,truncated是步数超限)  
                    done = terminated or truncated  
                    replay_buffer.add(state, action, reward, next_state, done)  
                    state = next_state  
                    episode_return += reward  

                    if replay_buffer.size() > batch_size:  
                        b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)  
                        transition_dict = {  
                            'states': b_s,  
                            'actions': b_a,  
                            'next_states': b_ns,  
                            'rewards': b_r,  
                            'dones': b_d  
                        }  
                        agent.update(transition_dict)  
                return_list.append(episode_return)  

                if (i_episode+1) % 10 == 0:  
                    pbar.set_postfix({  
                        'episode':  
                        '%d' % (num_episodes/10 * i + i_episode + 1),  
                        'return':  
                        '%.3f' % np.mean(return_list[-10:])  
                    })  
                pbar.update(1)  

    # 5. 可视化奖励曲线  
    episodes_list = list(range(len(return_list)))  
    plt.plot(episodes_list, return_list)  
    plt.xlabel('Episodes')  
    plt.ylabel('Returns')  
    plt.title('DQN on CartPole-v1')  
    plt.show()  

    # 6. 测试智能体(可选)  
    test_episodes = 5  
    for _ in range(test_episodes):  
        state, _ = env.reset(seed=0)  
        done = False  
        while not done:  
            action = agent.take_action(state)  
            next_state, reward, terminated, truncated, _ = env.step(action)  
            done = terminated or truncated  
            state = next_state  
    env.close()

上述就是代码的实现


运行:

分为 10 次运行,每个分段 50 回合
return 就是得到的回报(r)
CartPole 中 “return”= 每回合小车保持平衡的步数(奖励 = 每步 1 分),493.4 意味着最近 10 回合平均能撑 493 步(接近 CartPole-v1 的 500 步上限,说明智能体已接近最优

训练的图像:

成果
  • 整体趋势
    • 前期(0-100 回合):回报从接近 0 快速飙升到 200+,说明智能体在极短时间内就从 “随机乱推” 学会了 “让小车平衡更久” 的基本策略,学习效率很高。
    • 中期(100-300 回合):回报出现明显波动,有时能达到 500(满分),有时又掉到 100 以下。这是 DQN 训练中的正常现象,源于 ε- 贪婪策略的探索性和网络更新的不稳定性。
    • 后期(300-500 回合):虽然仍有波动,但绝大多数回合的回报都稳定在 400-500 之间,多次触顶 500,说明智能体已经掌握了接近最优的策略。
  • 关键指标解读
    • CartPole-v1 的满分是 500,代表小车成功平衡了 500 步(达到环境最大步数限制)。你的曲线多次达到 500,说明智能体已经能稳定完成任务。
    • 曲线后期的波动,主要是因为:
      1. ε- 贪婪策略:仍有 1% 的概率随机选择动作,导致偶尔 “失误”。
      2. 环境随机性:CartPole 的初始状态有微小扰动,导致每回合表现略有差异。

以图像为输入的 DQN 算法

之前的使用的是非图像的状态作为输入(例如车杆环境中车的坐标、速度),但是,我们需要将图像作为输入时,需要将卷积层加入其网络结构以提取图像特征,最终实现以图像为输入的强化学习。以图像为输入的 DQN 算法的代码与 7.4 节的代码的不同之处主要在于 Q 网络的结构和数据输入


Python
class ConvolutionalQnet(torch.nn.Module):
    ''' 加入卷积层的Q网络 '''
    def __init__(self, action_dim, in_channels=4):
        super(ConvolutionalQnet, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        self.conv2 = torch.nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = torch.nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc4 = torch.nn.Linear(7 * 7 * 64, 512)
        self.head = torch.nn.Linear(512, action_dim)

    def forward(self, x):
        x = x / 255
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.fc4(x))
        return self.head(x)

其主要思想是用一个神经网络来表示最优策略的函数,然后利用 Q-learning 的思想进行参数更新。为了保证训练的稳定性和高效性,DQN 算法引入了经验回放和目标网络两大模块,使得算法在实际应用时能够取得更好的效果。
DQN 是深度强化学习的基础,掌握了该算法才算是真正进入了深度强化学习领域

DQN 的改进算法

Double DQN

策略梯度算法

本书之前介绍的 Q-learning、DQN 及 DQN 改进算法都是基于价值(value-based)(学习上述的 Q 值) 的方法;在强化学习中,除了基于值函数的方法,还有一支非常经典的方法,那就是基于策略(policy-based)的方法。 (本笔记最初学习的那几个版本就是基于策略的)

对比两者,
基于值函数的方法主要是学习值函数,然后根据值函数导出一个策略,学习过程中并不存在一个显式的策略;
而基于策略的方法则是直接显式地学习一个目标策略

对比
  • 算法的核心更新步骤是在优化什么?
    • 如果是在最小化 TD 误差,让 Q(s,a) 更准确,那它就是基于价值的。(也就是得到精确的价值函数,由此函数选择策略)
    • 如果是在最大化期望回报,直接调整策略参数 θ,那它就是基于策略的。(直接优化策略)
    • 如果两者都有,并且策略更新依赖于价值评估,那它就是Actor-Critic(混合)
  • 记住几个典型例子
    • 纯价值:DQN(只有 Q 网络)。
    • 纯策略:REINFORCE(只有策略网络,用累积奖励更新)。
    • 混合:PPO(策略网络 + 价值网络,核心是策略优化);SAC(策略网络 + 双 Q 网络,典型 Actor-Critic)。

策略梯度

假设目标策略 \(\pi_{\theta}\) 是随机性的策略,\(\theta\) 为对应的参数。
我们可以用一个线性模型或者神经网络模型来为这样一个策略函数建模输入某个状态,然后输出一个动作的概率分布。我们的目标是要寻找一个最优策略最大化这个策略在环境中的期望回报。我们将策略学习的目标函数定义为

\[ J(\theta)=\mathbb{E}_{s_0}[V^{\pi_\theta}(s_0)] \]

现在有了目标函数,我们将目标函数对 \(\theta\) 策略求导,得到导数后,就可以用梯度上升方法来最大化这个目标函数,从而得到最优策略。


\[ \begin{aligned}\nabla_\theta J(\theta)&\propto\sum_{s\in S}\nu^{\pi_\theta}(s)\sum_{a\in A}Q^{\pi_\theta}(s,a)\nabla_\theta\pi_\theta(a|s)\\&=\sum_{s\in S}\nu^{\pi_\theta}(s)\sum_{a\in A}\pi_\theta(a|s)Q^{\pi_\theta}(s,a)\frac{\nabla_\theta\pi_\theta(a|s)}{\pi_\theta(a|s)}\\&=\mathbb{E}_{\pi_\theta}[Q^{\pi_\theta}(s,a)\nabla_\theta\log\pi_\theta(a|s)]\end{aligned} \]

这个梯度可以用来更新策略。需要注意的是,因为上式中期望 \(E\) 的下标 \(\pi_{\theta}\) 是,所以策略梯度算法为在线策略算法,即必须使用当前策略采样得到的数据来计算梯度(就是上面的版本 4 中出现的内容)。直观理解一下策略梯度这个公式,可以发现在每一个状态下,梯度的修改是让策略更多地去采样到带来较高值的动作,更少地去采样到带来较低值的动作,如图所示:
对于一个有限步数的环境来说,REINFORCE 算法中的策略梯度为:

\[ \nabla_\theta J(\theta)=\mathbb{E}_{\pi_\theta}\left[\sum_{t=0}^T\left(\sum_{t^{\prime}=t}^T\gamma^{t^{\prime}-t}r_{t^{\prime}}\right)\nabla_\theta\log\pi_\theta(a_t|s_t)\right] \]

REINFORCE 代码实践

Python
import gymnasium as gym  
import torch  
import torch.nn.functional as F  
import numpy as np  
import matplotlib.pyplot as plt  
from tqdm import tqdm  
import random  # 新增:用于设置全局随机种子  


# ===== 策略网络(不变)=====  
class PolicyNet(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim, action_dim):  
        super(PolicyNet, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        return F.softmax(self.fc2(x), dim=1)  


# ===== REINFORCE算法(不变)=====  
class REINFORCE:  
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,  
                 device):  
        self.policy_net = PolicyNet(state_dim, hidden_dim,  
                                    action_dim).to(device)  
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(),  
                                          lr=learning_rate)  # 使用Adam优化器  
        self.gamma = gamma  # 折扣因子  
        self.device = device  

    def take_action(self, state):  # 根据动作概率分布随机采样  
        state = torch.tensor([state], dtype=torch.float).to(self.device)  
        probs = self.policy_net(state)  
        action_dist = torch.distributions.Categorical(probs)  
        action = action_dist.sample()  
        return action.item()  

    def update(self, transition_dict):  
        reward_list = transition_dict['rewards']  
        state_list = transition_dict['states']  
        action_list = transition_dict['actions']  

        G = 0  
        self.optimizer.zero_grad()  
        for i in reversed(range(len(reward_list))):  # 从最后一步算起  
            reward = reward_list[i]  
            state = torch.tensor([state_list[i]],  
                                 dtype=torch.float).to(self.device)  
            action = torch.tensor([action_list[i]]).view(-1, 1).to(self.device)  
            log_prob = torch.log(self.policy_net(state).gather(1, action))  
            G = self.gamma * G + reward  
            loss = -log_prob * G  # 每一步的损失函数  
            loss.backward()  # 反向传播计算梯度  
        self.optimizer.step()  # 梯度下降  


# ===== 主程序(核心修正部分)=====  
if __name__ == "__main__":  
    # 1. 配置参数(不变)  
    learning_rate = 1e-3  
    num_episodes = 1000  
    hidden_dim = 128  
    gamma = 0.98  
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")  


    # 2. 设置全局随机种子(新增:保证实验可复现,替代原env.seed(0))  
    def set_seed(seed=0):  
        random.seed(seed)  
        np.random.seed(seed)  
        torch.manual_seed(seed)  
        if torch.cuda.is_available():  
            torch.cuda.manual_seed(seed)  
            torch.cuda.manual_seed_all(seed)  


    set_seed(0)  

    # 3. 创建环境(修正1:升级为CartPole-v1,避免过时警告)  
    env_name = "CartPole-v1"  
    env = gym.make(env_name)  

    # 4. 移除env.seed(0)(核心修正2:该方法已废弃)  
    # env.seed(0)  # 删掉这行!  

    # 5. 获取环境维度(不变)  
    state_dim = env.observation_space.shape[0]  
    action_dim = env.action_space.n  
    agent = REINFORCE(state_dim, hidden_dim, action_dim, learning_rate, gamma, device)  

    # 6. 训练循环(核心修正3:适配reset/step返回值)  
    return_list = []  
    for i in range(10):  
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:  
            for i_episode in range(int(num_episodes / 10)):  
                episode_return = 0  
                transition_dict = {  
                    'states': [],  
                    'actions': [],  
                    'next_states': [],  
                    'rewards': [],  
                    'dones': []  
                }  
                # 修正4:Gymnasium的reset返回(state, info),并设置seed保证可复现  
                state, info = env.reset(seed=0)  
                done = False  
                while not done:  
                    action = agent.take_action(state)  
                    # 修正5:Gymnasium的step返回5个值(next_state, reward, terminated, truncated, info)  
                    next_state, reward, terminated, truncated, info = env.step(action)  
                    # 修正6:合并终止条件(terminated=任务完成,truncated=步数超限)  
                    done = terminated or truncated  

                    # 以下逻辑不变  
                    transition_dict['states'].append(state)  
                    transition_dict['actions'].append(action)  
                    transition_dict['next_states'].append(next_state)  
                    transition_dict['rewards'].append(reward)  
                    transition_dict['dones'].append(done)  
                    state = next_state  
                    episode_return += reward  
                return_list.append(episode_return)  
                agent.update(transition_dict)  
                if (i_episode + 1) % 10 == 0:  
                    pbar.set_postfix({  
                        'episode':  
                            '%d' % (num_episodes / 10 * i + i_episode + 1),  
                        'return':  
                            '%.3f' % np.mean(return_list[-10:])  
                    })  
                pbar.update(1)  

    # 新增:可视化训练结果(可选,方便看效果)  
    episodes_list = list(range(len(return_list)))  
    plt.plot(episodes_list, return_list)  
    plt.xlabel('Episodes')  
    plt.ylabel('Returns')  
    plt.title('REINFORCE on CartPole-v1')  
    plt.show()  

    # 关闭环境  
    env.close()

结果:

结果分析
  • 前期(0–200 回合)
    • 回报从接近 0 快速攀升到 100–200,说明智能体在极短时间内就从 “随机乱推” 学会了 “让小车平衡更久” 的基本策略。
    • 这是策略梯度算法的优势:能快速从环境交互中学习到有效策略,不需要像 DQN 那样先学习价值函数。
  • 中期(200–600 回合)
    • 回报出现剧烈波动:有时能达到 500(满分),有时又掉到几十。
    • 这是 REINFORCE 的核心缺陷:梯度估计方差极大。它直接用整条轨迹的累积奖励 G 作为更新信号,而 G 本身受环境随机性和策略探索影响,波动非常大,导致策略更新方向不稳定,所以回报忽高忽低。
    • 但能多次触顶 500,说明策略已经能找到最优解,只是无法稳定保持。
  • 后期(600–1000 回合)
    • 600–800 回合:回报稳定在 500,说明策略在一段时间内收敛到了最优,能稳定完成任务。
    • 800–1000 回合:回报突然掉回 150–200 并持续波动,这是纯策略梯度的典型问题 ——缺乏更新约束。没有像 PPO 那样的 KL 散度或裁剪目标来限制策略更新幅度,导致策略 “飘走”,从最优解退化成较差策略。

所以需要进一步优化:
REINFORCE 算法理论上是能保证局部最优的,它实际上是借助蒙特卡洛方法采样轨迹来估计动作价值,这种做法的一大优点是可以得到无偏的梯度。但是,正是因为使用了蒙特卡洛方法,REINFORCE 算法的梯度估计的方差很大,可能会造成一定程度上的不稳定,这也是第 10 章将介绍的 Actor-Critic 算法要解决的问题。

Actor-Critic 算法

  • Actor-Critic 是囊括一系列算法的整体架构,目前很多高效的前沿算法都属于 Actor-Critic 算法,本章接下来将会介绍一种最简单的 Actor-Critic 算法。
  • 需要明确的是,Actor-Critic 算法本质上是基于策略的算法,因为这一系列算法的目标都是优化一个带参数的策略,只是会额外学习价值函数,从而帮助策略函数更好地学习。

Actor-Critic

我们将 Actor-Critic 分为两个部分:Actor(策略网络)和 Critic(价值网络),如图

  • Actor 要做的是与环境交互,并在 Critic 价值函数的指导下用策略梯度学习一个更好的策略。(基于策略)
  • Critic 要做的是通过 Actor 与环境交互收集的数据学习一个价值函数,这个价值函数会用于判断在当前状态什么动作是好的,什么动作不是好的,进而帮助 Actor 进行策略更新。(基于价值)

Actor 的更新采用策略梯度的原则,那 Critic 如何更新呢?我们将 Critic 价值网络表示为 \(V_{omega}\),参数为 \(\omega\)。于是,我们可以采取时序差分残差的学习方式,对于单个数据定义如下价值函数的损失函数:

\[ \mathcal{L}(\omega)=\frac{1}{2}(r+\gamma V_\omega(s_{t+1})-V_\omega(s_t))^2 \]

因此,价值函数的梯度为:

\[ \nabla_\omega\mathcal{L}(\omega)=-(r+\gamma V_\omega(s_{t+1})-V_\omega(s_t))\nabla_\omega V_\omega(s_t) \]

之后使用梯度下降的方式更新价值网络参数即可:

步骤
  • 初始化策略网络参数 \(\theta\),价值网络参数 \(\omega\)
  • for 序列 \(e=1\to E\) do :
    • 用当前策略 \(\pi_\theta\) 采样轨迹 \(\{s_1,a_1,r_1,s_2,a_2,r_2,\ldots\}\)
    • 为每一步数据计算:\(\delta_t=r_t+\gamma V_\omega(s_{t+1})-V_\omega(s_t)\)
    • 更新价值参数 \(w=w+\alpha_\omega\sum_t\delta_t\nabla_\omega V_\omega(s_t)\)
    • 更新策略参数 \(\theta=\theta+\alpha_\theta\sum_t\delta_t\nabla_\theta\log\pi_\theta(a_t|s_t)\)
  • end for

代码实践

Python
import gymnasium as gym  
import torch  
import torch.nn.functional as F  
import numpy as np  
import matplotlib.pyplot as plt  
from tqdm import tqdm  
import random  # 新增:设置全局随机种子  

# ===== 1. 补全缺失的ValueNet类(核心!原代码未定义)=====  
class ValueNet(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim):  
        super(ValueNet, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc2 = torch.nn.Linear(hidden_dim, 1)  # 输出状态价值(标量)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        return self.fc2(x)  # 输出V(s)  

# ===== 2. 策略网络(不变)=====  
class PolicyNet(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim, action_dim):  
        super(PolicyNet, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        return F.softmax(self.fc2(x), dim=1)  # 输出动作概率分布  

# ===== 3. Actor-Critic算法(仅适配设备,逻辑不变)=====  
class ActorCritic:  
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,  
                 gamma, device):  
        # Actor:策略网络(选动作)  
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)  
        # Critic:价值网络(评估状态价值)  
        self.critic = ValueNet(state_dim, hidden_dim).to(device)  
        # 优化器(分开优化Actor和Critic)  
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)  
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)  
        self.gamma = gamma  # 折扣因子  
        self.device = device  

    def take_action(self, state):  # 根据策略网络采样动作  
        state = torch.tensor([state], dtype=torch.float).to(self.device)  
        probs = self.actor(state)  
        action_dist = torch.distributions.Categorical(probs)  
        action = action_dist.sample()  
        return action.item()  

    def update(self, transition_dict):  
        # 解析经验数据  
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)  
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)  
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)  
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)  
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)  

        # Critic更新:时序差分(TD)误差优化价值网络  
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)  # TD目标  
        td_delta = td_target - self.critic(states)  # TD误差(优势函数近似)  
        critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))  # 均方误差损失  

        # Actor更新:用TD误差指导策略梯度(降低方差)  
        log_probs = torch.log(self.actor(states).gather(1, actions))  
        actor_loss = torch.mean(-log_probs * td_delta.detach())  # 策略梯度损失  

        # 反向传播+参数更新  
        self.actor_optimizer.zero_grad()  
        self.critic_optimizer.zero_grad()  
        actor_loss.backward()  
        critic_loss.backward()  
        self.actor_optimizer.step()  
        self.critic_optimizer.step()  

# ===== 4. 替代rl_utils.train_on_policy_agent(自定义训练循环,避免依赖缺失)=====  
def train_on_policy_agent(env, agent, num_episodes):  
    return_list = []  # 记录每回合奖励  
    for i in range(10):  # 分10个迭代段,用tqdm显示进度  
        with tqdm(total=int(num_episodes/10), desc=f'Iteration {i}') as pbar:  
            for i_episode in range(int(num_episodes/10)):  
                episode_return = 0  
                transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}  
                # 重置环境(适配Gymnasium:返回(state, info),设置seed保证可复现)  
                state, info = env.reset(seed=0)  
                done = False  
                while not done:  
                    action = agent.take_action(state)  
                    # 执行动作(适配Gymnasium:step返回5个值)  
                    next_state, reward, terminated, truncated, info = env.step(action)  
                    done = terminated or truncated  # 合并终止条件  
                    # 存储经验  
                    transition_dict['states'].append(state)  
                    transition_dict['actions'].append(action)  
                    transition_dict['next_states'].append(next_state)  
                    transition_dict['rewards'].append(reward)  
                    transition_dict['dones'].append(done)  
                    # 更新状态和奖励  
                    state = next_state  
                    episode_return += reward  
                # 记录奖励+更新策略(On-Policy:每回合结束后更新)  
                return_list.append(episode_return)  
                agent.update(transition_dict)  
                # 进度条更新  
                if (i_episode+1) % 10 == 0:  
                    pbar.set_postfix({'episode': f'{num_episodes/10*i + i_episode+1}', 'return': f'{np.mean(return_list[-10:]):.3f}'})  
                pbar.update(1)  
    return return_list  

# ===== 5. 主程序(核心修正:适配Gymnasium+补全配置)=====  
if __name__ == "__main__":  
    # 超参数配置  
    actor_lr = 1e-3  
    critic_lr = 1e-2  
    num_episodes = 1000  
    hidden_dim = 128  
    gamma = 0.98  
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")  

    # 设置全局随机种子(替代env.seed(0),保证实验可复现)  
    def set_seed(seed=0):  
        random.seed(seed)  
        np.random.seed(seed)  
        torch.manual_seed(seed)  
        if torch.cuda.is_available():  
            torch.cuda.manual_seed(seed)  
            torch.cuda.manual_seed_all(seed)  
    set_seed(0)  

    # 创建环境(升级为CartPole-v1,避免过时警告)  
    env_name = 'CartPole-v1'  
    env = gym.make(env_name)  
    state_dim = env.observation_space.shape[0]  
    action_dim = env.action_space.n  

    # 初始化智能体+开始训练  
    agent = ActorCritic(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, gamma, device)  
    return_list = train_on_policy_agent(env, agent, num_episodes)  

    # 可视化训练结果  
    episodes_list = list(range(len(return_list)))  
    plt.plot(episodes_list, return_list)  
    plt.xlabel('Episodes')  
    plt.ylabel('Returns')  
    plt.title('Actor-Critic on CartPole-v1')  
    plt.show()  

    # 关闭环境  
    env.close()

结果:

分析
  • 前期(0–200 回合)
    • 回报从接近 0 快速攀升到 100–200,和 REINFORCE 类似,说明智能体同样能快速从环境交互中学习到有效策略。
    • 波动幅度比 REINFORCE 略小,这是因为 Critic(价值网络)提供了更稳定的更新信号。
  • 中期(200–600 回合)
    • 回报在 100–500 之间波动,但整体趋势向上,多次触顶 500(满分)。
    • 相比 REINFORCE 同一阶段的剧烈震荡,这里的波动明显更小,这是 Actor-Critic 的关键改进:用 TD 误差(td_delta)替代了原始累积奖励 G 作为策略梯度的更新信号,大幅降低了梯度方差,让策略更新方向更稳定。
  • 后期(600–1000 回合)
    • 回报稳定在 500 左右,仅偶尔出现小幅回落,这是最核心的进步!
    • 对比 REINFORCE 后期从 500 掉回 150–200 的 “发散” 问题,Actor-Critic 能稳定保持最优策略,这是因为 Critic 提供的价值基线,让策略更新始终围绕 “更优动作” 进行,不会像纯策略梯度那样 “飘走”。

总结

Actor-Critic 算法,它是基于值函数的方法和基于策略的方法的叠加价值模块 Critic 在策略模块 Actor 采样的数据中学习分辨什么是好的动作,什么不是好的动作,进而指导 Actor 进行策略更新。
Actor-Critic 算法非常实用,后续章节中的 TRPO、PPO、DDPG、SAC 等深度强化学习算法都是在 Actor-Critic 框架下进行发展的。

TRPO 算法

回顾一下基于策略的方法:参数化智能体的策略,并设计衡量策略好坏的目标函数,通过梯度上升的方法来最大化这个目标函数,使得策略最优。
但是这种算法有一个明显的缺点:当策略网络是深度模型时,沿着策略梯度更新参数,很有可能由于步长太长,策略突然显著变差,进而影响训练效果。


针对以上问题,我们考虑在更新时找到一块信任区域(trust region),在这个区域上更新策略时能够得到某种策略性能的安全性保证,这就是信任区域策略优化(trust region policy optimization,TRPO)算法的主要思想。

代码实践

主要在倒立摆的环境中进行代码的实践
由于它是与连续动作交互的环境,我们需要对上面的代码做一定的修改。对于策略网络,因为环境是连续动作的,所以策略网络分别输出表示动作分布的高斯分布的均值和标准差


Python
import torch  
import numpy as np  
import gymnasium as gym  
import matplotlib.pyplot as plt  
import torch.nn.functional as F  
import copy  
from tqdm import tqdm  

# 仅补全缺失的ValueNet(原代码调用但未定义)  
class ValueNet(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim):  
        super(ValueNet, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc2 = torch.nn.Linear(hidden_dim, 1)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        return self.fc2(x)  

# 仅补全缺失的compute_advantage(原代码调用但未定义)  
def compute_advantage(gamma, lmbda, td_delta):  
    td_delta = td_delta.detach().numpy()  
    advantage_list = []  
    advantage = 0.0  
    for delta in reversed(td_delta):  
        advantage = gamma * lmbda * advantage + delta  
        advantage_list.append(advantage)  
    advantage_list.reverse()  
    return torch.tensor(advantage_list, dtype=torch.float)  

# 保留你原代码的PolicyNetContinuous  
class PolicyNetContinuous(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim, action_dim):  
        super(PolicyNetContinuous, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)  
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        mu = 2.0 * torch.tanh(self.fc_mu(x))  
        std = F.softplus(self.fc_std(x))  
        return mu, std  

# 保留你原代码的TRPOContinuous  
class TRPOContinuous:  
    def __init__(self, hidden_dim, state_space, action_space, lmbda,  
                 kl_constraint, alpha, critic_lr, gamma, device):  
        state_dim = state_space.shape[0]  
        action_dim = action_space.shape[0]  
        self.actor = PolicyNetContinuous(state_dim, hidden_dim,  
                                         action_dim).to(device)  
        self.critic = ValueNet(state_dim, hidden_dim).to(device)  
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),  
                                                 lr=critic_lr)  
        self.gamma = gamma  
        self.lmbda = lmbda  
        self.kl_constraint = kl_constraint  
        self.alpha = alpha  
        self.device = device  

    def take_action(self, state):  
        state = torch.tensor([state], dtype=torch.float).to(self.device)  
        mu, std = self.actor(state)  
        action_dist = torch.distributions.Normal(mu, std)  
        action = action_dist.sample()  
        return [action.item()]  

    def hessian_matrix_vector_product(self,  
                                      states,  
                                      old_action_dists,  
                                      vector,  
                                      damping=0.1):  
        mu, std = self.actor(states)  
        new_action_dists = torch.distributions.Normal(mu, std)  
        kl = torch.mean(  
            torch.distributions.kl.kl_divergence(old_action_dists,  
                                                 new_action_dists))  
        kl_grad = torch.autograd.grad(kl,  
                                      self.actor.parameters(),  
                                      create_graph=True)  
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])  
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)  
        grad2 = torch.autograd.grad(kl_grad_vector_product,  
                                    self.actor.parameters())  
        grad2_vector = torch.cat(  
            [grad.contiguous().view(-1) for grad in grad2])  
        return grad2_vector + damping * vector  

    def conjugate_gradient(self, grad, states, old_action_dists):  
        x = torch.zeros_like(grad)  
        r = grad.clone()  
        p = grad.clone()  
        rdotr = torch.dot(r, r)  
        for i in range(10):  
            Hp = self.hessian_matrix_vector_product(states, old_action_dists,  
                                                    p)  
            alpha = rdotr / torch.dot(p, Hp)  
            x += alpha * p  
            r -= alpha * Hp  
            new_rdotr = torch.dot(r, r)  
            if new_rdotr < 1e-10:  
                break  
            beta = new_rdotr / rdotr  
            p = r + beta * p  
            rdotr = new_rdotr  
        return x  

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs,  
                              actor):  
        mu, std = actor(states)  
        action_dists = torch.distributions.Normal(mu, std)  
        log_probs = action_dists.log_prob(actions)  
        ratio = torch.exp(log_probs - old_log_probs)  
        return torch.mean(ratio * advantage)  

    def line_search(self, states, actions, advantage, old_log_probs,  
                    old_action_dists, max_vec):  
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(  
            self.actor.parameters())  
        old_obj = self.compute_surrogate_obj(states, actions, advantage,  
                                             old_log_probs, self.actor)  
        for i in range(15):  
            coef = self.alpha**i  
            new_para = old_para + coef * max_vec  
            new_actor = copy.deepcopy(self.actor)  
            torch.nn.utils.convert_parameters.vector_to_parameters(  
                new_para, new_actor.parameters())  
            mu, std = new_actor(states)  
            new_action_dists = torch.distributions.Normal(mu, std)  
            kl_div = torch.mean(  
                torch.distributions.kl.kl_divergence(old_action_dists,  
                                                     new_action_dists))  
            new_obj = self.compute_surrogate_obj(states, actions, advantage,  
                                                 old_log_probs, new_actor)  
            if new_obj > old_obj and kl_div < self.kl_constraint:  
                return new_para  
        return old_para  

    def policy_learn(self, states, actions, old_action_dists, old_log_probs,  
                     advantage):  
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,  
                                                   old_log_probs, self.actor)  
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())  
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()  
        descent_direction = self.conjugate_gradient(obj_grad, states,  
                                                    old_action_dists)  
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,  
                                                descent_direction)  
        max_coef = torch.sqrt(2 * self.kl_constraint /  
                              (torch.dot(descent_direction, Hd) + 1e-8))  
        new_para = self.line_search(states, actions, advantage, old_log_probs,  
                                    old_action_dists,  
                                    descent_direction * max_coef)  
        torch.nn.utils.convert_parameters.vector_to_parameters(  
            new_para, self.actor.parameters())  

    def update(self, transition_dict):  
        states = torch.tensor(transition_dict['states'],  
                              dtype=torch.float).to(self.device)  
        actions = torch.tensor(transition_dict['actions'],  
                               dtype=torch.float).view(-1, 1).to(self.device)  
        rewards = torch.tensor(transition_dict['rewards'],  
                               dtype=torch.float).view(-1, 1).to(self.device)  
        next_states = torch.tensor(transition_dict['next_states'],  
                                   dtype=torch.float).to(self.device)  
        dones = torch.tensor(transition_dict['dones'],  
                             dtype=torch.float).view(-1, 1).to(self.device)  
        rewards = (rewards + 8.0) / 8.0  
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -  
                                                                       dones)  
        td_delta = td_target - self.critic(states)  
        advantage = compute_advantage(self.gamma, self.lmbda,  
                                      td_delta.cpu()).to(self.device)  
        mu, std = self.actor(states)  
        old_action_dists = torch.distributions.Normal(mu.detach(),  
                                                      std.detach())  
        old_log_probs = old_action_dists.log_prob(actions)  
        critic_loss = torch.mean(  
            F.mse_loss(self.critic(states), td_target.detach()))  
        self.critic_optimizer.zero_grad()  
        critic_loss.backward()  
        self.critic_optimizer.step()  
        self.policy_learn(states, actions, old_action_dists, old_log_probs,  
                          advantage)  

# 仅替换rl_utils.train_on_policy_agent(极简版,适配Gymnasium)  
def train_on_policy_agent(env, agent, num_episodes):  
    return_list = []  
    for i in range(10):  
        with tqdm(total=int(num_episodes/10), desc=f'Iteration {i}') as pbar:  
            for i_episode in range(int(num_episodes/10)):  
                episode_return = 0  
                transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}  
                # 仅修改reset:适配Gymnasium返回值+设置seed  
                state, info = env.reset(seed=0)  
                done = False  
                while not done:  
                    action = agent.take_action(state)  
                    # 仅修改step:适配Gymnasium返回值  
                    next_state, reward, terminated, truncated, info = env.step(action)  
                    done = terminated or truncated  
                    transition_dict['states'].append(state)  
                    transition_dict['actions'].append(action)  
                    transition_dict['next_states'].append(next_state)  
                    transition_dict['rewards'].append(reward)  
                    transition_dict['dones'].append(done)  
                    state = next_state  
                    episode_return += reward  
                return_list.append(episode_return)  
                agent.update(transition_dict)  
                if (i_episode + 1) % 10 == 0:  
                    pbar.set_postfix({'episode': f'{num_episodes/10*i + i_episode+1}', 'return': f'{np.mean(return_list[-10:]):.3f}'})  
                pbar.update(1)  
    return return_list  

# 仅替换rl_utils.moving_average(极简版)  
def moving_average(x, window_size):  
    return np.convolve(x, np.ones(window_size)/window_size, mode='valid')  

# 保留你原代码的超参数和主逻辑,仅删除env.seed(0)  
num_episodes = 2000  
hidden_dim = 128  
gamma = 0.9  
lmbda = 0.9  
critic_lr = 1e-2  
kl_constraint = 0.00005  
alpha = 0.5  
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")  

env_name = 'Pendulum-v1'  
env = gym.make(env_name)  
# 仅删除:env.seed(0)  
torch.manual_seed(0)  
agent = TRPOContinuous(hidden_dim, env.observation_space, env.action_space,  
                       lmbda, kl_constraint, alpha, critic_lr, gamma, device)  
return_list = train_on_policy_agent(env, agent, num_episodes)  

episodes_list = list(range(len(return_list)))  
plt.plot(episodes_list, return_list)  
plt.xlabel('Episodes')  
plt.ylabel('Returns')  
plt.title('TRPO on {}'.format(env_name))  
plt.show()  

# 适配moving_average输出长度(仅小修改)  
mv_return = moving_average(return_list, 9)  
plt.plot(episodes_list[4:-4], mv_return)  # 仅调整x轴长度匹配  
plt.xlabel('Episodes')  
plt.ylabel('Returns')  
plt.title('TRPO on {}'.format(env_name))  
plt.show()

结果:

PPO 算法

TRPO 算法在很多场景上的应用都很成功,但是我们也发现它的计算过程非常复杂,每一步更新的运算量非常大。
于是,TRPO 算法的改进版——PPO 算法在 2017 年被提出,PPO 基于 TRPO 的思想,但是其算法实现更加简单。并且大量的实验结果表明,与 TRPO 相比,PPO 能学习得一样好(甚至更快),这使得 PPO 成为非常流行的强化学习算法。
具体来说,PPO 有两种形式,一是 PPO- 惩罚,二是 PPO- 截断,我们接下来对这两种形式进行介绍。

PPO 惩罚

PPO- 惩罚(PPO-Penalty)用拉格朗日乘数法直接将 KL 散度的限制放进了目标函数中,这就变成了一个无约束的优化问题,在迭代的过程中不断更新 KL 散度前的系数。即:

\[ \arg\max_\theta\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\left[\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)-\beta D_{KL}[\pi_{\theta_k}(\cdot|s),\pi_\theta(\cdot|s)]\right] \]

PPO- 截断

PPO 的另一种形式 PPO- 截断(PPO-Clip)更加直接,它在目标函数中进行限制,以保证新的参数和旧的参数的差距不会太大,即:

\[ \arg\max_{\theta}\mathbb{E}_{s\sim\nu}\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\left[\min\left(\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a),\operatorname{clip}\left(\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)},1-\epsilon,1+\epsilon\right)A^{\pi_{\theta_k}}(s,a)\right)\right] \]
  • 其中 \(\operatorname{clip}(x,l,r):=\max(\min(x,r),l)\),即把 \(x\) 限制在 \([l,r]\) 内。上式中 \(\epsilon\) 是一个超参数,表示进行截断(clip)的范围。
  • 如果 \(A^{\pi_{\theta_k}}(s,a)>0\),说明这个动作的价值高于平均,最大化这个式子会增大 \(\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}\),但不会让其超过 \(1+\epsilon\)。反之,如果 \(A^{\pi_{\theta_k}}(s,a)<0\),最大化这个式子会减小 \(\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}\),但不会让其超过 \(1-\epsilon\)。如图 12-1 所示。

数学说明

  • 策略网络 (Actor 的更新):

    \[ \mathbb{E}\left[\min\left(\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a),\mathrm{clip}\left(\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)},1-\epsilon,1+\epsilon\right)A^{\pi_{\theta_k}}(s,a)\right)\right] \]

    最大化这个函数,也就是由 A(上面有说)的正负更新新策略的概率
    - 价值网络的更新

    \[ \mathcal{L}(\omega)=\frac{1}{2}(V_\omega(s_t)-y_t)^2 \]

    就是使得状态价值函数接近目标函数:\(y_t=r_t+\gamma V_\omega(s_{t+1})\)

代码实践

与 TRPO 相同,我们仍然在车杆和倒立摆两个环境中测试 PPO 算法。大量实验表明,PPO- 截断总是比 PPO- 惩罚表现得更好。因此下面我们专注于 PPO- 截断的代码实现。
倒立摆是与连续动作交互的环境,同 TRPO 算法一样,我们做一些修改,让策略网络输出连续动作高斯分布(Gaussian distribution)的均值和标准差。后续的连续动作则在该高斯分布中采样得到。


Python
import gymnasium as gym  # 仅替换gym为gymnasium  
import torch  
import torch.nn.functional as F  
import numpy as np  
import matplotlib.pyplot as plt  
from tqdm import tqdm  # 仅新增:用于进度条  

# 保留你原代码的ValueNet  
class ValueNet(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim):  
        super(ValueNet, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc2 = torch.nn.Linear(hidden_dim, 1)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        return self.fc2(x)  

# 保留你原代码的PolicyNetContinuous  
class PolicyNetContinuous(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim, action_dim):  
        super(PolicyNetContinuous, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)  
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        mu = 2.0 * torch.tanh(self.fc_mu(x))  
        std = F.softplus(self.fc_std(x))  
        return mu, std  

# 保留你原代码的PPOContinuous  
class PPOContinuous:  
    ''' 处理连续动作的PPO算法 '''    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,  
                 lmbda, epochs, eps, gamma, device):  
        self.actor = PolicyNetContinuous(state_dim, hidden_dim,  
                                         action_dim).to(device)  
        self.critic = ValueNet(state_dim, hidden_dim).to(device)  
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),  
                                                lr=actor_lr)  
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),  
                                                 lr=critic_lr)  
        self.gamma = gamma  
        self.lmbda = lmbda  
        self.epochs = epochs  
        self.eps = eps  
        self.device = device  

    def take_action(self, state):  
        state = torch.tensor([state], dtype=torch.float).to(self.device)  
        mu, sigma = self.actor(state)  
        action_dist = torch.distributions.Normal(mu, sigma)  
        action = action_dist.sample()  
        return [action.item()]  

    def update(self, transition_dict):  
        states = torch.tensor(transition_dict['states'],  
                              dtype=torch.float).to(self.device)  
        actions = torch.tensor(transition_dict['actions'],  
                               dtype=torch.float).view(-1, 1).to(self.device)  
        rewards = torch.tensor(transition_dict['rewards'],  
                               dtype=torch.float).view(-1, 1).to(self.device)  
        next_states = torch.tensor(transition_dict['next_states'],  
                                   dtype=torch.float).to(self.device)  
        dones = torch.tensor(transition_dict['dones'],  
                             dtype=torch.float).view(-1, 1).to(self.device)  
        rewards = (rewards + 8.0) / 8.0  # 和TRPO一样,对奖励进行修改,方便训练  
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -  
                                                                       dones)  
        td_delta = td_target - self.critic(states)  
        # 仅替换:调用自定义的compute_advantage(替代rl_utils)  
        advantage = compute_advantage(self.gamma, self.lmbda,  
                                               td_delta.cpu()).to(self.device)  
        mu, std = self.actor(states)  
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())  
        # 动作是正态分布  
        old_log_probs = action_dists.log_prob(actions)  

        for _ in range(self.epochs):  
            mu, std = self.actor(states)  
            action_dists = torch.distributions.Normal(mu, std)  
            log_probs = action_dists.log_prob(actions)  
            ratio = torch.exp(log_probs - old_log_probs)  
            surr1 = ratio * advantage  
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  
            actor_loss = torch.mean(-torch.min(surr1, surr2))  
            critic_loss = torch.mean(  
                F.mse_loss(self.critic(states), td_target.detach()))  
            self.actor_optimizer.zero_grad()  
            self.critic_optimizer.zero_grad()  
            actor_loss.backward()  
            critic_loss.backward()  
            self.actor_optimizer.step()  
            self.critic_optimizer.step()  

# 仅新增:补全rl_utils.compute_advantage(极简版)  
def compute_advantage(gamma, lmbda, td_delta):  
    td_delta = td_delta.detach().numpy()  
    advantage_list = []  
    advantage = 0.0  
    for delta in reversed(td_delta):  
        advantage = gamma * lmbda * advantage + delta  
        advantage_list.append(advantage)  
    advantage_list.reverse()  
    return torch.tensor(advantage_list, dtype=torch.float)  

# 仅新增:补全rl_utils.train_on_policy_agent(极简版,适配gymnasium)  
def train_on_policy_agent(env, agent, num_episodes):  
    return_list = []  
    for i in range(10):  
        with tqdm(total=int(num_episodes/10), desc=f'Iteration {i}') as pbar:  
            for i_episode in range(int(num_episodes/10)):  
                episode_return = 0  
                transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}  
                # 仅修改:适配gymnasium的reset返回值+设置seed  
                state, info = env.reset(seed=0)  
                done = False  
                while not done:  
                    action = agent.take_action(state)  
                    # 仅修改:适配gymnasium的step返回值  
                    next_state, reward, terminated, truncated, info = env.step(action)  
                    done = terminated or truncated  
                    transition_dict['states'].append(state)  
                    transition_dict['actions'].append(action)  
                    transition_dict['next_states'].append(next_state)  
                    transition_dict['rewards'].append(reward)  
                    transition_dict['dones'].append(done)  
                    state = next_state  
                    episode_return += reward  
                return_list.append(episode_return)  
                agent.update(transition_dict)  
                if (i_episode + 1) % 10 == 0:  
                    pbar.set_postfix({'episode': f'{num_episodes/10*i + i_episode+1}', 'return': f'{np.mean(return_list[-10:]):.3f}'})  
                pbar.update(1)  
    return return_list  

# 保留你原代码的超参数(仅修改env_name)  
actor_lr = 1e-4  
critic_lr = 5e-3  
num_episodes = 2000  
hidden_dim = 128  
gamma = 0.9  
lmbda = 0.9  
epochs = 10  
eps = 0.2  
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")  

# 仅修改:v0 → v1(解决废弃报错)  
env_name = 'Pendulum-v1'  
env = gym.make(env_name)  
# 仅删除:env.seed(0)(已废弃)  
torch.manual_seed(0)  
state_dim = env.observation_space.shape[0]  
action_dim = env.action_space.shape[0]  # 连续动作空间  
agent = PPOContinuous(state_dim, hidden_dim, action_dim, actor_lr, critic_lr,  
                      lmbda, epochs, eps, gamma, device)  

# 调用自定义的train_on_policy_agent(替代rl_utils)  
return_list = train_on_policy_agent(env, agent, num_episodes)  

# 保留你原代码的绘图逻辑  
episodes_list = list(range(len(return_list)))  
plt.plot(episodes_list, return_list)  
plt.xlabel('Episodes')  
plt.ylabel('Returns')  
plt.title('PPO on {}'.format(env_name))  
plt.show()

结果:

分析
  • 前期(0–500 回合)
    • 奖励从约 -1400 快速提升到 -200 左右,收敛速度非常快。
    • 这说明 PPO 的裁剪目标函数(clipped surrogate objective)能高效地引导策略更新,让智能体在极短时间内就掌握了让摆杆稳定倒立的核心策略。
  • 中期(500–2000 回合)
    • 大部分回合的奖励稳定在 -200 左右,虽然仍有波动,但波动幅度远小于 TRPO。
    • 即使出现波动,奖励也很少跌回 -800 以下,说明策略的鲁棒性更强,不容易 “退化” 到失控状态。

运行的时间很短,得到的结果也是不错的

总结:

PPO 是 TRPO 的一种改进算法,它在实现上简化了 TRPO 中的复杂计算,并且它在实验中的性能大多数情况下会比 TRPO 更好,因此目前常被用作一种常用的基准算法
需要注意的是,TRPO 和 PPO 都属于在线策略学习算法,即使优化目标中包含重要性采样的过程,但其只是用到了上一轮策略的数据,而不是过去所有策略的数据。

SAC 算法

  • 之前的章节提到过在线策略算法的采样效率比较低,我们通常更倾向于使用离线策略算法。
  • 2018 年,一个更加稳定的离线策略算法 Soft Actor-Critic(SAC)被提出。SAC 的前身是 Soft Q-learning,它们都属于最大熵强化学习的范畴。
  • 目前,在无模型的强化学习算法中,SAC 是一个非常高效的算法,它学习一个随机性策略,在不少标准环境中取得了领先的成绩。

代码实践

Python
import random  
import gymnasium as gym  # 仅替换gym为gymnasium  
import numpy as np  
from tqdm import tqdm  
import torch  
import torch.nn.functional as F  
from torch.distributions import Normal  
import matplotlib.pyplot as plt  

# 仅新增:补全rl_utils.ReplayBuffer(极简版,适配off-policy)  
class ReplayBuffer:  
    def __init__(self, capacity):  
        self.capacity = capacity  
        self.buffer = []  
        self.position = 0  

    def push(self, state, action, reward, next_state, done):  
        if len(self.buffer) < self.capacity:  
            self.buffer.append(None)  
        self.buffer[self.position] = (state, action, reward, next_state, done)  
        self.position = (self.position + 1) % self.capacity  

    def sample(self, batch_size):  
        batch = random.sample(self.buffer, batch_size)  
        state, action, reward, next_state, done = zip(*batch)  
        return state, action, reward, next_state, done  

    def __len__(self):  
        return len(self.buffer)  

# 仅新增:补全rl_utils.train_off_policy_agent(极简版,适配SAC)  
def train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size):  
    return_list = []  
    with tqdm(total=num_episodes, desc="Training") as pbar:  
        for i_episode in range(num_episodes):  
            episode_return = 0  
            state, info = env.reset(seed=0)  # 仅适配gymnasium的reset返回值  
            done = False  
            while not done:  
                action = agent.take_action(state)  
                next_state, reward, terminated, truncated, info = env.step(action)  # 适配gymnasium的step返回值  
                done = terminated or truncated  
                replay_buffer.push(state, action, reward, next_state, done)  
                state = next_state  
                episode_return += reward  
                # 当缓冲区数据量足够时,更新策略  
                if len(replay_buffer) > minimal_size:  
                    b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)  
                    transition_dict = {  
                        'states': b_s,  
                        'actions': b_a,  
                        'next_states': b_ns,  
                        'rewards': b_r,  
                        'dones': b_d  
                    }  
                    agent.update(transition_dict)  
            return_list.append(episode_return)  
            # 进度条更新  
            if (i_episode + 1) % 10 == 0:  
                pbar.set_postfix({'episode': f'{i_episode+1}', 'avg_return': f'{np.mean(return_list[-10:]):.3f}'})  
            pbar.update(1)  
    return return_list  

# 保留你原代码的PolicyNetContinuous  
class PolicyNetContinuous(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim, action_dim, action_bound):  
        super(PolicyNetContinuous, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)  
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)  
        self.action_bound = action_bound  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        mu = self.fc_mu(x)  
        std = F.softplus(self.fc_std(x))  
        dist = Normal(mu, std)  
        normal_sample = dist.rsample()  # rsample()是重参数化采样  
        log_prob = dist.log_prob(normal_sample)  
        action = torch.tanh(normal_sample)  
        # 计算tanh_normal分布的对数概率密度  
        log_prob = log_prob - torch.log(1 - torch.tanh(action).pow(2) + 1e-7)  
        action = action * self.action_bound  
        return action, log_prob  

# 保留你原代码的QValueNetContinuous  
class QValueNetContinuous(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim, action_dim):  
        super(QValueNetContinuous, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)  
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)  
        self.fc_out = torch.nn.Linear(hidden_dim, 1)  

    def forward(self, x, a):  
        cat = torch.cat([x, a], dim=1)  
        x = F.relu(self.fc1(cat))  
        x = F.relu(self.fc2(x))  
        return self.fc_out(x)  

# 保留你原代码的SACContinuous  
class SACContinuous:  
    ''' 处理连续动作的SAC算法 '''    def __init__(self, state_dim, hidden_dim, action_dim, action_bound,  
                 actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma,  
                 device):  
        self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim,  
                                         action_bound).to(device)  # 策略网络  
        self.critic_1 = QValueNetContinuous(state_dim, hidden_dim,  
                                            action_dim).to(device)  # 第一个Q网络  
        self.critic_2 = QValueNetContinuous(state_dim, hidden_dim,  
                                            action_dim).to(device)  # 第二个Q网络  
        self.target_critic_1 = QValueNetContinuous(state_dim,  
                                                   hidden_dim, action_dim).to(  
                                                       device)  # 第一个目标Q网络  
        self.target_critic_2 = QValueNetContinuous(state_dim,  
                                                   hidden_dim, action_dim).to(  
                                                       device)  # 第二个目标Q网络  
        # 令目标Q网络的初始参数和Q网络一样  
        self.target_critic_1.load_state_dict(self.critic_1.state_dict())  
        self.target_critic_2.load_state_dict(self.critic_2.state_dict())  
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),  
                                                lr=actor_lr)  
        self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(),  
                                                   lr=critic_lr)  
        self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(),  
                                                   lr=critic_lr)  
        # 使用alpha的log值,可以使训练结果比较稳定  
        self.log_alpha = torch.tensor(np.log(0.01), dtype=torch.float)  
        self.log_alpha.requires_grad = True  # 可以对alpha求梯度  
        self.log_alpha_optimizer = torch.optim.Adam([self.log_alpha],  
                                                    lr=alpha_lr)  
        self.target_entropy = target_entropy  # 目标熵的大小  
        self.gamma = gamma  
        self.tau = tau  
        self.device = device  

    def take_action(self, state):  
        state = torch.tensor([state], dtype=torch.float).to(self.device)  
        action = self.actor(state)[0]  
        return [action.item()]  

    def calc_target(self, rewards, next_states, dones):  # 计算目标Q值  
        next_actions, log_prob = self.actor(next_states)  
        entropy = -log_prob  
        q1_value = self.target_critic_1(next_states, next_actions)  
        q2_value = self.target_critic_2(next_states, next_actions)  
        next_value = torch.min(q1_value,  
                               q2_value) + self.log_alpha.exp() * entropy  
        td_target = rewards + self.gamma * next_value * (1 - dones)  
        return td_target  

    def soft_update(self, net, target_net):  
        for param_target, param in zip(target_net.parameters(),  
                                       net.parameters()):  
            param_target.data.copy_(param_target.data * (1.0 - self.tau) +  
                                    param.data * self.tau)  

    def update(self, transition_dict):  
        states = torch.tensor(transition_dict['states'],  
                              dtype=torch.float).to(self.device)  
        actions = torch.tensor(transition_dict['actions'],  
                               dtype=torch.float).view(-1, 1).to(self.device)  
        rewards = torch.tensor(transition_dict['rewards'],  
                               dtype=torch.float).view(-1, 1).to(self.device)  
        next_states = torch.tensor(transition_dict['next_states'],  
                                   dtype=torch.float).to(self.device)  
        dones = torch.tensor(transition_dict['dones'],  
                             dtype=torch.float).view(-1, 1).to(self.device)  
        # 和之前章节一样,对倒立摆环境的奖励进行重塑以便训练  
        rewards = (rewards + 8.0) / 8.0  

        # 更新两个Q网络  
        td_target = self.calc_target(rewards, next_states, dones)  
        critic_1_loss = torch.mean(  
            F.mse_loss(self.critic_1(states, actions), td_target.detach()))  
        critic_2_loss = torch.mean(  
            F.mse_loss(self.critic_2(states, actions), td_target.detach()))  
        self.critic_1_optimizer.zero_grad()  
        critic_1_loss.backward()  
        self.critic_1_optimizer.step()  
        self.critic_2_optimizer.zero_grad()  
        critic_2_loss.backward()  
        self.critic_2_optimizer.step()  

        # 更新策略网络  
        new_actions, log_prob = self.actor(states)  
        entropy = -log_prob  
        q1_value = self.critic_1(states, new_actions)  
        q2_value = self.critic_2(states, new_actions)  
        actor_loss = torch.mean(-self.log_alpha.exp() * entropy -  
                                torch.min(q1_value, q2_value))  
        self.actor_optimizer.zero_grad()  
        actor_loss.backward()  
        self.actor_optimizer.step()  

        # 更新alpha值  
        alpha_loss = torch.mean(  
            (entropy - self.target_entropy).detach() * self.log_alpha.exp())  
        self.log_alpha_optimizer.zero_grad()  
        alpha_loss.backward()  
        self.log_alpha_optimizer.step()  

        self.soft_update(self.critic_1, self.target_critic_1)  
        self.soft_update(self.critic_2, self.target_critic_2)  

# 仅修改:v0 → v1(解决废弃报错)  
env_name = 'Pendulum-v1'  
env = gym.make(env_name)  
state_dim = env.observation_space.shape[0]  
action_dim = env.action_space.shape[0]  
action_bound = env.action_space.high[0]  # 动作最大值  
random.seed(0)  
np.random.seed(0)  
# 仅删除:env.seed(0)(已废弃)  
torch.manual_seed(0)  

# 保留你原代码的超参数  
actor_lr = 3e-4  
critic_lr = 3e-3  
alpha_lr = 3e-4  
num_episodes = 100  
hidden_dim = 128  
gamma = 0.99  
tau = 0.005  # 软更新参数  
buffer_size = 100000  
minimal_size = 1000  
batch_size = 64  
target_entropy = -env.action_space.shape[0]  
device = torch.device("cuda") if torch.cuda.is_available() else torch.device(  
    "cpu")  

# 调用自定义的ReplayBuffer(替代rl_utils)  
replay_buffer = ReplayBuffer(buffer_size)  
agent = SACContinuous(state_dim, hidden_dim, action_dim, action_bound,  
                      actor_lr, critic_lr, alpha_lr, target_entropy, tau,  
                      gamma, device)  

# 调用自定义的train_off_policy_agent(替代rl_utils)  
return_list = train_off_policy_agent(env, agent, num_episodes,  
                                              replay_buffer, minimal_size,  
                                              batch_size)  

# 新增:绘图(可选,方便看结果)  
episodes_list = list(range(len(return_list)))  
plt.plot(episodes_list, return_list)  
plt.xlabel('Episodes')  
plt.ylabel('Returns')  
plt.title('SAC on {}'.format(env_name))  
plt.show()

运行的时间比较长
结果:

PPO 算法使用

前文的代码解释

1. 导入依赖库(代码最开头)

Python
1
2
3
4
5
6
import gymnasium as gym  
import torch  
import torch.nn.functional as F  
import numpy as np  
import matplotlib.pyplot as plt  
from tqdm import tqdm  
  • gymnasium as gym:创建强化学习环境(这里是 Pendulum-v1 倒立摆),替代旧版 gym,适配新版接口;
  • torch:PyTorch 深度学习框架,用来构建神经网络、计算梯度、更新参数;
  • torch.nn.functional as F:提供 ReLU、MSE 损失等常用函数;
  • numpy:数值计算,处理状态 / 奖励等数组数据;
  • matplotlib.pyplot:绘制训练奖励曲线,直观展示训练效果;
  • tqdm:显示训练进度条,方便看训练进度(比如跑了多少回合)。

2. 价值网络(ValueNet)

Python
1
2
3
4
5
6
7
8
9
class ValueNet(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim):  
        super(ValueNet, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc2 = torch.nn.Linear(hidden_dim, 1)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        return self.fc2(x)  

估计「状态价值」—— 输入当前环境状态(比如倒立摆的角度、角速度),输出这个状态能获得的累计奖励期望(简单说:判断当前状态 “好不好”);

3. 连续动作策略网络(PolicyNetContinuous)

Python
class PolicyNetContinuous(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim, action_dim):  
        super(PolicyNetContinuous, self).__init__()  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)  
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)  

    def forward(self, x):  
        x = F.relu(self.fc1(x))  
        mu = 2.0 * torch.tanh(self.fc_mu(x))  
        std = F.softplus(self.fc_std(x))  
        return mu, std  
  • 输出连续动作的分布参数 ——Pendulum-v1 的动作是 “扭矩”(连续值,范围 [-2,2]),网络输出正态分布的均值 (mu)标准差 (std),后续用这个分布采样动作;

4. PPO 核心类(PPOContinuous)

这是代码的核心,负责策略的 “决策” 和 “更新”,分 3 个关键方法:

  • 初始化:
Python
def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device):  
    self.actor = PolicyNetContinuous(...)  # 策略网络(输出动作)
    self.critic = ValueNet(...)  # 价值网络(估计状态价值)
    self.actor_optimizer = torch.optim.Adam(...)  # 策略网络优化器(更新参数)
    self.critic_optimizer = torch.optim.Adam(...)  # 价值网络优化器
    # 超参数:控制训练过程
    self.gamma = gamma  # 折扣因子(未来奖励的权重)
    self.lmbda = lmbda  # GAE优势估计的参数
    self.epochs = epochs  # 每个回合更新策略的轮数
    self.eps = eps  # PPO裁剪系数(核心!限制策略更新幅度)
    self.device = device  # 训练设备(CPU/GPU)

核心:初始化 “策略网络(Actor)” 和 “价值网络(Critic)”,以及对应的优化器,设置 PPO 的核心超参数。

  • take_action:决策(选动作)
Python
1
2
3
4
5
6
def take_action(self, state):  
    state = torch.tensor([state], dtype=torch.float).to(self.device)  
    mu, sigma = self.actor(state)  
    action_dist = torch.distributions.Normal(mu, sigma)  # 构建正态分布
    action = action_dist.sample()  # 从分布中采样动作
    return [action.item()]  
  • 核心作用:输入当前状态,输出具体的动作(比如给倒立摆的扭矩值);
  • 关键:用正态分布采样动作,而非直接取均值 —— 加入随机性,让智能体 “探索” 不同动作,避免过早陷入局部最优。

  • update:策略更新(PPO 核心逻辑)
Python
def update(self, transition_dict):  
    # 1. 数据预处理:把收集的经验转成Tensor,放到指定设备
    states = torch.tensor(transition_dict['states']).to(device)  
    actions = torch.tensor(transition_dict['actions']).to(device)  
    rewards = torch.tensor(transition_dict['rewards']).to(device)  
    next_states = torch.tensor(transition_dict['next_states']).to(device)  
    dones = torch.tensor(transition_dict['dones']).to(device)  

    # 2. 奖励重塑:Pendulum原始奖励是负数(-16~0),重塑后更易训练
    rewards = (rewards + 8.0) / 8.0  

    # 3. 计算TD目标和优势函数(判断动作“好不好”)
    td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)  # 目标价值
    td_delta = td_target - self.critic(states)  # TD误差
    advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(device)  # GAE优势估计

    # 4. 计算旧策略的对数概率(PPO的关键:和新策略对比)
    mu, std = self.actor(states)  
    action_dists = torch.distributions.Normal(mu.detach(), std.detach())  
    old_log_probs = action_dists.log_prob(actions)  

    # 5. 多轮更新策略(PPO核心:裁剪,保证训练稳定)
    for _ in range(self.epochs):  
        # 重新计算新策略的概率
        mu, std = self.actor(states)  
        action_dists = torch.distributions.Normal(mu, std)  
        log_probs = action_dists.log_prob(actions)  

        # 计算策略更新的比率(新策略/旧策略)
        ratio = torch.exp(log_probs - old_log_probs)  
        # PPO裁剪:限制ratio在[1-eps, 1+eps]之间,避免策略更新幅度过大
        surr1 = ratio * advantage  
        surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  

        # 6. 计算损失并更新
        actor_loss = torch.mean(-torch.min(surr1, surr2))  # 策略损失(越小越好)
        critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))  # 价值损失

        # 梯度清零→反向传播→更新参数
        self.actor_optimizer.zero_grad()  
        self.critic_optimizer.zero_grad()  
        actor_loss.backward()  
        critic_loss.backward()  
        self.actor_optimizer.step()  
        self.critic_optimizer.step()  

核心作用:用一个回合收集的经验(状态、动作、奖励等)更新策略网络和价值网络;
PPO 的灵魂:torch.clamp(ratio, 1 - eps, 1 + eps) —— 把策略更新的比率限制在小范围内,避免 “一次更新太猛导致训练崩”,这也是 PPO 比其他算法稳定的关键。

辅助函数:

  1. compute_advantage(GAE 优势估计)
    • 核心作用:计算「优势值」—— 判断 “当前动作比平均水平好多少”,让策略更新更精准,减少训练方差;
    • 简单说:优势值为正→这个动作该多做,为负→该少做。
  2. train_on_policy_agent(训练循环)
    • 核心作用:控制整个训练流程 —— 循环多个回合,每个回合和环境交互收集经验,回合结束后调用 agent.update() 更新策略,最后返回所有回合的奖励;
    • 关键:PPO 是 On-Policy 算法(只能用 “当前策略收集的经验” 更新),所以每个回合都要重新收集经验,不能复用历史数据。

超参数设置 + 主流程

Python
# 超参数(控制训练效果)
actor_lr = 1e-4  # 策略网络学习率
critic_lr = 5e-3  # 价值网络学习率
num_episodes = 2000  # 总训练回合数
hidden_dim = 128  # 网络隐藏层维度
gamma = 0.9  # 折扣因子
lmbda = 0.9  # GAE参数
epochs = 10  # 每个回合更新轮数
eps = 0.2  # PPO裁剪系数
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")  

# 创建环境 + 初始化智能体
env_name = 'Pendulum-v1'  
env = gym.make(env_name)  
torch.manual_seed(0)  # 固定随机种子,结果可复现
state_dim = env.observation_space.shape[0]  # 状态维度(3)
action_dim = env.action_space.shape[0]  # 动作维度(1)
agent = PPOContinuous(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)  

# 开始训练
return_list = train_on_policy_agent(env, agent, num_episodes)  

# 绘图:展示奖励曲线
episodes_list = list(range(len(return_list)))  
plt.plot(episodes_list, return_list)  
plt.xlabel('Episodes')  
plt.ylabel('Returns')  
plt.title('PPO on {}'.format(env_name))  
plt.show()  
  • 核心:设置超参数→创建环境和智能体→启动训练→绘制奖励曲线,直观看到训练效果(奖励从负数逐步上升,越接近 0 越好)。

语言描述

应用到电机控制节能中

代码

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
# 核心目标:通过强化学习PPO算法,控制电机扭矩,在保证动力性的前提下最小化能耗  
# 依赖库说明:  
#   torch: 深度学习框架,用于构建神经网络和梯度计算  
#   torch.nn.functional: 神经网络常用激活函数、损失函数  
#   numpy: 数值计算,处理环境状态/动作的数组运算  
#   matplotlib: 绘图,展示奖励和能耗的收敛曲线  
#   tqdm: 进度条,可视化训练过程  
# ==============================================================================  
import torch  
import torch.nn.functional as F  
import numpy as np  
import matplotlib.pyplot as plt  
from tqdm import tqdm  

# ===================== 1. 价值网络(Critic):评估状态价值 =====================# 作用:输入电机状态(车速、SOC、转速、扭矩),输出该状态的价值(未来奖励的期望)  
# 结构:简单的全连接网络,2层(输入层→隐藏层→输出层)  
class ValueNet(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim):  
        """  
        初始化价值网络  
        :param state_dim: 状态维度(电机环境为4维:车速、SOC、转速、扭矩)  
        :param hidden_dim: 隐藏层维度(经验值128,平衡性能和计算量)  
        """        super(ValueNet, self).__init__()  
        # 第一层:输入层→隐藏层(线性变换)  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        # 第二层:隐藏层→输出层(输出状态价值,标量)  
        self.fc2 = torch.nn.Linear(hidden_dim, 1)  

    def forward(self, x):  
        """  
        前向传播:计算状态价值  
        :param x: 输入状态 tensor,shape=(batch_size, state_dim)  
        :return: 状态价值 tensor,shape=(batch_size, 1)  
        """        # 隐藏层激活:ReLU(避免梯度消失,引入非线性)  
        x = F.relu(self.fc1(x))  
        # 输出层无激活(价值可以是任意实数)  
        return self.fc2(x)  

# ===================== 2. 策略网络(Actor):输出连续动作(扭矩) =====================# 作用:输入电机状态,输出扭矩的均值(mu)和标准差(std),用于构建正态分布采样动作  
# 优化点:动态探索衰减(后期减少高扭矩尝试,避免SOC耗尽)  
class PolicyNetContinuous(torch.nn.Module):  
    def __init__(self, state_dim, hidden_dim, action_dim):  
        """  
        初始化连续动作策略网络  
        :param state_dim: 状态维度(4维)  
        :param hidden_dim: 隐藏层维度(128)  
        :param action_dim: 动作维度(1维:扭矩指令)  
        """        super(PolicyNetContinuous, self).__init__()  
        # 第一层:输入层→隐藏层  
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)  
        # 第二层:隐藏层→动作均值(mu)  
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)  
        # 第二层:隐藏层→动作标准差(std)  
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)  
        self.action_bound = 100  # 电机物理扭矩上限(N·m)  

        # 参数初始化:正态分布初始化(均值0,方差0.01),避免初始值过大导致梯度爆炸  
        torch.nn.init.normal_(self.fc1.weight, 0, 0.01)  
        torch.nn.init.normal_(self.fc_mu.weight, 0, 0.01)  
        torch.nn.init.normal_(self.fc_std.weight, 0, 0.01)  

    def forward(self, x, explore_decay=1.0):  
        """  
        前向传播:输出扭矩的均值和标准差  
        :param x: 输入状态 tensor,shape=(batch_size, state_dim)  
        :param explore_decay: 探索衰减系数(1.0→0.1,后期减少探索)  
        :return mu: 扭矩均值 tensor,shape=(batch_size, action_dim)  
        :return std: 扭矩标准差 tensor,shape=(batch_size, action_dim)  
        """        # 隐藏层激活  
        x = F.relu(self.fc1(x))  
        # 均值输出:tanh限制在[-1,1],再乘扭矩上限→[-100,100]  
        mu = self.action_bound * torch.tanh(self.fc_mu(x))  
        # 二次裁剪:确保扭矩不超出物理限制  
        mu = torch.clamp(mu, -self.action_bound, self.action_bound)  
        # 标准差输出:  
        # 1. softplus确保std≥0(标准差不能为负)  
        # 2. +0.3保证最小探索性,避免std=0导致采样失败  
        # 3. ×explore_decay:动态衰减探索性,后期减少高扭矩尝试  
        std = (F.softplus(self.fc_std(x)) + 0.3) * explore_decay  
        return mu, std  

# ===================== 3. PPO算法核心类(连续动作版) =====================# 作用:整合策略网络/价值网络,实现动作采样、策略更新(PPO核心逻辑)  
# 优化点:动态探索衰减、奖励裁剪、奖励归一化、梯度裁剪(提升稳定性)  
class PPOContinuous:  
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,  
                 lmbda, epochs, eps, gamma, device):  
        """  
        初始化PPO智能体  
        :param state_dim: 状态维度(4)  
        :param hidden_dim: 隐藏层维度(128)  
        :param action_dim: 动作维度(1)  
        :param actor_lr: 策略网络学习率(8e-5,放缓学习避免震荡)  
        :param critic_lr: 价值网络学习率(1.5e-3)  
        :param lmbda: GAE优势函数系数(0.92,平衡偏差和方差)  
        :param epochs: 每回合更新轮数(12,减少过拟合)  
        :param eps: PPO裁剪系数(0.2,限制策略更新幅度)  
        :param gamma: 折扣因子(0.97,平衡当下/未来奖励)  
        :param device: 计算设备(GPU/CPU)  
        """        # 初始化策略网络(Actor)  
        self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim).to(device)  
        # 初始化价值网络(Critic)  
        self.critic = ValueNet(state_dim, hidden_dim).to(device)  
        # 策略网络优化器(Adam,常用梯度下降算法)  
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)  
        # 价值网络优化器  
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)  
        self.gamma = gamma      # 折扣因子:未来奖励的衰减系数  
        self.lmbda = lmbda      # GAE系数:优势函数的平滑系数  
        self.epochs = epochs    # 每回合更新轮数:同一批数据训练多轮  
        self.eps = eps          # PPO裁剪系数:避免策略更新幅度过大  
        self.device = device    # 计算设备  
        self.explore_decay = 1.0  # 探索衰减系数(初始1.0)  
        self.decay_rate = 0.9995  # 每回合衰减率(最终稳定在0.1)  

    def take_action(self, state):  
        """  
        采样动作:输入当前状态,输出扭矩指令  
        :param state: 环境状态(numpy数组,4维)  
        :return: 扭矩动作(列表,长度1)  
        """        # 转换为tensor:  
        # 1. numpy→tensor,float32精度  
        # 2. unsqueeze(0):增加batch维度((4,)→(1,4))  
        # 3. to(device):移到GPU/CPU  
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)  
        # 前向传播:获取扭矩均值和标准差(传入探索衰减系数)  
        mu, sigma = self.actor(state, self.explore_decay)  
        # 构建正态分布:用于采样动作  
        action_dist = torch.distributions.Normal(mu, sigma)  
        # 采样动作  
        action = action_dist.sample()  
        # 裁剪动作:确保扭矩在[-80,80](实际执行的扭矩上限,比物理上限更保守)  
        action = torch.clamp(action, -80, 80)  
        # tensor→数值,返回列表(适配环境step接口)  
        return [action.item()]  

    def update(self, transition_dict):  
        """  
        策略更新:使用一回合的经验数据更新网络  
        :param transition_dict: 经验字典,包含states/actions/rewards等  
        """        # ===================== 1. 数据预处理 =====================        # 转换为tensor并移到设备,统一shape:  
        # states: (step_num, 4) → 每步的状态  
        # actions: (step_num, 1) → 每步的动作  
        # rewards: (step_num, 1) → 每步的奖励  
        # next_states: (step_num, 4) → 每步的下一个状态  
        # dones: (step_num, 1) → 每步是否结束  
        states = torch.tensor(np.array(transition_dict['states']), dtype=torch.float32).to(self.device)  
        actions = torch.tensor(np.array(transition_dict['actions']), dtype=torch.float32).view(-1, 1).to(self.device)  
        rewards = torch.tensor(np.array(transition_dict['rewards']), dtype=torch.float32).view(-1, 1).to(self.device)  
        next_states = torch.tensor(np.array(transition_dict['next_states']), dtype=torch.float32).to(self.device)  
        dones = torch.tensor(np.array(transition_dict['dones']), dtype=torch.float32).view(-1, 1).to(self.device)  

        # ===================== 2. 奖励预处理(提升稳定性) =====================        # 优化1:奖励裁剪(-200~200)→ 防止单步奖励突变导致策略崩溃  
        rewards = torch.clamp(rewards, -200, 200)  
        # 优化2:奖励归一化 → 消除数值尺度差异,加速梯度收敛  
        # +1e-8:避免分母为0  
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-8)  

        # ===================== 3. 计算TD目标和优势函数 =====================        # TD目标:当前奖励 + 折扣×下一个状态价值(非结束状态)  
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)  
        # TD误差:目标价值 - 当前状态价值  
        td_delta = td_target - self.critic(states)  
        # 优势函数(GAE):衡量动作的“好坏”,比单纯TD误差更平滑  
        advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)  

        # ===================== 4. 计算旧策略概率(固定,用于PPO裁剪) =====================        mu, std = self.actor(states)  
        # 旧策略分布(detach:固定旧策略,不参与梯度更新)  
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())  
        # 旧策略下动作的对数概率  
        old_log_probs = action_dists.log_prob(actions)  

        # ===================== 5. 多轮更新策略(PPO核心) =====================        for _ in range(self.epochs):  
            # 新策略概率  
            mu, std = self.actor(states)  
            action_dists = torch.distributions.Normal(mu, std)  
            log_probs = action_dists.log_prob(actions)  
            # 策略比率:新策略概率/旧策略概率(指数形式避免数值下溢)  
            ratio = torch.exp(log_probs - old_log_probs)  
            # PPO裁剪损失:  
            surr1 = ratio * advantage  # 未裁剪损失  
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # 裁剪后损失  
            # 策略损失:最小化负的最小损失(等价于最大化优势)  
            actor_loss = torch.mean(-torch.min(surr1, surr2))  
            # 价值损失:均方误差(预测价值 vs 目标价值)  
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))  

            # ===================== 6. 梯度更新 =====================            # 策略网络更新:  
            self.actor_optimizer.zero_grad()  # 梯度清零(避免累积)  
            actor_loss.backward()             # 反向传播计算梯度  
            torch.nn.utils.clip_grad_norm_(self.actor.parameters(), max_norm=1.0)  # 梯度裁剪(防止爆炸)  
            self.actor_optimizer.step()       # 更新参数  

            # 价值网络更新:  
            self.critic_optimizer.zero_grad()  
            critic_loss.backward()  
            torch.nn.utils.clip_grad_norm_(self.critic.parameters(), max_norm=1.0)  
            self.critic_optimizer.step()  

        # ===================== 7. 探索衰减(每回合更新后) =====================        # 衰减探索系数,最小为0.1(保留基础探索性)  
        self.explore_decay = max(0.1, self.explore_decay * self.decay_rate)  

# ===================== 4. 优势函数计算(GAE) =====================# 作用:计算广义优势估计(GAE),让优势函数更平滑,提升训练稳定性  
def compute_advantage(gamma, lmbda, td_delta):  
    """  
    计算GAE优势函数  
    :param gamma: 折扣因子  
    :param lmbda: GAE系数  
    :param td_delta: TD误差(numpy数组)  
    :return: 优势函数(tensor)  
    """    td_delta = td_delta.detach().numpy()  # tensor→numpy(方便逆序遍历)  
    advantage_list = []  # 存储每步的优势  
    advantage = 0.0      # 累计优势  
    # 逆序遍历(从最后一步到第一步)  
    for delta in reversed(td_delta):  
        # GAE公式:advantage_t = delta_t + gamma*lmbda*advantage_{t+1}  
        advantage = gamma * lmbda * advantage + delta  
        advantage_list.append(advantage)  
    # 正序排列(恢复原顺序)  
    advantage_list.reverse()  
    # numpy→tensor(float32精度)  
    return torch.tensor(advantage_list, dtype=torch.float32)  

# ===================== 5. 电机控制环境(核心:模拟电机运行) =====================# 作用:模拟电机的物理运行,包括状态更新、能耗计算、奖励计算、终止条件  
# 优化点:SOC缓冲、能耗系数下调、扭矩上限收紧、SOC低时能耗翻倍(防止崩盘)  
class SimpleMotorControlEnv:  
    def __init__(self):  
        """初始化电机环境参数"""  
        self.state_dim = 4          # 状态维度:[车速, SOC, 转速, 扭矩]  
        self.action_dim = 1         # 动作维度:[扭矩指令]  
        self.max_torque = 100       # 电机物理最大扭矩(N·m)  
        self.max_speed = 80         # 最高车速(km/h)  
        self.max_soc = 1.0          # 电池最大SOC(100%)  
        self.min_soc = 0.2          # SOC保护阈值(低于此值开始惩罚)  
        self.base_energy_consume = 0.0008  # 基础能耗系数(下调避免能耗失控)  

        # 电机效率曲线(核心设计):  
        # 扭矩越小,效率越高(符合实际电机特性)  
        # 公式:0.85 + 0.1*e^(-|扭矩|/50) → 扭矩0时效率0.95,扭矩100时效率≈0.85  
        self.efficiency = lambda torque: 0.85 + 0.1 * np.exp(-abs(torque) / 50)  

    def reset(self, seed=0):  
        """  
        重置环境(每回合开始时调用)  
        :param seed: 随机种子(保证结果可复现)  
        :return: 初始状态,空字典(适配gym接口)  
        """        np.random.seed(seed)                # 固定随机种子  
        self.speed = 10.0                   # 初始车速:10km/h(避免怠速)  
        self.soc = self.max_soc             # 初始SOC:100%  
        self.motor_rpm = self.speed * 10    # 初始转速:车速×10(简化映射)  
        self.motor_torque = 0.0             # 初始扭矩:0N·m  
        self.step_count = 0                 # 步数计数(限制最大步数)  
        # 初始状态(float32精度,避免数值类型问题)  
        state = np.array([self.speed, self.soc, self.motor_rpm, self.motor_torque], dtype=np.float32)  
        return state, {}  

    def step(self, action):  
        """  
        执行动作(每步调用):输入扭矩指令,更新电机状态,返回奖励/终止状态  
        :param action: 扭矩指令(列表,长度1)  
        :return next_state: 下一个状态  
        :return reward: 单步奖励  
        :return terminated: 是否因SOC耗尽终止  
        :return truncated: 是否因步数超限终止  
        :return info: 空字典(适配gym接口)  
        """        # ===================== 1. 动作裁剪(防止极端扭矩) =====================        # 实际执行的扭矩:限制在[-80,80](比物理上限更保守,避免低效区)  
        torque_cmd = np.clip(action[0], -80, 80)  

        # ===================== 2. 状态更新(车速/转速) =====================        # 加速度:扭矩×0.04(下调系数,避免车速突变)  
        acceleration = torque_cmd * 0.04  
        # 车速更新:限制在[5,80](避免怠速,保证动力性)  
        self.speed = np.clip(self.speed + acceleration, 5, self.max_speed)  
        # 转速更新:车速×10(简化映射,实际电机转速与车速正相关)  
        self.motor_rpm = self.speed * 10  

        # ===================== 3. 能耗计算(核心:SOC下降) =====================        eff = self.efficiency(torque_cmd)                  # 当前扭矩下的效率  
        power_input = abs(torque_cmd) * self.motor_rpm / 9550  # 电机输入功率(kW,简化公式)  
        energy_consume = power_input * 0.1 / eff * self.base_energy_consume  # 0.1秒能耗  
        # SOC保护:低于0.2时能耗翻倍(强制智能体保护电池)  
        if self.soc < self.min_soc:  
            energy_consume *= 2.0  
        # 更新SOC:限制在[0,1](避免负数)  
        self.soc = np.clip(self.soc - energy_consume, 0, self.max_soc)  

        # ===================== 4. 奖励函数设计(核心:节能减排) =====================        reward = 0.0  
        # 核心奖励1:效率越高,奖励越高(正向激励)  
        reward += eff * 100  
        # 核心奖励2:能耗越高,惩罚越重(反向约束)  
        reward -= energy_consume * 400  
        # 约束奖励1:最优车速区间(20~60km/h),加正向奖励(保证动力性)  
        if 20 <= self.speed <= 60:  
            reward += 15  
        # 约束奖励2:SOC低于保护阈值,惩罚(避免电池过放)  
        if self.soc < self.min_soc:  
            reward -= 30  
        # 约束奖励3:高扭矩(>70N·m),惩罚(避免低效区)  
        if abs(torque_cmd) > 70:  
            reward -= 15  

        # ===================== 5. 终止条件 =====================        # terminated:SOC≤0.1(留缓冲,避免直接耗尽)  
        terminated = (self.soc <= 0.1)  
        # truncated:步数≥200(限制每回合最大步数)  
        truncated = (self.step_count >= 200)  
        done = terminated or truncated  # 总终止标志  
        self.step_count += 1            # 步数+1  

        # ===================== 6. 组装下一个状态 =====================        next_state = np.array([self.speed, self.soc, self.motor_rpm, self.motor_torque], dtype=np.float32)  
        return next_state, reward, terminated, truncated, {}  

# ===================== 6. 训练循环(在线策略) =====================# 作用:控制训练流程,收集经验,调用智能体更新,记录奖励/能耗  
def train_on_policy_agent(env, agent, num_episodes):  
    """  
    训练PPO智能体  
    :param env: 电机环境实例  
    :param agent: PPO智能体实例  
    :param num_episodes: 总训练回合数  
    :return return_list: 每回合总奖励  
    :return energy_consume_list: 每回合总能耗(SOC下降)  
    """    return_list = []          # 记录每回合的总奖励  
    energy_consume_list = []  # 记录每回合的总能耗  
    # 分10个迭代(方便进度条展示)  
    for i in range(10):  
        # tqdm进度条:可视化训练进度  
        with tqdm(total=int(num_episodes/10), desc=f'Iteration {i}') as pbar:  
            # 每个迭代训练num_episodes/10回合  
            for i_episode in range(int(num_episodes/10)):  
                episode_return = 0  # 单回合总奖励  
                total_energy = 0.0  # 单回合总能耗  
                # 经验字典:存储一回合的经验  
                transition_dict = {  
                    'states': [], 'actions': [],  
                    'next_states': [], 'rewards': [], 'dones': []  
                }  
                # 重置环境:获取初始状态  
                state, info = env.reset(seed=0)  
                done = False  # 回合终止标志  
                # 单回合循环(直到终止)  
                while not done:  
                    # 智能体采样动作  
                    action = agent.take_action(state)  
                    # 环境执行动作:获取下一个状态、奖励、终止标志  
                    next_state, reward, terminated, truncated, info = env.step(action)  
                    done = terminated or truncated  
                    # 计算单步能耗:SOC下降量(仅累加正数)  
                    step_energy = state[1] - next_state[1]  
                    total_energy += step_energy if step_energy > 0 else 0  
                    # 存储经验  
                    transition_dict['states'].append(state)  
                    transition_dict['actions'].append(action)  
                    transition_dict['next_states'].append(next_state)  
                    transition_dict['rewards'].append(reward)  
                    transition_dict['dones'].append(done)  
                    # 更新当前状态  
                    state = next_state  
                    # 累加单步奖励  
                    episode_return += reward  
                # 回合结束:记录奖励和能耗  
                return_list.append(episode_return)  
                energy_consume_list.append(total_energy)  
                # 智能体更新:使用本回合经验更新策略  
                agent.update(transition_dict)  
                # 每10回合更新进度条后缀(展示关键指标)  
                if (i_episode + 1) % 10 == 0:  
                    pbar.set_postfix({  
                        'episode': f'{num_episodes/10*i + i_episode+1}',  # 当前总回合数  
                        'avg_return': f'{np.mean(return_list[-10:]):.3f}',  # 最近10回合平均奖励  
                        'avg_energy': f'{np.mean(energy_consume_list[-10:]):.5f}',  # 最近10回合平均能耗  
                        'explore': f'{agent.explore_decay:.4f}'  # 当前探索系数  
                    })  
                # 进度条+1  
                pbar.update(1)  
    # 返回所有回合的奖励和能耗  
    return return_list, energy_consume_list  

# ===================== 7. 主程序(入口) =====================if __name__ == "__main__":  
    # ===================== 超参数设置(保守型,避免震荡) =====================    actor_lr = 8e-5           # 策略网络学习率(8e-5,比1e-4更保守)  
    critic_lr = 1.5e-3        # 价值网络学习率(1.5e-3,比2e-3更保守)  
    num_episodes = 1000       # 总训练回合数(1000,平衡收敛和时间)  
    hidden_dim = 128          # 隐藏层维度(经验值,适配4维状态)  
    gamma = 0.97              # 折扣因子(0.97,更看重未来奖励)  
    lmbda = 0.92              # GAE系数(0.92,平衡平滑度)  
    epochs = 12               # 每回合更新轮数(12,比15少,避免过拟合)  
    eps = 0.2                 # PPO裁剪系数(经验值0.2)  
    # 计算设备:优先GPU,无则CPU  
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")  

    # ===================== 初始化环境和智能体 =====================    env = SimpleMotorControlEnv()  # 电机环境  
    state_dim = env.state_dim      # 状态维度(4)  
    action_dim = env.action_dim    # 动作维度(1)  

    # 固定随机种子(保证结果可复现,答辩时结果一致)  
    torch.manual_seed(0)  
    np.random.seed(0)  

    # 初始化PPO智能体  
    agent = PPOContinuous(  
        state_dim, hidden_dim, action_dim,  
        actor_lr, critic_lr, lmbda, epochs, eps, gamma, device  
    )  

    # ===================== 开始训练 =====================    return_list, energy_consume_list = train_on_policy_agent(env, agent, num_episodes)  

    # ===================== 结果可视化(平滑曲线) =====================    window_size = 20  # 滑动平均窗口(20,曲线更平滑,答辩展示更美观)  
    # 滑动平均:卷积实现,mode='valid'表示仅保留有效窗口  
    return_smooth = np.convolve(return_list, np.ones(window_size)/window_size, mode='valid')  
    energy_smooth = np.convolve(energy_consume_list, np.ones(window_size)/window_size, mode='valid')  
    # 适配滑动平均后的x轴(窗口20,x轴从19开始)  
    episodes_smooth = list(range(window_size - 1, len(return_list)))  

    # 创建画布(12×6英寸,适配答辩PPT)  
    plt.figure(figsize=(12, 6))  

    # 子图1:奖励曲线(平滑)  
    plt.subplot(1, 2, 1)  
    plt.plot(episodes_smooth, return_smooth, color='#1f77b4')  # 蓝色曲线  
    plt.xlabel('Episodes')                                     # x轴标签:回合数  
    plt.ylabel('Smoothed Returns')                             # y轴标签:平滑奖励  
    plt.title('PPO on Motor Control (Stable Reward)')          # 标题  
    plt.grid(True, alpha=0.3)                                  # 网格(透明度0.3,不遮挡曲线)  

    # 子图2:能耗曲线(平滑)  
    plt.subplot(1, 2, 2)  
    plt.plot(episodes_smooth, energy_smooth, color='#ff7f0e')  # 橙色曲线  
    plt.xlabel('Episodes')                                     # x轴标签:回合数  
    plt.ylabel('Smoothed Energy Consume (SOC Drop)')           # y轴标签:平滑能耗(SOC下降)  
    plt.title('PPO on Motor Control (Stable Energy)')          # 标题  
    plt.grid(True, alpha=0.3)                                  # 网格  

    # 调整子图间距(避免重叠)  
    plt.tight_layout()  
    # 保存高清图(300dpi,适配答辩PPT/论文)  
    plt.savefig('motor_ppo_result.png', dpi=300, bbox_inches='tight')  
    # 显示图像  
    plt.show()  

    # ===================== 打印最终策略效果(答辩亮点) =====================    print("\n===== 最终策略效果 =====")  
    final_state, _ = env.reset()  # 重置环境到初始状态  
    total_energy_final = 0.0      # 最终单回合能耗  
    # 运行200步(一回合),计算总能耗  
    for _ in range(200):  
        action = agent.take_action(final_state)  
        next_state, _, _, _, _ = env.step(action)  
        total_energy_final += final_state[1] - next_state[1]  
        final_state = next_state