发表于更新于

字数总计:2.4k阅读时长:10分钟 新加坡

强化学习-3-动态规划

强化学习的笔记、理解、感悟及代码实现,仅按个人思维进行精华总结和记录,使用的教程:动手学强化学习

动态规划依赖于子问题分解和递归方程,基于动态规划的强化学习算法主要有策略迭代和价值迭代两种,分别使用贝尔曼期望方程和贝尔曼最优方程,需要事先知道环境的状态转移和奖励函数,适用于有限的状态和动作空间。

策略迭代

策略迭代分策略评估和策略提升两部分,策略评估计算出每个状态的价值函数,策略提升根据价值函数改进策略,不断迭代直到收敛。

策略评估

贝尔曼期望方程为状态价值函数提供了一个递归关系。假设在某个状态s下,策略π(a|s)的价值函数为v(s,π),则贝尔曼期望方程为:

其中,π(a|s)是策略π在状态s下采取动作a的概率,r(s,a)是状态s下采取动作a的奖励,γ是折扣因子,p(s’|s,a)是状态s下采取动作a转移到状态s’的概率。

随机初始化所有状态的价值函数V(s)后,根据上一轮价值函数计算下一轮价值函数V’(s),随着即时奖励R(s,a)和未来奖励的传播积累,价值函数会越来越准确并收敛即(收敛性证明略)。

策略提升

贪心地在每一个状态选择动作价值最大的动作,得到新的策略:

策略迭代算法

alt text

实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import copy
class CliffWalkingEnv:
""" 悬崖漫步环境"""
def __init__(self, ncol=12, nrow=4):
self.ncol = ncol
self.nrow = nrow

self.P = self.createP()

def createP(self):

P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]


change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
for i in range(self.nrow):
for j in range(self.ncol):
for a in range(4):

if i == self.nrow - 1 and j > 0:
P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
True)]
continue

next_x = min(self.ncol - 1, max(0, j + change[a][0]))
next_y = min(self.nrow - 1, max(0, i + change[a][1]))
next_state = next_y * self.ncol + next_x
reward = -1
done = False

if next_y == self.nrow - 1 and next_x > 0:
done = True
if next_x != self.ncol - 1:
reward = -100
P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
return P

class PolicyIteration:
""" 策略迭代算法 """
def __init__(self, env, theta, gamma):
self.env = env
self.v = [0] * self.env.ncol * self.env.nrow
self.pi = [[0.25, 0.25, 0.25, 0.25]
for i in range(self.env.ncol * self.env.nrow)]
self.theta = theta
self.gamma = gamma
def policy_evaluation(self):
""" 策略评估 """
cnt1 = 0
while True:
cnt1 += 1
new_v = [0] * self.env.ncol * self.env.nrow
for s in range(self.env.ncol * self.env.nrow):
qsa_list = []
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, reward, done = res
qsa += p * (reward + self.gamma * self.v[next_state]*(1-done))
qsa_list.append(self.pi[s][a] * qsa)
new_v[s] = sum(qsa_list)
delta = max([abs(self.v[s] - new_v[s]) for s in range(self.env.ncol * self.env.nrow)])
self.v = new_v
if delta < self.theta:
break
print(f"策略评估{cnt1}次完成")
def policy_improvement(self):
""" 策略提升 """
for s in range(self.env.ncol * self.env.nrow):
qsa_list = []
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, reward, done = res
qsa += p * (reward + self.gamma * self.v[next_state]*(1-done))
qsa_list.append(qsa)
max_qsa = max(qsa_list)
cntq=qsa_list.count(max_qsa)
self.pi[s]=[1/cntq if q==max_qsa else 0 for q in qsa_list]
return self.pi
def policy_iteration(self):
""" 策略迭代 """
cnt = 0
while True:
cnt += 1
old_pi = copy.deepcopy(self.pi)
self.policy_evaluation()
new_pi = self.policy_improvement()
print("策略提升!")
if old_pi == new_pi:
break
print(f"策略迭代{cnt}轮完成!")
return new_pi
def print_result(self):
""" 打印结果 """
print("状态价值:")
for i in range(self.env.nrow):
for j in range(self.env.ncol):
print("{:.3f}".format(self.v[i * self.env.ncol + j]).center(8), end="")
print('\n')
print("策略:")
actions = ['^', 'v', '<', '>']
for i in range(self.env.nrow):
for j in range(self.env.ncol):
if i == self.env.nrow - 1 and j == self.env.ncol - 1:
print("goal".center(5), end="")
elif i == self.env.nrow - 1 and self.env.ncol-1>j > 0:
print("x".center(5), end="")
else:
a=self.pi[i*self.env.ncol+j]
a_str=''.join( actions[i] if a[i]>0 else 'o' for i in range(len(a)) )
print(a_str.center(5), end="")
print('\n')

if __name__ == '__main__':
env = CliffWalkingEnv()
agent = PolicyIteration(env, theta=0.001, gamma=0.9)
agent.policy_iteration()
agent.print_result()

结果:
alt text

价值迭代

相比策略迭代,价值迭代直接迭代优化、更新状态价值函数V(s)直至收敛,再从最优状态价值函数V*(s)中返回一个确定的最优策略

根据贝尔曼最优方程,状态价值函数V*(s)的最优解为:

迭代更新方式:

价值迭代算法:
alt text

实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class ValueIteration:
""" 策略迭代算法 """
def __init__(self, env, theta, gamma):
self.env = env
self.v = [0] * self.env.ncol * self.env.nrow
self.pi = [[0.25, 0.25, 0.25, 0.25]
for i in range(self.env.ncol * self.env.nrow)]
self.theta = theta
self.gamma = gamma
def value_iteration(self):
""" 价值迭代 """
cnt1 = 0
while True:
cnt1 += 1
new_v = [0] * self.env.ncol * self.env.nrow
for s in range(self.env.ncol * self.env.nrow):
qsa_list = []
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, reward, done = res
qsa += p * (reward + self.gamma * self.v[next_state]*(1-done))
qsa_list.append(qsa)
new_v[s] = max(qsa_list)
delta = max([abs(self.v[s] - new_v[s]) for s in range(self.env.ncol * self.env.nrow)])
self.v = new_v
if delta < self.theta:
break
print(f"价值迭代{cnt1}次完成")
self.get_policy()
def get_policy(self):
""" 从最优状态价值函数中得到策略 """
for s in range(self.env.nrow * self.env.ncol):
qsa_list = []
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, r, done = res
qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
qsa_list.append(qsa)
maxq = max(qsa_list)
cntq = qsa_list.count(maxq)

self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
def print_result(self):
""" 打印结果 """
print("状态价值:")
for i in range(self.env.nrow):
for j in range(self.env.ncol):
print("{:.3f}".format(self.v[i * self.env.ncol + j]).center(8), end="")
print('\n')
print("策略:")
actions = ['^', 'v', '<', '>']
for i in range(self.env.nrow):
for j in range(self.env.ncol):
if i == self.env.nrow - 1 and j == self.env.ncol - 1:
print("goal".center(5), end="")
elif i == self.env.nrow - 1 and self.env.ncol-1>j > 0:
print("x".center(5), end="")
else:
a=self.pi[i*self.env.ncol+j]
a_str=''.join( actions[i] if a[i]>0 else 'o' for i in range(len(a)) )
print(a_str.center(5), end="")
print('\n')

if __name__ == '__main__':
"""价值迭代算法"""
env = CliffWalkingEnv()
agent = ValueIteration(env, theta=0.001, gamma=0.9)
agent.value_iteration()
agent.print_result()

结果:

alt text

总结

策略迭代和价值迭代是两种动态规划的算法,都需要事先知道奖励函数和状态转移概率的环境模型,适用于有限的状态和动作空间。策略迭代通过迭代求解最优策略,价值迭代通过迭代求解最优状态价值函数。两者的区别在于,策略迭代依赖于策略评估和策略提升,价值迭代直接优化状态价值函数。

具体表现在编码上:

  • 策略迭代更新状态价值函数时取状态s所有动作价值的和(sum),策略提升时取状态s动作价值最大的动作均分概率(max),策略评估、策略提升两部分循环进行收敛;
  • 价值迭代更新状态价值函数时直接取状态s的最大的动作价值(价值),迭代收敛为最优状态价值函数V*(s),再还原出最优策略。
  • 策略迭代需要多轮迭代,价值迭代只需一轮。