news 2026/5/27 21:20:56

强化学习QAC求最优策略的代码实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
强化学习QAC求最优策略的代码实现

理论基础:

注意:

1. get_policy_by_state_value_net() 是额外写的一个基于Q的贪心策略,不属于QAC算法,得到的策略不一定是最优的,与get_policy_by_policy_net表现一致是偶然现象。

2. 图片中的伪代码并没有说要生成多条episode,但这个为了保证每个(s,a)pair都能被访问到,会生成多条episode。

代码可运行:

import numpy as np import torch from torch import nn from env import GridWorldEnv from utils import drow_policy class QAC(object): def __init__(self, env: GridWorldEnv, gamma=0.9, lr_actor=1e-2, lr_critic=1e-2): self.env = env self.action_space_size = self.env.num_actions self.state_space_size = self.env.num_states self.gamma = gamma self.pnet = nn.Sequential( # policy_net nn.Linear(2, 16), # s -> Π(a|s) nn.ReLU(), nn.Linear(16, self.action_space_size) ) self.qnet = nn.Sequential( # q_value_net nn.Linear(2, 16), # s -> q[s,a] nn.ReLU(), nn.Linear(16, self.action_space_size) ) self.value_optimizer = torch.optim.Adam(self.qnet.parameters(), lr=lr_critic) self.policy_optimizer = torch.optim.Adam(self.pnet.parameters(), lr=lr_actor) self.policy = np.zeros((self.state_space_size, self.action_space_size)) self.q_value = np.zeros((self.state_space_size, self.action_space_size)) def decode_state(self, state): ''' :param state: int :return: 归一化后的元组 ''' i = state // self.env.size j = state % self.env.size return torch.tensor((i / (self.env.size - 1), j / (self.env.size - 1)), dtype=torch.float32) def generate_action(self, state): ''' :param state: tuple :return: int,float ''' logits = self.pnet(state) action_probs = torch.softmax(logits, dim=0) # π(a|s,θ) action_dist = torch.distributions.Categorical(action_probs) # 按分布采样 action = action_dist.sample() log_prob = action_dist.log_prob(action) # In π(a|s,θ) 注意传入的是索引,会自动做log(action_probs[action_index]) return action.item(), log_prob def solve(self, num_episodes=200): for _ in range(num_episodes): state_int = self.env.reset() state = self.decode_state(state_int) done = False while not done: action, log_prob = self.generate_action(state) # a_t,s_t,In π(a_t|s_t,θ) next_state_int, reward, done = self.env.step(state_int, action) # s_t+1,r_t+1 next_state = self.decode_state(next_state_int) if not done: next_action, _ = self.generate_action(next_state) # a_t+1 else: next_action, action_prob = None, None # Critic (value update) qvalue = self.qnet(state)[action] # q(s_t,a_t) if done: td_target = torch.tensor(reward, dtype=torch.float32) else: with torch.no_grad(): # semi gradient qvalue_next = self.qnet(next_state)[next_action] # q(s_t+1,a_t+1) td_target = torch.tensor(reward, dtype=torch.float32) + self.gamma * qvalue_next delta = td_target - qvalue # TD error self.value_optimizer.zero_grad() critic_loss = 0.5 * delta.pow(2) critic_loss.backward() self.value_optimizer.step() # Actor (policy update) qvalue = qvalue.detach() # 避免梯度污染 self.policy_optimizer.zero_grad() actor_loss = -log_prob * qvalue actor_loss.backward() self.policy_optimizer.step() state_int = next_state_int state = next_state def get_policy_by_policy_net(self): for s in range(self.state_space_size): if s in self.env.terminal: self.policy[s,4]=1 break s_t = self.decode_state(s) logits = self.pnet(s_t) action_probs = torch.softmax(logits, dim=0) a=torch.argmax(action_probs) self.policy[s,a]=1 return self.policy def get_policy_by_state_value_net(self): for s in range(self.state_space_size): if s in self.env.terminal: self.policy[s,4]=1 break a = np.argmax(self.q_value[s]) self.policy[s, a] = 1 return self.policy def get_qvalues(self): for s in range(self.state_space_size): s_t = self.decode_state(s) logits = self.qnet(s_t).detach().numpy() # q(s,a)表示在状态s执行动作a后,未来所有折扣回报的期望值,不要取softmax然后取最大 self.q_value[s, :] = logits return self.q_value if __name__ == '__main__': env = GridWorldEnv( size=5, forbidden=[(1, 2), (3, 3)], terminal=[(4, 4)], r_boundary=-1, r_other=-0.04, r_terminal=1, r_forbidden=-1, r_stay=-0.1 ) # 注意samples要大一点,否则每个state被访问到的概率很小 vi = QAC(env=env) vi.solve(num_episodes=200) print("\n state value: ") print(vi.get_qvalues()) print("\n get policy by policy net:") drow_policy(vi.get_policy_by_policy_net(), env) print("\n get policy by state value net:") drow_policy(vi.get_policy_by_state_value_net(), env)

运行结果:

1. 表现一致(终点状态不是 . 是因为没有特殊处理,其他代码保持不变。由于表现一致的情况很少,因此不再继续展示特殊处理后的输出)

2. 表现不一致

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/27 6:52:23

Java 的现实主义哲学:一门为“失败必然发生”而设计的工程语言

在很多技术宣传中,系统总是被描述得近乎完美:高可用、高性能、可无限扩展。 但真正做过工程的人都知道: 失败不是例外,而是常态。系统会超载、依赖会失效、数据会异常、人为失误一定会发生。 区别只在于——系统是否为失败做好了准…

作者头像 李华
网站建设 2026/5/19 13:57:28

串练习--------首字母大写HDOJ2026

题目&#xff1a;HDOJ 2026 代码 /* HDOJ 2026 https://acm.hdu.edu.cn/showproblem.php?pid2026 首字母大写 */ #include<iostream> #include<cstring> using namespace std; char a[105]; int main() {while (fgets(a, sizeof(a), stdin)) {//getchar();这里不…

作者头像 李华
网站建设 2026/5/22 12:53:08

等保测评全流程实操手册:从自查到验收,一步不踩坑

2025年3月20日起&#xff0c;新版《网络安全等级测评报告模板》正式启用&#xff0c;标志着等保测评进入“精准防控”新阶段。对企业而言&#xff0c;合规不再是简单满足条款&#xff0c;而是要应对云原生、物联网等新增场景的防护要求&#xff0c;这让不少运维人员在测评中频频…

作者头像 李华
网站建设 2026/5/26 21:07:08

第十七篇:Day49-51 前端工程化进阶——从“手动”到“自动化”(对标职场“提效降本”需求)

一、前置认知&#xff1a;前端工程化的核心价值与职场痛点 在掌握性能优化和安全防护能力后&#xff0c;我们能打造“快、稳、安”的产品&#xff0c;但当面对“团队10人协作开发”“每日3次版本迭代”“多环境部署”等职场场景时&#xff0c;手动复制文件、人工测试、线下传输…

作者头像 李华