Reinforcement Learning on CartPole: DQN, REINFORCE, DDPG, and PPO in PyTorch

CartPole is a classic reinforcement-learning benchmark. This article solves it with four different algorithms, all implemented in PyTorch.

DQN:

References:

Algorithm idea:

https://mofanpy.com/tutorials/machine-learning/torch/DQN/

Implementation:

https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

My understanding: DQN combines Q-learning with a neural network, which solves the problem of a continuous state space. Because Q-learning is off-policy and bootstraps from its own estimates, it needs a target network, i.e., a lagged copy of the online network; this keeps a non-optimal action, whose estimated value happens to increase after it is sampled, from being selected over and over and hurting learning efficiency. Because training a neural network assumes the inputs and targets are (roughly) independent and identically distributed, DQN uses a ReplayBuffer with random sampling to break the correlation between consecutive transitions. The network's goal is to predict Q-values more accurately, so the loss is the mean squared error between the target value and the evaluated Q-value.
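In symbols (a sketch of my understanding; $\theta$ are the online-network parameters, $\theta^-$ the target-network parameters, $d$ the done flag, and $\mathcal{D}$ the replay buffer), the loss minimized in update_parameters below is

$$
L(\theta) = \mathbb{E}_{(s,a,r,d,s')\sim\mathcal{D}}\Big[\big(r + \gamma\,(1-d)\max_{a'} Q_{\theta^-}(s',a') - Q_\theta(s,a)\big)^2\Big]
$$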

Code:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from collections import namedtuple

GAMMA = 0.99
lr = 0.1
EPSION = 0.1
buffer_size = 10000  # size of the replay buffer
batch_size = 32
num_episode = 100000
target_update = 10  # copy net's parameters into target_net every this many episodes


# Define the Q-network
class Net(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Net, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear2 = nn.Linear(hidden_size, hidden_size)
        self.Linear3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # print('x: ', x)
        x = F.relu(self.Linear1(x))
        x = F.relu(self.Linear2(x))
        x = self.Linear3(x)
        return x


# namedtuple container for transitions
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'done', 'next_state'))


class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):  # draw a random minibatch
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


class DQN(object):
    def __init__(self, input_size, hidden_size, output_size):
        self.net = Net(input_size, hidden_size, output_size)
        self.target_net = Net(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=lr)

        self.target_net.load_state_dict(self.net.state_dict())
        self.buffer = ReplayMemory(buffer_size)
        self.loss_func = nn.MSELoss()
        self.steps_done = 0

    def put(self, s0, a0, r, t, s1):
        self.buffer.push(s0, a0, r, t, s1)

    def select_action(self, state):
        # epsilon-greedy: with probability EPSION take a random action, otherwise the greedy one
        sample = random.random()
        action = self.net(torch.Tensor(state))
        if sample > EPSION:
            choice = torch.argmax(action).numpy()
        else:
            choice = np.random.randint(0, action.shape[0])  # random action index in [0, action.shape[0])
        return choice

    def update_parameters(self):
        if self.buffer.__len__() < batch_size:
            return
        samples = self.buffer.sample(batch_size)
        batch = Transition(*zip(*samples))
        # stack the sampled actions into a numpy array
        tmp = np.vstack(batch.action)
        # convert the batch to tensors
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.LongTensor(tmp.astype(int))
        reward_batch = torch.Tensor(batch.reward)
        done_batch = torch.Tensor(batch.done)
        next_state_batch = torch.Tensor(batch.next_state)

        q_next = torch.max(self.target_net(next_state_batch).detach(), dim=1,
                           keepdim=True)[0]
        q_eval = self.net(state_batch).gather(1, action_batch)
        q_tar = reward_batch.unsqueeze(1) + (1 - done_batch.unsqueeze(1)) * GAMMA * q_next
        loss = self.loss_func(q_eval, q_tar)
        # print(loss)
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()


if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    # State space: 4-dimensional
    # Action space: discrete, with only two actions, 0 and 1
    Agent = DQN(env.observation_space.shape[0], 256, env.action_space.n)
    average_reward = 0  # running average of episode rewards so far
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0  # total reward of this episode
        tot_time = 0  # how many steps this episode actually ran (the reward definition may differ)
        while True:
            env.render()
            a0 = Agent.select_action(s0)
            s1, r, done, _ = env.step(a0)
            tot_time += r  # accumulate the episode length
            # reward shaping used by some online implementations:
            # x, x_dot, theta, theta_dot = s1
            # r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
            # r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
            # r = r1 + r2
            tot_reward += r  # accumulate the episode reward
            if done:
                t = 1
            else:
                t = 0
            Agent.put(s0, a0, r, t, s1)  # store the transition in the replay buffer
            s0 = s1
            Agent.update_parameters()
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                        tot_reward - average_reward)
                print('Episode ', i_episode, 'tot_time: ', tot_time,
                      ' tot_reward: ', tot_reward, ' average_reward: ',
                      average_reward)
                break
        if i_episode % target_update == 0:
            Agent.target_net.load_state_dict(Agent.net.state_dict())

One point to note: some DQN implementations online do not account for the terminal state, so they have to reshape the reward to get good results. Once the terminal state is handled (via the done flag above), the environment's original reward is enough to learn from.
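Concretely, with the done flag the target stops bootstrapping at the end of an episode:

$$
y_t = \begin{cases} r_t, & \text{if } s_{t+1} \text{ is terminal} \\ r_t + \gamma\,\max_{a'} Q_{\theta^-}(s_{t+1},a'), & \text{otherwise} \end{cases}
$$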

Reinforce:

References:

Idea and code:

https://blog.csdn.net/qq_37266917/article/details/109855244

My understanding:

REINFORCE is a policy-gradient algorithm: it performs gradient ascent on the parameters of a parameterized policy. Note that the network should not be too complex, otherwise it overfits and learning becomes difficult. The policy gradient theorem tells us how to do the ascent: the return that multiplies each log-probability can be seen as the step size of the ascent, i.e., the larger the return, the more that action's probability is increased. This is why Policy Gradient methods introduce a baseline, to prevent a non-optimal action's probability from becoming too large after it happens to be selected (although with enough samples this problem also resolves itself).
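For reference, the policy gradient with a baseline $b(s_t)$ reads as follows (the code below is plain REINFORCE and uses no baseline, i.e. $b \equiv 0$):

$$
\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\Big[\sum_t \big(G_t - b(s_t)\big)\,\nabla_\theta \log \pi_\theta(a_t \mid s_t)\Big]
$$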

The network's loss is the return at step t multiplied by the log-probability of the action taken at step t, negated: since the optimizer performs gradient descent to minimize the loss, negating turns this minimization into maximizing the return.
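In other words, update_parameters below minimizes

$$
L(\theta) = -\sum_t G_t \log \pi_\theta(a_t \mid s_t), \qquad G_t = \sum_{k=t}^{T} \gamma^{\,k-t} r_k
$$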

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from torch.distributions import Categorical
from collections import deque
from collections import namedtuple

GAMMA = 1.0
lr = 0.1
EPSION = 0.9
buffer_size = 10000
batch_size = 32
num_episode = 100000
target_update = 10
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200

class Policy(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Policy, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        # self.Linear2 = nn.Linear(hidden_size, hidden_size)
        # self.Linear2.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = F.relu(self.Linear1(x))
        # x = F.relu(self.Linear2(x))
        x = F.softmax(self.Linear3(x), dim=1)
        return x
        # x = F.relu(self.fc1(x))
        # x = self.fc2(x)
        # return F.softmax(x, dim=1)

class Reinforce(object):
    def __init__(self, input_size, hidden_size, output_size):
        self.net = Policy(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=0.01)

    def select_action(self, s):
        s = torch.Tensor(s).unsqueeze(0)
        probs = self.net(s)
        tmp = Categorical(probs)
        a = tmp.sample()
        log_prob = tmp.log_prob(a)
        return a.item(), log_prob

    def update_parameters(self, rewards, log_probs):
        R = 0
        loss = 0
        for i in reversed(range(len(rewards))):
            R = rewards[i] + GAMMA * R
            loss = loss - R * log_probs[i]
        # discounts = [GAMMA ** i for i in range(len(rewards) + 1)]
        # R = sum([a * b for a, b in zip(discounts, rewards)])
        # policy_loss = []
        # for log_prob in log_probs:
        #     policy_loss.append(-log_prob * R)
        # loss = torch.cat(policy_loss).sum()
        # print('loss: ', len(loss))
        # loss = loss / len(loss)
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()

if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    average_reward = 0
    Agent = Reinforce(env.observation_space.shape[0], 16, env.action_space.n)
    # scores_deque = deque(maxlen=100)
    # scores = []
    for i_episode in range(1, num_episode + 1):
        s = env.reset()
        log_probs = []
        rewards = []
        while True:
            env.render()
            a, prob = Agent.select_action(s)
            s1, r, done, _ = env.step(a)
            # scores_deque.append(sum(rewards))
            # scores.append(sum(rewards))
            log_probs.append(prob)
            rewards.append(r)
            s = s1
            if done:
                average_reward = average_reward + (1 / (i_episode + 1)) * (np.sum(rewards) - average_reward)
                if i_episode % 100 == 0:
                    # print('Episode {}\t Average Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))
                    print('episode: ', i_episode, "tot_rewards: ", np.sum(rewards), 'average_rewards: ', average_reward)
                break
        Agent.update_parameters(rewards, log_probs)

DDPG:

References:

Idea: https://www.cnblogs.com/pinard/p/10345762.html

Implementation: https://zhuanlan.zhihu.com/p/99406809

My understanding: DDPG uses the Actor-Critic framework and is like a combination of DQN and Policy Gradient. In DDPG the actor outputs a concrete action rather than a probability distribution over actions, and the critic outputs the Q-value of that action. Both the actor and the critic need a target network, and a ReplayBuffer is needed to break correlations. I could not find code online that solves CartPole with DDPG in PyTorch, so my solution may not be the best one. Since CartPole has only two discrete actions (0 and 1), I first gave the actor two outputs and used argmax to pick one, but argmax has no gradient, so I switched to a single output wrapped in a sigmoid: an output below 0.5 counts as action 0 and above 0.5 as action 1. The critic's loss is similar to DQN's, the mean squared error between the target and evaluated values. For the actor's loss, the actor first outputs the action a, the critic evaluates it, and the loss is the negative of the mean of those values.
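In symbols (a sketch of my understanding; $\mu_\theta$ is the actor, $Q_w$ the critic, $\theta^-$ and $w^-$ the target networks, $d$ the done flag), the two updates below minimize

$$
L(w) = \mathbb{E}\Big[\big(r + \gamma\,(1-d)\,Q_{w^-}(s',\mu_{\theta^-}(s')) - Q_w(s,a)\big)^2\Big], \qquad L(\theta) = -\,\mathbb{E}\big[Q_w(s,\mu_\theta(s))\big]
$$

and the action actually sent to the environment is obtained by rounding the sigmoid output $\mu_\theta(s) \in (0,1)$ to 0 or 1.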

Code:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
import numpy as np
from collections import namedtuple
import math

GAMMA = 0.9
lr = 0.1
EPSION = 0.9
buffer_size = 10000
batch_size = 32
num_episode = 100000
target_update = 10
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
tau = 0.02

# Define the actor and critic networks
class Actor(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Actor, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        # self.Linear2 = nn.Linear(hidden_size, hidden_size)
        # self.Linear2.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, x):
        # print('x: ', x)
        x = F.relu(self.Linear1(x))
        # x = F.relu(self.Linear2(x))
        x = torch.sigmoid(self.Linear3(x))
        return x

class Critic(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Critic, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        # self.Linear2 = nn.Linear(hidden_size, hidden_size)
        # self.Linear2.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, s, a):
        # print('s1: ', s)
        # print('a1: ', a)
        x = torch.cat([s, a], dim=1)
        # print('x: ', x)
        x = F.relu(self.Linear1(x))
        # x = F.relu(self.Linear2(x))
        x = self.Linear3(x)
        return x

# namedtuple container for transitions
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))


class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):  # draw a random minibatch
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


class DDPG(object):
    def __init__(self, input_size, action_shape, hidden_size, output_size):
        self.actor = Actor(input_size, hidden_size, action_shape)
        self.actor_target = Actor(input_size, hidden_size, action_shape)
        self.critic = Critic(input_size + action_shape, hidden_size, action_shape)
        self.critic_target = Critic(input_size + action_shape, hidden_size, action_shape)
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=0.01)
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=0.01)

        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.buffer = ReplayMemory(buffer_size)
        self.loss_func = nn.MSELoss()
        self.steps_done = 0

    def put(self, s0, a0, r, s1, done):
        self.buffer.push(s0, a0, r, s1, done)

    def select_action(self, state):
        state = torch.Tensor(state)
        a = self.actor(state)
        return a

    def update_parameters(self):
        if self.buffer.__len__() < batch_size:
            return
        samples = self.buffer.sample(batch_size)
        batch = Transition(*zip(*samples))
        # convert the sampled transitions to tensors
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.Tensor(batch.action).unsqueeze(0).view(-1, 1)
        reward_batch = torch.Tensor(batch.reward)
        next_state_batch = torch.Tensor(batch.next_state)
        done_batch = torch.Tensor(batch.done)
        # critic update
        next_action_batch = self.actor_target(next_state_batch).unsqueeze(0).detach().view(-1, 1)
        # print('batch: ', next_action_batch)

        r_eval = self.critic(state_batch, action_batch)
        # print('s: ', next_state_batch)
        # print('a: ', next_action_batch)
        r_target = reward_batch + GAMMA * self.critic_target(next_state_batch, next_action_batch).detach().view(1, -1) * done_batch
        r_eval = torch.squeeze(r_eval)
        r_target = torch.squeeze(r_target)
        loss = self.loss_func(r_eval, r_target)
        self.critic_optim.zero_grad()
        loss.backward()
        self.critic_optim.step()
        # actor update
        a = self.actor(state_batch).unsqueeze(0).view(-1, 1)
        # print('a: ', a)
        loss = -torch.mean(self.critic(state_batch, a))
        self.actor_optim.zero_grad()
        loss.backward()
        # print('a: ', a)
        self.actor_optim.step()
        # soft-update the target networks
        def soft_update(net_target, net):
            for target_param, param in zip(net_target.parameters(), net.parameters()):
                target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)

        soft_update(self.actor_target, self.actor)
        soft_update(self.critic_target, self.critic)



if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    Agent = DDPG(env.observation_space.shape[0], 1, 16, env.action_space.n)
    average_reward = 0
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0
        tot_time = 0
        while True:
            env.render()
            a0 = Agent.select_action(s0)
            a0_env = int(round(a0.item()))  # map the sigmoid output in (0, 1) to the discrete action 0/1
            s1, r, done, _ = env.step(a0_env)
            tot_time += r
            tot_reward += r
            Agent.put(s0, a0.detach().numpy(), r, s1, 1 - done)  # the terminal flag matters a lot; without it learning is very hard
            s0 = s1
            Agent.update_parameters()
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (tot_time - average_reward)
                # if i_episode % 100 == 0:
                print('Episode ', i_episode, 'tot_time: ', tot_time, ' tot_reward: ', tot_reward, ' average_reward: ', average_reward)
                break
        # if i_episode % target_update == 0:
        #     Agent.target_net.load_state_dict(Agent.net.state_dict())

PPO:

References:

PPO algorithm flow and ideas:

https://blog.csdn.net/qq_30615903/article/details/86308045

https://www.jianshu.com/p/9f113adc0c50

PPO implementation:

https://blog.csdn.net/weixin_42165585/article/details/112362125

My understanding:

PPO also uses the Actor-Critic architecture, but unlike DDPG it is an on-policy algorithm, so it needs neither a target network nor a ReplayBuffer, and the actor and critic can share network parameters to speed up learning. PPO introduces importance sampling so that each episode's data can be trained on several times (in practice, sampling can be very time-consuming), which saves time, and the clipping keeps each update from being too large.
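In symbols (a sketch of my understanding; $\hat{A}_t$ is the advantage estimate computed from the TD errors in the code), the update uses the clipped surrogate objective

$$
r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_\text{old}}(a_t \mid s_t)}, \qquad L^{CLIP}(\theta) = \mathbb{E}_t\Big[\min\big(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,\hat{A}_t\big)\Big]
$$

and the loss in the code is the negative of this objective plus a smooth L1 value loss between the critic output and the TD target.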

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import namedtuple
import random
import gym
import math

lr = 0.0005
Capacity = 10000
num_episode = 10000
Gamma = 0.98
lmbda = 0.95
eps_clip = 0.1

class Net(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Net, self).__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        # self.Linear2 = nn.Linear(hidden_size, hidden_size)
        self.Linear_actor = nn.Linear(hidden_size, output_size)
        self.Linear_critic = nn.Linear(hidden_size, 1)

    def actor_forward(self, s, dim):
        s = F.relu(self.Linear1(s))
        prob = F.softmax(self.Linear_actor(s), dim=dim)
        # print(prob)
        return prob

    def critic_forward(self, s):
        s = F.relu(self.Linear1(s))
        # s = F.relu(self.Linear2(s))
        return self.Linear_critic(s)

Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'rate', 'done'))


class ReplayBuffer(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):  # draw a random minibatch
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

    def clean(self):
        self.position = 0
        self.memory = []

class PPO(object):
    def __init__(self, input_size, hidden_size, output_size):
        super(PPO, self).__init__()
        self.net = Net(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=lr)
        self.buffer = ReplayBuffer(capacity=Capacity)

    def act(self, s, dim):
        s = torch.Tensor(s)
        prob = self.net.actor_forward(s, dim)
        return prob

    def critic(self, s):
        return self.net.critic_forward(s)

    def put(self, s0, a0, r, s1, rate, done):
        self.buffer.push(s0, a0, r, s1, rate, done)

    def make_batch(self):
        samples = self.buffer.memory
        batch = Transition(*zip(*samples))
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.LongTensor(batch.action).view(-1, 1)
        reward_batch = torch.Tensor(batch.reward).view(-1, 1)
        next_state_batch = torch.Tensor(batch.next_state)
        rate_batch = torch.Tensor(batch.rate).view(-1, 1)
        done_batch = torch.LongTensor(batch.done).view(-1, 1)
        return state_batch, action_batch, reward_batch, next_state_batch, done_batch, rate_batch

    def update_parameters(self):
        samples = self.buffer.memory  # train on the full on-policy episode stored in the buffer
        batch = Transition(*zip(*samples))
        state_batch = torch.Tensor(batch.state)
        action_batch = torch.LongTensor(batch.action).view(-1, 1)
        reward_batch = torch.Tensor(batch.reward).view(-1, 1)
        next_state_batch = torch.Tensor(batch.next_state)
        rate_batch = torch.Tensor(batch.rate).view(-1, 1)
        done_batch = torch.LongTensor(batch.done).view(-1, 1)
        for i in range(3):
            td_target = reward_batch + Gamma * self.critic(next_state_batch) * done_batch
            delta = td_target - self.critic(state_batch)
            delta = delta.detach().numpy()

            advantage_list = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                # GAE: running (Gamma * lmbda)-discounted sum of the TD errors
                advantage = Gamma * lmbda * advantage + delta_t
                advantage_list.append(advantage)

            advantage_list.reverse()
            advantage = torch.Tensor(advantage_list)
            prob = self.act(state_batch, 1).squeeze(0)
            prob_a = prob.gather(1, action_batch.view(-1, 1))
            ratio = torch.exp(torch.log(prob_a) - torch.log(rate_batch))
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.critic(state_batch), td_target.detach())
            self.optim.zero_grad()
            loss.mean().backward()
            self.optim.step()



if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    Agent = PPO(env.observation_space.shape[0], 256, env.action_space.n)
    average_reward = 0
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0
        while True:
            env.render()
            prob = Agent.act(torch.from_numpy(s0).float(), 0)
            a0 = int(prob.multinomial(1))
            s1, r, done, _ = env.step(a0)
            rate = prob[a0].item()
            Agent.put(s0, a0, r, s1, rate, 1 - done)
            s0 = s1
            tot_reward += r
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                        tot_reward - average_reward)
                if i_episode % 20 == 0:
                    print('Episode ', i_episode,
                      ' tot_reward: ', tot_reward, ' average_reward: ',
                      average_reward)
                break
        # Agent.train_net()
        Agent.update_parameters()
        Agent.buffer.clean()