强化学习笔记(基础篇)

第零章:环境配置 & GitHub 项目上传

# 配置镜像源(管理员身份打开Anaconda Prompt)
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
conda config --set show_channel_urls yes

#配置Pip镜像源
pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

# 创建虚拟环境(不需要与你本地版本一致,按照自己需求就行了)
cd /d G:\test
conda create --prefix .\.venv python=3.11
conda activate G:\test\.venv

#打开powershell查询一下自己的型号(后续安装pytorch版本需要,我的是12.4,因此我选的12.1就可以兼容)
conda activate G:\test\.venv
nvidia-smi 

# 安装依赖
conda install numpy pandas matplotlib pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia

# 上传到 GitHub
git init
git remote add origin https://github.com/HaoyunT/test.git
git checkout -b main
git add .
git commit -m "初始化项目"
git push -u origin main

Miniconda 与虚拟环境配置(推荐)

如果你希望环境更轻量、易管理,建议使用 Miniconda(而非完整版 Anaconda)。下面给出在 Windows(PowerShell / Anaconda Prompt)下的安装与常用命令示例。

1) 安装 Miniconda(Windows)

  1. 访问官网下载页面: https://docs.conda.io/en/latest/miniconda.html ,选择 Windows 版的安装器(x86_64)。
  2. 双击运行安装程序,按默认选项安装即可;或者使用静默安装(管理员权限):
    start /wait "" Miniconda3-latest-Windows-x86_64.exe /S /D=C:\Miniconda3
  3. 安装完成后,在 PowerShell 中运行:
    conda init powershell
    # 关闭并重新打开 PowerShell,使改动生效

2) 从 environment.yml 创建环境(示例)

假设你的 yml 文件位于 G:\maddpg_rl\environment.yml,在 PowerShell 中运行:

conda env create -f G:\maddpg_rl\environment.yml
conda activate maddpg_env

说明:如果 yml 文件内部指定了环境名称,conda env create -f 会按其中的名字创建环境;你也可以用 -n 指定名称,例如 conda env create -n maddpg_env -f ...

3) 安装或替换 PyTorch(示例)

根据你的 CUDA / 驱动版本选择合适的 cudatoolkit。示例命令(自动确认):

conda install -y -c pytorch -c nvidia pytorch=2.4.0 torchvision=0.19.0 torchaudio=2.4.0 cudatoolkit=12.4

安装完成后,用下面命令验证 GPU 与 PyTorch 是否可用:

python -c "import torch; print('torch', torch.__version__); print('cuda_available', torch.cuda.is_available()); print('cuda_device_count', torch.cuda.device_count())"

4) 常见问题与建议

  • conda env create 报错:先检查 yml 的语法或是否包含平台特定的包;可尝试使用 mamba 来加速并避免冲突:
    conda install -n base -c conda-forge mamba
    mamba env create -f G:\maddpg_rl\environment.yml
  • conda activate 无效:确认已执行 conda init powershell 并重新打开 PowerShell。
  • 更新或同步环境:
    conda env update -f G:\maddpg_rl\environment.yml --prune
  • 在 VS Code 中使用该环境:打开命令面板选择 Python: Select Interpreter,选择对应 conda 环境的解释器;或在工作区设置中配置 python.defaultInterpreterPath
  • 查看已安装的包:
    conda list
  • 删除环境:
    conda env remove -n maddpg_env

本笔记学习路径导图

  • 第1-3章:理论基础
    • 第1章:强化学习基本概念(智能体、环境、状态、动作、奖励、价值函数)
    • 第2章:贝尔曼方程(价值函数的递推关系)
    • 第3章:贝尔曼最优方程(最优策略的刻画)
  • 第4-5章:动态规划方法(需知道环境模型)
    • 第4章:策略迭代(评估 + 改进的循环)
    • 第5章:值迭代(直接优化价值函数)
  • 第6-7章:采样方法(无模型学习的基础)
    • 第6章:蒙特卡洛方法(回合后更新,高方差)
    • 第7章:时间差分学习(在线更新,低方差,自举思想)
  • 第8-10章:Q学习系列(离散动作空间)
    • 第8章:Q-learning(表格形式的时差学习)
    • 第9章:DQN(用神经网络逼近Q函数,适应高维状态)
    • 第10章:DDQN(双网络解决Q值过估计)
  • 第11-13章:策略梯度方法(直接优化策略,支持连续动作)
    • 第11章:策略梯度(策略参数空间中的梯度上升)
    • 第12章:Actor-Critic(结合价值函数降低方差)
    • 第13章:A3C & A2C(异步并行 → 同步改进)

关键洞察链路:

  1. 贝尔曼方程 → 动态规划方法(理论最优但需要模型)
  2. 蒙特卡洛方法 + 时间差分 → 无模型学习(采样+自举)
  3. Q-learning(表格)→ DQN(神经网络)→ DDQN(稳定训练)
  4. 策略梯度 = 绕过Q函数,直接优化策略参数
  5. Actor-Critic = 用V(s)辅助(Critic)来加速策略梯度(Actor)学习
  6. A3C/A2C = N步TD + 并行化提升采样和训练效率(A3C异步 → A2C同步)

第一章:强化学习基本概念

强化学习(Reinforcement Learning, RL)核心概念:

智能体(Agent)
决策执行主体。在每个时间步 \(t\) 观察状态 \(S_t\),选择动作 \(A_t\),并通过奖励信号调整策略。
示例:游戏中的玩家角色。
环境(Environment)
智能体交互对象,定义状态空间、动作空间和状态转移规则。动作后返回状态 \(S_{t+1}\) 和奖励 \(R_t\)。
示例:游戏关卡或物理模拟器。
状态(State)
描述环境在某一时刻的特征,满足马尔可夫性质。
示例:角色位置、敌人位置。
动作(Action)
智能体可执行的操作,改变环境状态。
示例:左右移动、跳跃。
奖励(Reward)
环境对动作的反馈,用于衡量优劣。强化学习目标最大化累计奖励:
\(G_t = \sum_{k=0}^{\infty} \gamma^k R_{t+k}\)
策略(Policy)
选择动作的规则,\(\pi(a|s)\) 或 \(a = \pi(s)\)。目标是找到最优策略 \(\pi^*\)。
价值函数(Value Function)
衡量状态或状态-动作对的长期回报:
  • 状态价值:\(V^\pi(s) = \mathbb{E}[G_t | S_t=s]\)
  • 动作价值:\(Q^\pi(s,a) = \mathbb{E}[G_t | S_t=s, A_t=a]\)

第二章:贝尔曼方程(Bellman Equation)

贝尔曼方程用于描述策略下状态或状态-动作的价值递归关系,是强化学习中策略评估的核心工具。它将价值函数分解为即时奖励未来价值的折扣期望两部分,奠定了从第3到第10章所有价值函数方法的理论基础。

状态价值函数(State Value Function)
对于策略 π 下的状态价值函数 V^π(s)

$$ V^\pi(s) = \sum_{a \in A} \pi(a|s) \sum_{s' \in S} P(s'|s,a) \big[ R(s,a) + \gamma V^\pi(s') \big] $$

说明:当前状态价值 = 即时奖励 + 折扣后的未来价值期望。
状态-动作价值函数(Action-Value Function)
对于状态-动作对的价值函数 Q^π(s,a)

$$ Q^\pi(s,a) = R(s,a) + \gamma \sum_{s' \in S} \sum_{a' \in A} \pi(a'|s') Q^\pi(s',a') $$

状态价值函数与状态-动作价值函数的关系
状态价值函数可以由状态-动作价值函数得到:

$$ V^\pi(s) = \sum_{a \in A} \pi(a|s) Q^\pi(s,a) $$

解释:在状态 s 下,价值函数 V^π(s) 是策略选择动作后的期望 Q^π(s,a)

用途:策略评估、策略迭代、价值迭代的理论基础。

第三章:贝尔曼最优方程(Bellman Optimality Equation)

在寻找最优策略 π* 时,状态和状态-动作的价值函数满足贝尔曼最优方程,体现最优性原则。这是第4-5章(动态规划方法)和第8-10章(深度Q学习)的理论基石,将"最优策略搜索"转化为"最优价值函数求解"问题。

最优状态价值函数

$$ V^*(s) = \max_{a \in A} \sum_{s' \in S} P(s'|s,a) \big[ R(s,a) + \gamma V^*(s') \big] $$

最优状态-动作价值函数

$$ Q^*(s,a) = R(s,a) + \gamma \sum_{s' \in S} \max_{a' \in A} Q^*(s',a') $$

关系总结:

  • 第二章贝尔曼方程:给定策略 π → 计算 V^π 或 Q^π
  • 第三章贝尔曼最优方程:求最优策略 π* → V* 或 Q*

第四章:策略迭代(Policy Iteration)

策略迭代通过交替进行策略评估和策略改进来收敛到最优策略。

  1. 初始化策略:选择初始策略 π_0
  2. 策略评估:计算状态价值函数:

    $$ v_{\pi_k} = r_{\pi_k} + \gamma P_{\pi_k} v_{\pi_k} $$

  3. 策略改进:更新策略:

    $$ \pi_{k+1} = \arg\max_\pi (r_\pi + \gamma P_\pi v_{\pi_k}) $$

  4. 重复评估和改进,直到策略收敛。

第五章:值迭代(Value Iteration)

值迭代直接迭代状态价值函数,通过贝尔曼最优方程收敛到最优值函数,然后导出最优策略。

  1. 初始化价值函数:选择初始值 v_0
  2. 迭代更新:使用贝尔曼最优方程:

    $$ v_{k+1} = \max_\pi (r_\pi + \gamma P_\pi v_k) $$

  3. 策略导出:收敛后选择最优动作:

    $$ \pi^*(s) = \arg\max_a \sum_{s'} P(s'|s,a) \big[R(s,a) + \gamma v^*(s') \big] $$

策略迭代与值迭代的对比

Policy Iteration Value Iteration Comments
(1)Policy \(\pi_0\) N/A
(2)Value \(v_{\pi_0} = r_{\pi_0} + \gamma P_{\pi_0} v_{\pi_0}\) \(v_0 := v_{\pi_0}\)
(3)Policy \(\pi_1 = \arg\max_\pi ( r_\pi + \gamma P_\pi v_{\pi_0} )\) \(\pi_1 = \arg\max_\pi ( r_\pi + \gamma P_\pi v_0 )\) The two policies are the same
(4)Value \(v_{\pi_1} = r_{\pi_1} + \gamma P_{\pi_1} v_{\pi_1}\) \(v_1 = r_{\pi_1} + \gamma P_{\pi_1} v_0\) \(v_{\pi_1} \ge v_1 \text{ since } v_{\pi_1} \ge v_{\pi_0}\)
5) Policy \(\pi_2 = \arg\max_\pi ( r_\pi + \gamma P_\pi v_{\pi_1} )\) \(\pi'_2 = \arg\max_\pi ( r_\pi + \gamma P_\pi v_1 )\)

重要过渡说明:从动态规划到采样方法

动态规划(第4-5章)的局限性:

  • ✓ 收敛性理论完备,最优性有保证
  • 需要知道环境模型(状态转移概率 P、奖励函数 R)
  • ✗ 计算复杂度高(迭代次数多,状态空间大时效率低)

采样方法(第6-7章)的优势:

  • 无模型(Model-Free):只需与环境交互采样,无需知道转移概率
  • ✓ 更切合实际:真实世界通常无法获得完整的环境模型
  • ✓ 通过蒙特卡洛估计和时间差分学习逐步优化策略

理论桥梁:尽管采样方法不需要显式的环境模型,但它们的理论基础仍源于贝尔曼方程和贝尔曼最优方程。通过采样数据隐式地学习这些关系,从而优化策略。

第六章:蒙特卡洛方法(Monte Carlo Methods)

蒙特卡洛方法(Monte Carlo,简称 MC)是强化学习中一种基于采样的策略评估与优化方法。 与动态规划不同,MC 不依赖环境的状态转移概率模型 P(s'|s,a),而是通过反复从策略 π 生成完整回合(Episode), 根据实际获得的回报估计状态或状态-动作价值。

一、基本思想

给定策略 \( \pi \),智能体与环境交互多次,得到若干完整回合:

$$ (S_0, A_0, R_1, S_1, A_1, R_2, \dots, S_T) $$

对于每个回合,定义从时间步 \(t\) 开始的累计回报(Return):

$$ G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \dots = \sum_{k=0}^{T-t-1} \gamma^k R_{t+k+1} $$

MC 方法通过多次采样,利用这些 \(G_t\) 的平均值来近似价值函数。

状态价值估计:
$$ V(s) = \mathbb{E}_\pi[G_t \mid S_t = s] \approx \frac{1}{N(s)} \sum_{i=1}^{N(s)} G^{(i)}(s) $$
动作价值估计:
$$ Q(s,a) = \mathbb{E}_\pi[G_t \mid S_t = s, A_t = a] \approx \frac{1}{N(s,a)} \sum_{i=1}^{N(s,a)} G^{(i)}(s,a) $$

二、首次访问与每次访问

  • First-Visit MC: 仅在一个回合中第一次访问状态(或状态-动作对)时进行更新。
  • Every-Visit MC: 对回合中每次访问状态(或状态-动作对)都进行更新。

二者最终都会收敛到真实的价值函数,但首次访问 MC 方差略小。

三、蒙特卡洛策略评估算法

基本思想:给定策略 π,评估其状态价值函数 V(s)

  1. 初始化:对所有状态 s,设 V(s) = 0,同时记录每个状态的回报列表 Returns(s)
  2. 采集数据:与环境交互多次,每次产生一个完整回合 (S₀, A₀, R₁, S₁, ..., S_T)
  3. 计算回报:对每个回合,从末尾向前计算累计回报 G_t = R_{t+1} + γR_{t+2} + ...
  4. 策略评估(首次访问):对每个状态,仅在该回合中第一次出现时:
    • 将该时刻的回报 G_t 加入 Returns(S_t)
    • 更新价值函数:V(S_t) = Returns(S_t) 的平均值
  5. 重复:重复采集和更新过程,V(s) 逐渐逼近真实期望回报

这个算法是无模型、基于采样的方法——只需要实际回合数据,不需要知道环境的状态转移模型。

四、蒙特卡洛控制(MC Control)

在策略评估的基础上,MC 还可以实现策略改进,形成一个与策略迭代类似的过程:

  1. 评估当前策略 \(\pi\):使用 MC 方法估计 \(Q^\pi(s,a)\)
  2. 改进策略:采用贪婪或 ε-贪婪方式更新策略

    $$ \pi'(s) = \arg\max_a Q(s,a) $$

  3. 重复以上过程,直到收敛

五、ε-贪婪(ε-Greedy)探索策略

为了避免过早陷入次优策略,MC 控制中常用 ε-贪婪策略 进行探索:

$$ \pi(a|s) = \begin{cases} 1 - \varepsilon + \frac{\varepsilon}{|A(s)|}, & a = \arg\max_{a'} Q(s,a') \\ \frac{\varepsilon}{|A(s)|}, & \text{否则} \end{cases} $$

其中 \(\varepsilon \in [0,1]\) 表示随机探索概率。

六、MC 控制示例代码


import numpy as np
from collections import defaultdict

class MCControl:
    def __init__(self, num_states, num_actions, gamma=0.99, epsilon=0.1, alpha=0.1):
        self.num_states = num_states
        self.num_actions = num_actions
        self.gamma = gamma
        self.epsilon = epsilon
        self.alpha = alpha
        self.Q = defaultdict(lambda: np.zeros(num_actions))
        self.returns = defaultdict(list)  # 存储每个(s,a)的回报列表
        
    def epsilon_greedy(self, state):
        """ε-贪婪策略选择动作"""
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.num_actions)
        else:
            return np.argmax(self.Q[state])
    
    def train_episode(self, env):
        """运行一个完整回合并更新Q值"""
        trajectory = []
        state = env.reset()
        done = False
        
        # 收集回合轨迹
        while not done:
            action = self.epsilon_greedy(state)
            next_state, reward, done, _ = env.step(action)
            trajectory.append((state, action, reward))
            state = next_state
        
        # 从后向前计算回报并更新Q值
        G = 0
        visited_pairs = set()
        for t in range(len(trajectory) - 1, -1, -1):
            state, action, reward = trajectory[t]
            G = reward + self.gamma * G
            
            # 只在状态-动作对第一次出现时更新(First-Visit MC)
            if (state, action) not in visited_pairs:
                visited_pairs.add((state, action))
                self.returns[(state, action)].append(G)
                self.Q[state][action] = np.mean(self.returns[(state, action)])
    
    def train(self, env, num_episodes=1000):
        """训练过程"""
        rewards = []
        for episode in range(num_episodes):
            self.train_episode(env)
            
            if (episode + 1) % 100 == 0:
                # 测试当前策略
                test_reward = 0
                for _ in range(10):
                    test_reward += self.test_episode(env)
                avg_reward = test_reward / 10
                print(f"Episode {episode+1}, Avg Reward: {avg_reward:.2f}")
                rewards.append(avg_reward)
        return rewards
    
    def test_episode(self, env):
        """测试当前策略"""
        state = env.reset()
        done = False
        episode_reward = 0
        while not done:
            action = np.argmax(self.Q[state])
            state, reward, done, _ = env.step(action)
            episode_reward += reward
        return episode_reward
    
    def get_policy(self):
        """获取贪婪策略"""
        policy = {}
        for state in self.Q:
            policy[state] = np.argmax(self.Q[state])
        return policy
  

七、MC 方法的特点与局限

优点缺点
无需知道环境模型(P、R) 必须等待完整回合结束,不能用于持续任务
实现简单,概念直观 收敛速度较慢,方差大
适合离线模拟环境(如游戏) 对长期任务或高维状态空间不适用

八、小结

  • MC 是基于采样的策略评估与控制方法。
  • 依赖回合结束的实际回报,而非估计的下一步价值。
  • 与动态规划相比,不需知道模型,但收敛慢。
  • 是理解时序差分(TD)学习的重要过渡。

第七章:时间差分学习(TD Learning)

时间差分学习(Temporal Difference,简称 TD)是强化学习中结合了 蒙特卡洛方法动态规划的策略评估与控制方法。 与蒙特卡洛不同,TD 可以在回合未结束时就更新状态价值,因此属于在线更新方法。

一、TD(0)核心公式

单步TD更新状态价值函数公式:

$$ V(s_t) \leftarrow V(s_t) + \alpha \big[ r_{t+1} + \gamma V(s_{t+1}) - V(s_t) \big] $$

其中:

  • V(s_t):当前状态的价值估计
  • r_{t+1}:当前动作的即时奖励
  • γ:折扣因子
  • α:学习率
  • δ_t = r_{t+1} + γ V(s_{t+1}) - V(s_t):TD误差

二、TD(0)算法流程

TD 与 MC 最大的区别:不等待回合结束,在每一步就进行更新!

  1. 初始化:对所有状态 s,设 V(s) = 0
  2. 交互一步:在状态 s_t 执行动作 a_t,观察即时奖励 r_{t+1} 和下一状态 s_{t+1}
  3. 计算 TD 误差:δ_t = r_{t+1} + γ·V(s_{t+1}) - V(s_t)
  4. 更新价值函数:V(s_t) ← V(s_t) + α·δ_t(使用学习率 α)
  5. 继续交互:s_t ← s_{t+1},重复步骤 2-4,不需要等待回合结束

关键优势:

  • ✓ 可以进行 在线学习,回合中实时更新
  • ✓ 可以用于 持续任务(无终止状态的任务)
  • ✓ 收敛速度比 MC 更快
  • ✓ 在不确定环境中更稳定

三、TD学习与MC方法对比

方法更新时机是否依赖环境模型偏差/方差
蒙特卡洛(MC)回合结束无偏,方差大
TD(0)每一步有偏,方差小

MC使用完整回报 G_t 更新价值,而TD使用下一状态估计 V(s_{t+1}) 引导当前更新。

四、TD(λ)与资格迹

TD(λ)引入了 资格迹(eligibility trace),可以结合多步信息更新价值:

$$ V(s) \leftarrow V(s) + \alpha \delta_t e(s) $$

  • e(s):状态的资格迹,记录过去时间步的重要性
  • λ ∈ [0,1] 控制多步信息衰减:λ=0 → TD(0),λ=1 → 接近MC

五、TD控制与ε-贪婪策略

在策略评估基础上,TD也可进行策略改进,实现TD控制:

  1. 评估当前策略:使用TD估计 Q(s,a)
  2. 改进策略:采用ε-贪婪选择动作

    $$ \pi(a|s) = \begin{cases} 1 - \varepsilon + \frac{\varepsilon}{|A(s)|}, & a = \arg\max_{a'} Q(s,a') \\ \frac{\varepsilon}{|A(s)|}, & \text{否则} \end{cases} $$

  3. 重复以上过程直到收敛

六、TD 控制示例代码(SARSA)


import numpy as np
from collections import defaultdict

class SARSA:
    def __init__(self, num_states, num_actions, alpha=0.1, gamma=0.99, epsilon=0.1):
        self.num_states = num_states
        self.num_actions = num_actions
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.Q = defaultdict(lambda: np.zeros(num_actions))
    
    def epsilon_greedy(self, state):
        """ε-贪婪策略"""
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.num_actions)
        else:
            return np.argmax(self.Q[state])
    
    def train_step(self, state, action, reward, next_state, next_action, done):
        """单步SARSA更新"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * self.Q[next_state][next_action]
        
        # TD误差
        td_error = target - self.Q[state][action]
        
        # Q值更新
        self.Q[state][action] += self.alpha * td_error
    
    def train_episode(self, env):
        """运行一个完整回合"""
        state = env.reset()
        action = self.epsilon_greedy(state)
        done = False
        episode_reward = 0
        
        while not done:
            next_state, reward, done, _ = env.step(action)
            next_action = self.epsilon_greedy(next_state)
            
            # 更新Q值
            self.train_step(state, action, reward, next_state, next_action, done)
            
            episode_reward += reward
            state = next_state
            action = next_action
        
        return episode_reward
    
    def train(self, env, num_episodes=1000):
        """训练过程"""
        rewards = []
        for episode in range(num_episodes):
            reward = self.train_episode(env)
            rewards.append(reward)
            
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(rewards[-100:])
                print(f"Episode {episode+1}, Avg Reward: {avg_reward:.2f}")
        return rewards
  

七、Bootstrapping(自举/引导)解释

Bootstrapping 是TD方法的核心思想:

TD不是等到完整回合获得真实回报 G_t 才更新价值,而是使用对下一状态价值的估计 V(s_{t+1}) 来引导当前状态价值更新:

$$ V(s_t) \leftarrow V(s_t) + \alpha \big[ r_{t+1} + \gamma V(s_{t+1}) - V(s_t) \big] $$

这种方式称为“自举”,直观理解为:

  • 用当前已有的估计去改进自身
  • 实现在线、逐步更新,不依赖整回合完整信息
  • 优点:收敛快、可在线学习;缺点:初始估计不准可能引入偏差

八、MC vs TD vs DP 三者对比(方差-偏差权衡)

方法更新依据是否需要模型方差偏差收敛性
动态规划(DP)贝尔曼方程(理论)是(需要P、R)00已知最优,但计算复杂
蒙特卡洛(MC)完整回合回报否(无模型)0(无偏)必须等回合结束
时间差分(TD)TD误差(一步)否(无模型)有偏在线更新,快速收敛

关键洞察:TD 选择了最好的折衷——虽然引入了偏差(因为 V 的初始估计不完美),但极大地降低了方差,使得在线学习成为可能。这就是为什么 TD 成为了后续所有强化学习算法(Q-learning、Actor-Critic、PPO等)的核心。

第八章:Q-learning

Q-learning 是强化学习中最经典的 值迭代算法,用于学习最优动作价值函数 Q*(s,a),从而间接得到最优策略:

$$ \pi^*(s) = \arg\max_a Q^*(s,a) $$

特点:

  • 属于 off-policy 算法:更新 Q 值时使用最大化未来 Q 值,而行为策略可以带探索
  • 目标是找到能获得最大累积折扣奖励的动作

一、核心公式

Q-learning 核心更新公式:

$$ Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \big[ r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t) \big] $$

符号含义
st, at当前状态和动作
rt+1即时奖励
st+1下一状态
α学习率(0.1~0.5,一般设置)
γ折扣因子(0.9~0.99,一般设置)
max Q(st+1,a')未来状态最优动作价值

二、算法流程

Q-learning 是 Off-policy 方法,行为策略可以探索,但学习的是最优策略。

  1. 初始化:对所有状态-动作对 (s,a),设 Q(s,a) = 0
  2. 策略选择:使用 ε-greedy 策略从状态 s 选择动作 a
    • 以概率 ε 随机选择(探索)
    • 以概率 1-ε 选择 Q 值最大的动作(利用)
  3. 执行并观察:执行动作 a,得到奖励 r 和下一状态 s'
  4. 计算目标 Q 值:y = r + γ·max_{a'} Q(s',a')
    • 虽然我们用 ε-greedy 探索,但这里取 max 说明我们朝着最优策略学习
  5. 更新 Q 值:Q(s,a) ← Q(s,a) + α·(y - Q(s,a))
  6. 重复:s ← s',继续交互和更新

学习率 α、折扣因子 γ、探索率 ε 的作用:

  • α(学习率):0.1~0.5,控制对新信息的吸收程度
  • γ(折扣因子):0.9~0.99,决定对未来奖励的重视程度
  • ε(探索率):通常从 1.0 衰减到 0.01~0.1,逐步从探索转向利用

三、特点与注意事项

  • 适用场景:离散状态和动作空间小的任务
  • 探索策略:ε-greedy 或 Boltzmann,可随训练衰减 ε
  • 学习率 α:一般 0.1~0.5,太大不稳定,太小收敛慢
  • 折扣因子 γ:一般 0.9~0.99
  • 收敛性:每个状态动作对被充分访问,且 α 衰减时可收敛

四、Python 示例


import numpy as np
import gym

class QLearning:
    """Q-learning 算法实现"""
    def __init__(self, n_states, n_actions, alpha=0.1, gamma=0.99, epsilon=0.1):
        self.n_states = n_states
        self.n_actions = n_actions
        self.alpha = alpha  # 学习率
        self.gamma = gamma  # 折扣因子
        self.epsilon = epsilon  # 探索率
        self.Q = np.zeros((n_states, n_actions))
    
    def select_action(self, state):
        """ε-greedy 策略选择动作"""
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.n_actions)
        else:
            return np.argmax(self.Q[state])
    
    def update(self, state, action, reward, next_state, done):
        """Q 值更新"""
        target = reward + self.gamma * np.max(self.Q[next_state]) * (1 - done)
        td_error = target - self.Q[state, action]
        self.Q[state, action] += self.alpha * td_error
    
    def train(self, env, n_episodes=2000, max_steps=100):
        """训练智能体"""
        for episode in range(n_episodes):
            state = env.reset()
            if isinstance(state, tuple):
                state = state[0]  # 兼容新版 gym
            done = False
            
            for _ in range(max_steps):
                action = self.select_action(state)
                result = env.step(action)
                next_state = result[0]
                reward = result[1]
                done = result[2]
                
                self.update(state, action, reward, next_state, done)
                state = next_state
                if done:
                    break
        
        return self.Q


# 使用示例
env = gym.make('FrozenLake-v1', is_slippery=False)
n_states = env.observation_space.n
n_actions = env.action_space.n

agent = QLearning(n_states, n_actions, alpha=0.1, gamma=0.99, epsilon=0.1)
Q_table = agent.train(env, n_episodes=2000, max_steps=100)

print("训练完成!")
print("Q 表形状:", Q_table.shape)
print("Q 表:\n", Q_table)
  

五、参数说明

参数建议范围说明
α(学习率)0.1~0.5控制 Q 更新幅度
γ(折扣因子)0.9~0.99长期奖励重要性
ε(探索率)0.05~0.2ε-greedy 探索概率,可随训练衰减
n_episodes500~5000根据任务复杂度调整
max_steps100~1000每回合最大步数,避免无限循环

第九章:DQN(Deep Q-Network)

DQN(Deep Q-Network)是强化学习中用于高维状态空间的经典算法,核心思想是用深度神经网络逼近动作价值函数 Q(s,a;θ),从而间接得到最优策略。

一、背景与原理

  • 传统 Q-learning 在大或连续状态空间中无法使用 Q 表
  • DQN 使用神经网络将状态映射到动作价值
  • 适用于高维输入,如图像环境(Atari 游戏)
  • 核心挑战:训练稳定性,需要经验回放和目标网络

深度学习基础:神经网络层详解

在深度强化学习中,神经网络是智能体的“大脑”,不同层(Layer)承担不同的功能角色:

1. 输入层(Input Layer)

  • 作用:接收来自环境的状态输入。
  • 示例:CartPole 的 4 维向量 [位置, 速度, 杆角, 角速度];Atari 的图像帧 [84×84×4]。

2. 线性层(Linear / 全连接层)

  • PyTorchnn.Linear(in_features, out_features)
  • 公式:y = Wx + b
  • 作用:对输入特征进行线性变换,是 MLP、Q 网络、Actor-Critic 等模型的核心组成。

3. 激活层(Activation Layer)

为网络引入非线性,使模型能拟合复杂函数映射:

名称公式输出范围常见用途
ReLUmax(0, x)[0, ∞)常规隐藏层激活
Tanh(e^x - e^(-x))/(e^x + e^(-x))[-1, 1]连续动作输出(DDPG、SAC)
Softmaxe^(x_i)/Σe^(x_j)[0,1]离散动作策略 π(a|s)

4. 卷积层(Convolutional Layer)

  • PyTorchnn.Conv2d(in_ch, out_ch, kernel_size, stride)
  • 作用:提取图像的局部空间特征,是视觉输入任务(Atari 等)的基础。

nn.Conv2d(4, 32, 8, stride=4)
nn.ReLU()
nn.Conv2d(32, 64, 4, stride=2)
nn.ReLU()
nn.Conv2d(64, 64, 3, stride=1)
nn.ReLU()
nn.Flatten()
nn.Linear(3136, 512)

5. 输出层(Output Layer)

生成最终输出,形式因任务而异:

任务类型输出维度激活函数输出含义
离散动作策略 π(a|s)动作数Softmax动作概率分布
连续动作策略 μ(s)动作维度Tanh动作均值
价值函数 V(s)1状态价值
Q 函数 Q(s,a)动作数各动作Q值

6. 其他常见层

  • 循环层(RNN / LSTM / GRU):处理时间序列或部分可观测任务(POMDP)。
  • 归一化层(BatchNorm / LayerNorm):稳定训练、加快收敛。
  • Dropout 层:防止过拟合(强化学习中较少使用)。
  • Flatten 层:将卷积输出展平为向量,连接 CNN 与全连接层。

总结

向量任务以全连接层为主,图像任务由卷积+Flatten+线性层构成,序列任务引入循环结构。
隐藏层提取特征,卷积层捕获局部模式,循环层建模时间依赖。所有层协作,共同构成智能体的“思维体系”。

二、核心公式

DQN 最小化 TD 误差损失:

$$ L(\theta) = \mathbb{E}_{(s,a,r,s') \sim \mathcal{D}} \Big[ \big( y - Q(s,a;\theta) \big)^2 \Big] $$

其中目标 Q 值为:

$$ y = r + \gamma \max_{a'} Q(s',a'; \theta^-) $$

  • Q(s,a;θ):当前 Q 网络预测
  • y:目标 Q 值,由目标网络 θ⁻ 提供
  • 𝒟:经验回放池
  • γ:折扣因子

三、训练流程

  1. 初始化 Q 网络 Q(s,a;θ) 和目标网络 Q(s,a;θ⁻)
  2. 与环境交互,按 ε-greedy 策略选择动作,存入经验回放池
  3. 从经验回放中采样小批量数据训练 Q 网络,最小化 TD loss
  4. 定期更新目标网络 θ⁻ ← θ
  5. 重复以上步骤,直到策略收敛

四、核心技术点

技术作用
经验回放 Replay Buffer打破数据相关性,提高样本利用率,稳定训练
目标网络 Target Network减缓训练震荡,保证目标稳定
ε-greedy 策略平衡探索和利用
归一化 / 图像预处理降低高维输入维度,提升训练效率

五、DQN 示例代码


import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, namedtuple

Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))

class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.memory = deque(maxlen=capacity)
    
    def add(self, state, action, reward, next_state, done):
        self.memory.append(Transition(state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        indices = np.random.choice(len(self.memory), batch_size, replace=False)
        batch = [self.memory[i] for i in indices]
        return zip(*batch)
    
    def __len__(self):
        return len(self.memory)

class DQN:
    def __init__(self, state_dim, action_dim, gamma=0.99, epsilon=1.0, alpha=1e-3):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.min_epsilon = 0.01
        self.epsilon_decay = 0.995
        
        # Q网络和目标网络
        self.Q = QNetwork(state_dim, action_dim)
        self.Q_target = QNetwork(state_dim, action_dim)
        self.Q_target.load_state_dict(self.Q.state_dict())
        
        # 优化器
        self.optimizer = optim.Adam(self.Q.parameters(), lr=alpha)
        self.replay_buffer = ReplayBuffer(capacity=10000)
        self.loss_fn = nn.MSELoss()
    
    def select_action(self, state):
        """ε-greedy 策略选择动作"""
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_dim)
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():
                q_values = self.Q(state_tensor)
            return q_values.argmax(dim=1).item()
    
    def train_step(self, batch_size):
        """从经验回放中采样并训练"""
        if len(self.replay_buffer) < batch_size:
            return
        
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)
        
        # 转换为张量
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(np.array(actions))
        rewards = torch.FloatTensor(np.array(rewards))
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(np.array(dones))
        
        # 计算当前Q值
        q_values = self.Q(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # 计算目标Q值
        with torch.no_grad():
            max_next_q = self.Q_target(next_states).max(dim=1)[0]
            target_q = rewards + self.gamma * max_next_q * (1 - dones)
        
        # 计算损失
        loss = self.loss_fn(q_values, target_q)
        
        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
    def train(self, env, num_episodes=1000, batch_size=32, target_update_freq=500):
        """完整训练流程"""
        rewards = []
        step_count = 0
        
        for episode in range(num_episodes):
            state = env.reset()
            done = False
            episode_reward = 0
            
            while not done:
                # 选择动作
                action = self.select_action(state)
                next_state, reward, done, _ = env.step(action)
                
                # 存储经验
                self.replay_buffer.add(state, action, reward, next_state, done)
                
                # 训练
                self.train_step(batch_size)
                
                episode_reward += reward
                state = next_state
                step_count += 1
                
                # 定期更新目标网络
                if step_count % target_update_freq == 0:
                    self.Q_target.load_state_dict(self.Q.state_dict())
            
            rewards.append(episode_reward)
            self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)
            
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(rewards[-100:])
                print(f"Episode {episode+1}, Avg Reward: {avg_reward:.2f}, Epsilon: {self.epsilon:.3f}")
        
        return rewards
  

六、参数说明

参数建议范围说明
γ(折扣因子)0.95~0.99长期奖励权重
学习率 lr1e-4~1e-3Adam 或 RMSProp 优化器
ε 初值1.0初期高探索
ε 最小值0.01~0.1保持一定探索
ε 衰减率0.995~0.999逐步降低 ε
经验回放大小1e4~1e6根据环境大小选择
批量大小 batch_size32~64每次训练采样量
目标网络更新频率500~10000 steps定期同步 Q 网络到目标网络

七、训练完成判断

  • 累计奖励稳定:连续若干回合平均奖励变化很小
  • 测试环境表现:使用纯利用策略测试平均成功率或完成率达到目标
  • 损失收敛:观察 TD loss 基本稳定
  • ε-greedy 探索率较低:策略主要依赖网络估计动作,测试表现稳定

# 每隔 N 回合测试策略
if episode % test_interval == 0:
    total_reward = 0
    for _ in range(test_episodes):
        state = env.reset()
        done = False
        while not done:
            action = np.argmax(Q.predict(state))
            state, reward, done, _ = env.step(action)
            total_reward += reward
    avg_reward = total_reward / test_episodes
    if avg_reward >= target_reward:
        print("训练完成,策略收敛")
        break
  

第十章:DDQN(Double Deep Q-Network)

DDQN 是 DQN 的改进版本,用于解决 DQN 在训练中存在的Q 值过估计(Overestimation)问题。核心思想是将动作选择与动作评价分开,提高训练稳定性。

一、背景与原理

  • DQN 使用 max(Q) 同时选择最优动作和估计动作价值,容易高估 Q 值。
  • DDQN 将动作选择和动作评价分开:主网络选择动作目标网络评价动作价值
  • 这样可以降低 max 操作对偶然高估的放大效应,提高训练稳定性。

二、核心公式

DDQN 目标 Q 值计算:

$$ y^{DDQN} = r + \gamma Q(s', \arg\max_{a'} Q(s', a'; \theta), \theta^-) $$

  • 动作选择使用主网络参数 θ
  • 动作评价使用目标网络参数 θ^-
  • 损失函数与 DQN 一致:

    $$ L(\theta) = \mathbb{E}_{(s,a,r,s') \sim \mathcal{D}} \Big[ (y^{DDQN} - Q(s,a;\theta))^2 \Big] $$

三、训练流程

  1. 初始化主网络 Q(s,a;θ) 和目标网络 Q(s,a;θ⁻)
  2. 与环境交互,使用 ε-greedy 策略选择动作,存入经验回放池
  3. 从经验回放中采样小批量训练,使用 DDQN 目标更新主网络参数 θ
  4. 定期同步目标网络 θ⁻ ← θ
  5. 重复以上步骤,直到训练完成

四、DDQN 与 DQN 区别

特性DQNDDQN
目标计算y = r + γ max_a Q(s',a;θ⁻)y = r + γ Q(s', argmax_a Q(s',a;θ), θ⁻)
选择动作目标网络主网络
评价动作目标网络目标网络
过估计问题存在大幅减轻
训练稳定性一般更稳

五、DDQN 示例代码


import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

class DDQN:
    def __init__(self, state_dim, action_dim, gamma=0.99, epsilon=1.0, alpha=1e-3):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.min_epsilon = 0.01
        self.epsilon_decay = 0.995
        
        # Q网络和目标网络
        self.Q = self._build_network(state_dim, action_dim)
        self.Q_target = self._build_network(state_dim, action_dim)
        self.Q_target.load_state_dict(self.Q.state_dict())
        
        self.optimizer = optim.Adam(self.Q.parameters(), lr=alpha)
        self.replay_buffer = deque(maxlen=10000)
        self.loss_fn = nn.MSELoss()
    
    def _build_network(self, state_dim, action_dim):
        return nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )
    
    def select_action(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_dim)
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            q_values = self.Q(state_tensor)
        return q_values.argmax(dim=1).item()
    
    def train_step(self, batch_size):
        if len(self.replay_buffer) < batch_size:
            return
        
        indices = np.random.choice(len(self.replay_buffer), batch_size, replace=False)
        batch = [self.replay_buffer[i] for i in indices]
        states, actions, rewards, next_states, dones = zip(*batch)
        
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(np.array(actions))
        rewards = torch.FloatTensor(np.array(rewards))
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(np.array(dones))
        
        # 当前Q值
        q_values = self.Q(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # DDQN核心:用Q网络选择最优动作,用Q_target网络评估价值
        with torch.no_grad():
            # 用Q网络选择最优动作
            next_actions = self.Q(next_states).argmax(dim=1)
            # 用Q_target网络评估这个动作的价值
            max_next_q = self.Q_target(next_states).gather(1, next_actions.unsqueeze(1)).squeeze(1)
            target_q = rewards + self.gamma * max_next_q * (1 - dones)
        
        # 计算损失
        loss = self.loss_fn(q_values, target_q)
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
    def train(self, env, num_episodes=1000, batch_size=32, target_update_freq=500):
        rewards = []
        step_count = 0
        
        for episode in range(num_episodes):
            state = env.reset()
            done = False
            episode_reward = 0
            
            while not done:
                action = self.select_action(state)
                next_state, reward, done, _ = env.step(action)
                
                self.replay_buffer.append((state, action, reward, next_state, done))
                self.train_step(batch_size)
                
                episode_reward += reward
                state = next_state
                step_count += 1
                
                if step_count % target_update_freq == 0:
                    self.Q_target.load_state_dict(self.Q.state_dict())
            
            rewards.append(episode_reward)
            self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)
            
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(rewards[-100:])
                print(f"Episode {episode+1}, Avg Reward: {avg_reward:.2f}")
        
        return rewards
  

六、参数说明(一般设置)

参数建议范围说明
γ(折扣因子)0.95~0.99长期奖励权重
学习率 lr1e-4~1e-3Adam 或 RMSProp 优化器
ε 初值1.0初期高探索
ε 最小值0.01~0.1保持一定探索
ε 衰减率0.995~0.999逐步降低 ε
经验回放大小1e4~1e6根据环境大小选择
批量大小 batch_size32~64每次训练采样量
目标网络更新频率500~10000 steps定期同步 Q 网络到目标网络

七、训练完成判断

同上

八、Q 网络参数更新与过估计分析

  • Q 网络参数 θ 实时更新:用于选择动作,随着梯度下降不断训练
  • 目标网络参数 θ⁻ 隔一段时间更新:用于评价动作价值,保持稳定避免 max 操作把偶然高估放大
  • 训练逻辑:主网络实时学 → 选择动作;目标网络稳定 → 评价动作;定期同步 → θ⁻ ← θ
  • 这样做的直觉:主网络像“冲动的玩家”,目标网络像“稳重的裁判”,降低 Q 值过估计,提高训练稳定性

九、从价值函数方法向策略梯度转变

Q-learning 与 DDQN 的局限性:

  • ✓ 优势:适合离散动作空间,理论完备
  • ✗ 劣势:无法直接处理连续动作空间(如机器人控制)
  • ✗ 劣势:对高维状态空间容易过估计 Q 值
  • ✗ 劣势:采样效率低,需要大量经验回放

策略梯度方法(第11-13章)的优势:

  • ✓ 直接对策略 π(a|s;θ) 参数化,自然支持连续动作空间
  • ✓ 可以学习随机策略而不仅是确定性策略
  • ✓ 使用 Advantage 函数作为基线,方差更低
  • ✓ 样本效率高,能充分利用交互数据

从"估计Q值选动作"→"直接学策略"的哲学转变:

  • Q-learning:通过估计每个动作的价值,来间接优化策略
  • 策略梯度:绕过价值函数,直接在策略参数空间中梯度上升
  • Actor-Critic:结合两者:用 V(s) 作为基线(Critic),直接优化策略(Actor)

第十一章:策略梯度方法(Policy Gradient, PG)

策略梯度方法直接对策略 π(a|s;θ) 参数化优化,而不是先估计 Q 值再选动作。适用于动作空间连续或复杂的环境。

一、策略梯度目标

最大化期望累计奖励:

$$ J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}[R(\tau)] $$

  • θ:策略参数
  • τ:从策略生成的轨迹(state-action序列)
  • R(τ):轨迹总奖励

二、策略梯度定理

策略梯度公式:

$$ \nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta} [ \nabla_\theta \log \pi_\theta(a|s) \, A^\pi(s,a) ] $$

  • ∇θ log πθ(a|s):调整策略参数 θ 改变动作概率的方向
  • Aπ(s,a)优势函数(Advantage),衡量动作相对于平均水平的好坏程度

策略梯度推导详解

直觉:我们到底想优化什么?

在强化学习中,我们希望调整策略参数 θ,让期望累计奖励最大:

$$J(\theta) = \mathbb{E}_{\pi_\theta}[R]$$

这就像监督学习里最小化损失函数,只不过这里是最大化奖励期望。要让 J(θ) 变大,我们要知道它对参数 θ 的梯度方向——也就是策略梯度。

推导过程:为什么会出现 log π(a|s) 和优势函数 A(s,a)?

第 1 步:从"期望的导数"开始

想求 ∇θJ(θ),可以写成轨迹积分的形式:

$$\nabla_\theta J(\theta) = \nabla_\theta \int p_\theta(\tau) R(\tau) d\tau$$

其中 pθ(τ) 是策略生成整条轨迹的概率。

第 2 步:用 log trick(对数技巧)让梯度进到 log 里面

利用恒等式:

$$\nabla_\theta p_\theta(\tau) = p_\theta(\tau) \nabla_\theta \log p_\theta(\tau)$$

于是:

$$\nabla_\theta J(\theta) = \int p_\theta(\tau) \nabla_\theta \log p_\theta(\tau) R(\tau) d\tau = \mathbb{E}_{\tau \sim p_\theta}[\nabla_\theta \log p_\theta(\tau) R(\tau)]$$

第 3 步:拆轨迹概率,只剩下策略的部分

轨迹概率由环境和策略共同决定:

$$p_\theta(\tau) = p(s_0) \prod_t \pi_\theta(a_t|s_t) p(s_{t+1}|s_t,a_t)$$

环境的状态转移 p(s'|s,a) 不依赖 θ,所以只有策略部分需要导数:

$$\nabla_\theta \log p_\theta(\tau) = \sum_t \nabla_\theta \log \pi_\theta(a_t|s_t)$$

于是:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_t \nabla_\theta \log \pi_\theta(a_t|s_t) R(\tau)\right]$$

为什么用 log 而不是直接对 π(a|s) 求导?

  • 数学稳定性:log 把乘法变加法,稳定梯度传播
  • 计算效率:log 后的梯度形式更适合神经网络反向传播
  • 采样一致性:我们只能采样到具体动作 a,log π 让我们用单个样本估计整个分布的梯度

这就是经典的 REINFORCE 算法的数学来源!

三、Advantage(优势函数)的定义

Advantage 是动作价值与状态价值的差,表示相对优势:

$$ A^\pi(s,a) = Q^\pi(s,a) - V^\pi(s) $$

  • Qπ(s,a):在状态 s 执行动作 a 的预期回报
  • Vπ(s):在状态 s 遵循策略的预期回报(所有动作的平均)
  • A > 0:该动作比平均动作更优
  • A < 0:该动作比平均动作更差

直观意义:不只看动作本身的好坏(Q 值),而是看它相对于该状态其他动作的相对优劣。这样可以减少训练方差,提高收敛稳定性。

四、核心直觉

  • 如果 Advantage 高 → 增加 π(a|s) 的概率
  • 如果 Advantage 低 → 减少 π(a|s) 的概率
  • 如果 Advantage 接近 0 → 几乎不调整该动作的概率

数学来源:更新公式 Δθ ∝ ∇θ log π(a|s) * A(s,a),Advantage 有正有负,可以同时推高优秀动作、压低劣质动作。

概率角度:log π 梯度告诉我们如何调节 θ,使选择动作概率增大/减小;乘上 A(s,a) 让调整和动作相对好坏挂钩。

五、REINFORCE 算法(Monte Carlo Policy Gradient)

REINFORCE 使用累计回报 G_t 作为 Q 值的无偏估计,但方差较大:


for episode in range(num_episodes):
    states, actions, rewards = run_episode(env, policy)
    G = 0
    returns = []
    # 计算回合累计回报(无偏估计)
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)
    
    # 使用回报 G 作为基线减法的参考,计算 Advantage
    baseline = np.mean(returns)  # 简单的基线估计
    for s, a, Gt in zip(states, actions, returns):
        advantage = Gt - baseline  # Advantage = Q(s,a) - V(s) 的近似
        theta += alpha * grad_log_pi(s, a) * advantage
  

六、策略梯度训练完成判断

  • 策略稳定性:连续若干回合策略选择动作概率变化不大
  • 平均累计奖励收敛:测试环境下平均奖励达到目标

七、Advantage 的关键意义

为什么使用 Advantage 而不是直接用 Q 值?

  • 降低方差:Advantage = Q(s,a) - V(s) 是相对值,减少了绝对值的波动。如果所有动作的 Q 值都很大(或很小),直接用 Q 会导致高方差,用 Advantage 则相对稳定。
  • 基线效应:Advantage 本质上是以 V(s) 作为基线,衡量"相对于平均动作"的好坏,而不是"绝对"好坏。
  • 后续算法的基础:Actor-Critic、A2C、A3C、PPO 等所有现代策略梯度方法,都以 Advantage 作为核心概念。

直观对比:

  • 用 Q(s,a):在状态 s,有两个动作 a1、a2,Q(a1)=100,Q(a2)=99,都很优秀,但更新时两者对梯度的影响力量差异小。
  • 用 A(s,a):假设 V(s)=98,则 A(a1)=+2,A(a2)=+1,Advantage 清晰地区分了"谁更优",这也降低了估计的方差。

第十二章:Actor-Critic (AC) 方法

Actor-Critic 方法是策略梯度家族的一种改进方法,通过结合价值函数(Critic)来降低策略梯度的方差,提高训练稳定性和收敛速度。

一、REINFORCE 与 Actor-Critic 对比

方法 Advantage 估计方式 策略更新依据 更新频率 方差
REINFORCE 回合累计回报 G_t(高方差,无偏) G_t * ∇ log π(a|s) 回合结束
Actor-Critic (AC) TD 误差 δ_t = r + γ V(s') - V(s)(低方差,有偏) δ_t * ∇ log π(a|s) 每一步(在线)

核心区别:两者都在计算 Advantage,但估计方式不同。REINFORCE 用完整回合的回报(高方差),AC 用 TD 误差(低方差,但引入偏差)。

二、核心思想

  • Actor:生成动作策略 π(a|s;θ_actor),直接输出动作概率
  • Critic:评估状态价值 V(s;θ_critic),提供 TD 误差 δ_t 来指导 Actor 更新

直观理解:Actor 是“决策者”,Critic 是“裁判”,裁判告诉决策者动作好坏,Actor 根据反馈调整策略。

三、Advantage 函数与 TD 误差的数学推导

一、Advantage 函数的理论定义

Advantage 函数定义为:

$$ A^\pi(s_t, a_t) = Q^\pi(s_t, a_t) - V^\pi(s_t) $$

它表示在状态 $s_t$ 下采取动作 $a_t$,相对于该状态平均水平 $V^\pi(s_t)$ 的"相对优势"。

如果找到 $Q^\pi(s_t, a_t)$ 的近似式,就能得到 $A^\pi(s_t, a_t)$ 的近似。

二、展开 Q 函数的定义

Q 函数定义为:

$$ Q^\pi(s_t, a_t) = \mathbb{E}_\pi[r_t + \gamma V^\pi(s_{t+1})] $$

这是因为状态价值函数满足 Bellman 方程:$V^\pi(s_t) = \mathbb{E}_{a_t \sim \pi}[Q^\pi(s_t, a_t)]$

三、代入 Advantage 定义

将 $Q^\pi(s_t, a_t)$ 的展开式代入 Advantage 定义:

$$ A^\pi(s_t, a_t) = \mathbb{E}_\pi[r_t + \gamma V^\pi(s_{t+1})] - V^\pi(s_t) $$

在实际训练中,我们用一次采样来近似期望(即使用当前采样的实际奖励 $r_t$ 代替期望),得到:

$$ A^\pi(s_t, a_t) \approx r_t + \gamma V^\pi(s_{t+1}) - V^\pi(s_t) $$

四、这正是 TD 误差的定义

TD 误差(Temporal Difference Error)定义为:

$$ \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) $$

关键结论

$$ A^\pi(s_t, a_t) \approx \delta_t $$

即 TD 误差就是 Advantage 的一次采样估计。

五、为什么这样做合理?

① TD 误差是 Advantage 的无偏估计

只要 $V(s)$ 近似 $V^\pi(s)$,那么 TD 误差的期望就是 Advantage:

$$ \mathbb{E}[\delta_t] = \mathbb{E}[r_t + \gamma V^\pi(s_{t+1}) - V^\pi(s_t)] = Q^\pi(s_t, a_t) - V^\pi(s_t) = A^\pi(s_t, a_t) $$

因此 TD 误差是 Advantage 的无偏估计

② TD 误差具有低方差的优点

  • 直接用回报估计(REINFORCE):$G_t = \sum_k \gamma^k r_{t+k}$ 需要累计完整轨迹的奖励,方差很高
  • 用 TD 误差估计(AC):$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ 只依赖当前一步和下一状态,方差极低,训练更稳定
  • 结果:虽然 V 函数的学习会引入一些偏差,但总体上方差的下降远超偏差的引入,使得 AC 算法在实践中比 REINFORCE 高效得多

这就是为什么在 AC、A2C、PPO 等所有现代策略梯度方法中,都用 TD 误差或其多步版本来近似 Advantage。

四、梯度更新的数学原理:为什么 Actor 是"加"、Critic 是"减"?

4.1 梯度的几何含义

设有一个函数 $f(\theta)$,其中 $\theta$ 是模型参数。

梯度(Gradient)$\nabla_\theta f(\theta)$ 表示在当前位置,函数增长最快的方向。

  • 沿着梯度方向走一步,$f(\theta)$ 会变大得最快;
  • 沿着反梯度方向走一步,$f(\theta)$ 会变小得最快。

直觉上:

  • 梯度告诉我们"往哪边爬山最快";
  • 反梯度告诉我们"往哪边下山最快"。

4.2 梯度下降(最小化损失)

假设我们要最小化损失函数 $L(\theta)$。

我们希望每一步让 $L$ 变小,因此要往下走:

$$ \theta \gets \theta - \alpha \nabla_\theta L(\theta) $$

其中:

  • $\alpha > 0$ 是学习率;
  • 减号代表"反梯度方向"
  • 因为梯度是"上升方向",要最小化就要往反方向走。
    ↑  梯度方向:L增加最快
θ ←── 反梯度方向:L减小最快

这就是"最小化 ⇒ 减梯度"的原因。

4.3 梯度上升(最大化回报)

相反,如果目标是最大化某个函数 $J(\theta)$(比如在强化学习中最大化期望回报):

$$ \theta \gets \theta + \alpha \nabla_\theta J(\theta) $$

这里我们沿梯度方向走,因为梯度指向"上升最快"的方向。

θ ──→ 梯度方向:J增加最快

4.4 联系 Actor-Critic 的更新公式

① Critic 要最小化 TD 误差平方:

$$ L = \frac{1}{2} \delta_t^2 $$

所以:

$$ \theta_\text{Critic} \gets \theta_\text{Critic} - \alpha \nabla_\theta L = \theta_\text{Critic} + \alpha \delta_t \nabla_\theta V_\theta(s) $$

(因为 $-\nabla_\theta L = \delta_t \nabla_\theta V_\theta(s)$)

这实际上是反向传播梯度形式的数学表达式。

② Actor 要最大化期望回报:

$$ J = \mathbb{E}[\log \pi_\theta(a|s) A_t] $$

所以:

$$ \theta_\text{Actor} \gets \theta_\text{Actor} + \alpha \nabla_\theta J = \theta_\text{Actor} + \alpha \nabla_\theta \log \pi_\theta(a|s) \delta_t $$

其中 $\nabla_\theta \log \pi_\theta(a_t|s_t)$ 是从 Actor 网络反向传播得到的梯度;$\delta_t$ 是权重信号(告诉 Actor 这次动作是好还是坏)。

总结:为什么两个更新式一个是"加",一个是"减"?

  • Critic 是最小化误差 → 反梯度下降(实际写成加号形式是因为已经展开了负号);
  • Actor 是最大化回报 → 顺梯度上升。

五、动作采样机制:$a_t \sim \pi(a|s_t)$ 的具体实现

在 Actor-Critic 算法中,Actor 是一个策略网络,它的输出不是一个单一动作,而是一个动作分布(例如"在状态 $s_t$ 下各个动作的概率")。

然后:

  • AC 算法会从这个概率分布中采样(sample)一个动作,执行它与环境交互。

所以:

$$ a_t \sim \pi_\theta(a|s_t) $$

意思是"从由 Actor 网络定义的分布 π 中抽取一个动作 $a_t$"。

5.1 离散动作空间(例如 CartPole, Atari)

在这种情况下,Actor 输出一个离散分布

$$ \pi_\theta(a|s) = \text{softmax}(f_\theta(s)) $$

也就是说,网络输出每个动作的概率 $p_i$,然后从中采样。

示例(PyTorch):

import torch
import torch.nn.functional as F

# 假设 Actor 输出 logits
logits = actor_net(state)              # shape [n_actions]
probs = F.softmax(logits, dim=-1)      # 转成概率分布
dist = torch.distributions.Categorical(probs)
action = dist.sample()                 # 从分布中采样一个动作
  • Categorical 表示多项式分布;
  • dist.sample() 就是实现 $a_t \sim \pi_\theta(a|s_t)$;
  • dist.log_prob(action) 之后会用于策略梯度更新。

这就对应伪代码里的那一行:"选择动作 $a_t \sim \pi(a|s_t)$"

实际上就是从 softmax 概率中抽样动作。

5.2 连续动作空间(例如 Pendulum, Mujoco)

在连续动作空间下,我们无法用 softmax,因为动作是实数。

于是 Actor 输出一个高斯分布参数

$$ \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)^2) $$

然后从该分布采样:

$$ a_t = \mu_\theta(s_t) + \sigma_\theta(s_t) \odot \epsilon, \quad \epsilon \sim \mathcal{N}(0, I) $$

示例(PyTorch):

mu, log_std = actor_net(state)
std = log_std.exp()
dist = torch.distributions.Normal(mu, std)
action = dist.sample()                 # 连续采样
action = torch.tanh(action)            # 限制在[-1,1]范围
  • 有些实现还会用 dist.rsample()(重参数化采样)来保持梯度可传播。

六、核心公式总结

  • TD 误差(用于 Advantage 估计)

    $$ \delta_t = r_t + \gamma V_\phi(s_{t+1}) - V_\phi(s_t) $$

  • Actor 更新(梯度上升,最大化回报)

    $$ \theta_\text{Actor} \gets \theta_\text{Actor} + \alpha_\text{Actor} \nabla_\theta \log \pi_\theta(a|s) \, \delta_t $$

  • Critic 更新(梯度下降,最小化误差)

    $$ \phi_\text{Critic} \gets \phi_\text{Critic} + \alpha_\text{Critic} \delta_t \nabla_\phi V_\phi(s) $$

    (等价于 $\phi \gets \phi - \alpha \nabla_\phi \frac{1}{2}\delta_t^2$)

七、训练流程

  1. 初始化 Actor 和 Critic 网络
  2. 与环境交互,选择动作 $a_t \sim \pi(a|s_t)$(从概率分布中采样)
  3. 执行动作,获得 $r_{t+1}, s_{t+1}$
  4. Critic 计算 TD 误差 $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$
  5. Actor 使用 $\delta_t$ 进行梯度上升更新策略
  6. Critic 使用 $\delta_t$ 进行梯度下降更新价值函数
  7. 重复以上步骤直到训练完成

八、Actor-Critic 示例代码(离散动作空间)


import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

class ActorNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        logits = self.fc3(x)
        return torch.softmax(logits, dim=-1), logits  # 同时返回 logits,便于算熵

class CriticNetwork(nn.Module):
    def __init__(self, state_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 1)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)  # [B,1]

class ActorCritic:
    def __init__(self, state_dim, action_dim, gamma=0.99, actor_lr=1e-3, critic_lr=1e-3,
                 entropy_coef=0.01, grad_clip=0.5, device=None):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.entropy_coef = entropy_coef
        self.grad_clip = grad_clip
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        
        self.actor = ActorNetwork(state_dim, action_dim).to(self.device)
        self.critic = CriticNetwork(state_dim).to(self.device)
        
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)
    
    def _to_tensor(self, s):
        return torch.as_tensor(s, dtype=torch.float32, device=self.device).unsqueeze(0)

    def select_action(self, state):
        """采样动作(保留梯度图),也可以只返回动作,把log_prob放到train_step里再前向一次。"""
        state_tensor = self._to_tensor(state)
        action_probs, logits = self.actor(state_tensor)  # 不要用 no_grad
        dist = Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)  # [1] -> 标量
        entropy = dist.entropy()          # [1]
        return int(action.item()), log_prob, entropy
    
    def train_step(self, state, action, reward, next_state, done):
        """单步训练(TD(0))"""
        state_tensor = self._to_tensor(state)
        next_state_tensor = self._to_tensor(next_state)

        # ---- Critic 前向(要有梯度)----
        value = self.critic(state_tensor).squeeze(-1)        # [1]
        with torch.no_grad():
            next_value = self.critic(next_state_tensor).squeeze(-1)  # [1]
            target_value = torch.as_tensor([reward], device=self.device, dtype=torch.float32)
            if not done:
                target_value = target_value + self.gamma * next_value
        
        advantage = (target_value - value)  # [1]

        # ---- Actor 前向(重新计算log_prob与熵,确保有梯度图)----
        action_probs, _ = self.actor(state_tensor)
        dist = Categorical(action_probs)
        log_prob = dist.log_prob(torch.as_tensor([action], device=self.device))
        entropy = dist.entropy()

        # ---- 损失 ----
        actor_loss = -(log_prob * advantage.detach()) - self.entropy_coef * entropy
        critic_loss = torch.mean((value - target_value.detach()) ** 2)

        # ---- 反传与优化(带梯度裁剪)----
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        if self.grad_clip is not None:
            nn.utils.clip_grad_norm_(self.actor.parameters(), self.grad_clip)
        self.actor_optimizer.step()

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        if self.grad_clip is not None:
            nn.utils.clip_grad_norm_(self.critic.parameters(), self.grad_clip)
        self.critic_optimizer.step()

        return actor_loss.item(), critic_loss.item(), float(entropy.item())
    
    def _reset_env(self, env):
        """兼容 Gym / Gymnasium 不同 reset 返回格式"""
        out = env.reset()
        if isinstance(out, tuple) and len(out) == 2:
            obs, _ = out
            return obs
        return out
    
    def _step_env(self, env, action):
        """兼容 Gym / Gymnasium 不同 step 返回格式"""
        out = env.step(action)
        if len(out) == 4:
            next_state, reward, done, info = out
        else:
            next_state, reward, terminated, truncated, info = out
            done = terminated or truncated
        return next_state, reward, done, info

    def train_episode(self, env):
        state = self._reset_env(env)
        done = False
        episode_reward = 0.0
        
        while not done:
            action, _, _ = self.select_action(state)  # 这里不必返回log_prob,train_step里会重新计算
            next_state, reward, done, _ = self._step_env(env, action)
            
            self.train_step(state, action, reward, next_state, done)
            episode_reward += reward
            state = next_state
        
        return episode_reward
    
    def train(self, env, num_episodes=1000, print_every=100):
        rewards = []
        for episode in range(1, num_episodes + 1):
            R = self.train_episode(env)
            rewards.append(R)
            if episode % print_every == 0:
                avg_R = np.mean(rewards[-print_every:])
                print(f"Episode {episode}, Avg Reward: {avg_R:.2f}")
        return rewards

  

六、训练完成判断

  • 平均累计奖励稳定并收敛
  • Actor 策略概率变化不大
  • Critic TD loss 收敛

七、直观理解总结

  • Actor 决策,Critic 评价
  • TD 误差指导 Actor 更新,比 REINFORCE 高效、低方差
  • 可在线逐步更新策略,不必等整回合结束

第十三章:A3C 与 A2C

A3C (2016) 开创了N步TD + 并行训练,A2C (2017) 将异步改为同步以更好利用GPU。

一、A3C (Asynchronous Advantage Actor-Critic)

核心:N步TD + 异步多线程

N步回报:$R_t = \sum_{i=0}^{T-1-t} \gamma^i r_{t+1+i} + \gamma^{T-t} V(s_T)$,$A_t = R_t - V(s_t)$

异步更新:每个线程独立采样T步,立即异步更新全局网络。

A3C 简化代码


import torch
import torch.nn as nn
import threading

class A3CNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.shared = nn.Sequential(nn.Linear(state_dim, 128), nn.ReLU())
        self.actor = nn.Linear(128, action_dim)
        self.critic = nn.Linear(128, 1)
    
    def forward(self, x):
        shared = self.shared(x)
        return F.softmax(self.actor(shared), dim=-1), self.critic(shared)

class A3CWorker(threading.Thread):
    def __init__(self, global_net, env, T=20):
        super().__init__()
        self.global_net = global_net
        self.local_net = A3CNetwork(state_dim, action_dim)
        self.env = env
        self.T = T
    
    def run(self):
        while True:
            # 收集T步经验
            states, actions, rewards = [], [], []
            state = self.env.reset()
            
            for _ in range(self.T):
                probs, _ = self.local_net(torch.FloatTensor(state))
                action = torch.multinomial(probs, 1).item()
                next_state, reward, done, _ = self.env.step(action)
                
                states.append(state)
                actions.append(action)
                rewards.append(reward)
                
                if done: break
                state = next_state
            
            # 计算N步回报,更新本地网络,异步更新全局网络
            # ... 省略具体计算 ...
  

二、A2C (Advantage Actor-Critic)

核心:N步TD + 同步多环境

同步更新:多个环境同步采样T步,累积梯度后批量更新,GPU友好。

A2C 简化代码


class A2C:
    def __init__(self, state_dim, action_dim, n_envs=8, T=5):
        self.net = A3CNetwork(state_dim, action_dim)  # 复用网络结构
        self.envs = [gym.make('CartPole-v1') for _ in range(n_envs)]
        self.T = T
        
    def train_step(self):
        batch_data = []
        
        # 所有环境同步采样T步
        for env in self.envs:
            states, actions, rewards = [], [], []
            state = env.reset()
            
            for _ in range(self.T):
                probs, _ = self.net(torch.FloatTensor(state))
                action = torch.multinomial(probs, 1).item()
                next_state, reward, done, _ = env.step(action)
                
                states.append(state)
                actions.append(action) 
                rewards.append(reward)
                
                if done: break
                state = next_state
            
            batch_data.extend(list(zip(states, actions, rewards)))
        
        # 计算N步回报和损失,同步更新网络
        # ... 省略具体计算 ...
  

三、核心对比

特性A3CA2C
并行方式异步多线程同步多环境
更新时机各线程独立更新批量累积更新
探索性更强较好
稳定性一般更高
GPU利用一般更好

全体算法总结与选择指南

算法 需要模型 动作空间 核心思想 优点 缺点 适用场景
策略迭代 离散 评估+改进循环 理论完备 需要模型,计算复杂 模型已知小规模问题
值迭代 离散 直接优化V函数 简洁高效 需要模型 模型已知小规模问题
蒙特卡洛 离散 采样+回合回报 无需模型,理论简单 方差高,需等回合结束 离散小环境、离线学习
时间差分 (TD) 离散 自举+一步更新 低方差,在线学习 引入偏差 离散中等规模问题
Q-learning 离散 Off-policy值迭代 收敛快,无模型 Q值表过大不可行 离散问题的标准方法
DQN 离散 神经网络逼近Q 支持高维状态 Q值过估计,训练不稳定 高维观测、视觉任务
DDQN 离散 双网络减缓高估 训练稳定 计算量增加 复杂环境、持续任务
REINFORCE 连续/离散 策略参数直接梯度上升 支持连续动作 方差大,收敛慢 连续控制问题
Actor-Critic 连续/离散 Actor+Critic基线 低方差快速收敛 需要维护两个网络 连续控制、实时应用
A2C 连续/离散 多环境同步采样 并行高效稳定 需多GPU支持 大规模并行训练
A3C 连续/离散 多线程异步更新 高探索性、快速收敛 实现复杂 资源受限、实时系统

算法选择流程图

  1. 是否知道环境的转移概率模型?
    • 是 → 使用动态规划(策略迭代/值迭代)
    • 否 → 继续第2步
  2. 动作空间是离散还是连续?
    • 离散 → 继续第3步
    • 连续 → 使用策略梯度方法(REINFORCE/AC/A2C/A3C)
  3. 状态空间维度?
    • 低(几百个状态以下) → 使用Q-learning表格形式
    • 高(图像等)→ 使用DQN/DDQN
  4. 是否需要样本效率和稳定性?
    • 否 → DQN可以
    • 是 → 使用DDQN或组合A2C