摘要:相比于Q learning,DQN本質上是為了適應更為復雜的環境,并且經過不斷的改良迭代,到了Nature DQN(即Volodymyr Mnih發表的Nature論文)這里才算是基本完善,
本文分享自華為云社區《強化學習從基礎到進階-案例與實踐[4.1]:深度Q網路-DQN專案實戰CartPole-v0》,作者: 汀丶 ,
1、定義演算法
相比於Q learning,DQN本質上是為了適應更為複雜的環境,並且經過不斷的改良迭代,到了Nature DQN(即Volodymyr Mnih發表的Nature論文)這裡才算是基本完善。DQN主要改動的點有三個:
- 使用深度神經網路替代原來的Q表:這個很容易理解原因
- 使用了經驗回放(Replay Buffer):這個好處有很多。一個是使用一堆歷史資料去訓練,比之前用一次就扔掉好多了,大大提高樣本效率;另外一個是面試常提到的,減少樣本之間的相關性。原則上獲取經驗跟學習階段是分開的,原來時序的訓練資料有可能是不穩定的,打亂之後再學習有助於提高訓練的穩定性,跟深度學習中劃分訓練集、測試集時打亂樣本是一個道理。
- 使用了兩個網路:即策略網路和目標網路,每隔若干步才把每步更新的策略網路參數複製給目標網路,這樣做也是為了訓練的穩定,避免Q值的估計發散。想象一下,如果當前有個transition(這個Q learning中提過的,一定要記住!!!)樣本導致對Q值進行了較差的過估計,如果接下來從經驗回放中提取到的樣本正好連續幾個都這樣的,很有可能導致Q值的發散(它的青春小鳥一去不回來了)。再打個比方,我們玩RPG或者闖關類遊戲,有些人為了破紀錄經常Save和Load,只要我出了錯,我不滿意我就加載之前的存檔;假設不允許加載呢,就像DQN演算法一樣,訓練過程中回退不了,這時候是不是搞兩個檔,一個檔每幀都存一下,另外一個檔打了不錯的結果再存,也就是若干個間隔再存一下,到最後用間隔若干步數再存的檔一般都比每幀都存的檔好些呢?當然你也可以再搞更多個檔,也就是DQN增加多個目標網路,但是對於DQN則沒有多大必要,多幾個網路效果不見得會好很多。
1.1 定義模型
import paddle import paddle.nn as nn import paddle.nn.functional as F !pip uninstall -y parl !pip install parl import parl from parl.algorithms import DQN class MLP(parl.Model): """ Linear network to solve Cartpole problem. Args: input_dim (int): Dimension of observation space. output_dim (int): Dimension of action space. """ def __init__(self, input_dim, output_dim): super(MLP, self).__init__() hidden_dim1 = 256 hidden_dim2 = 256 self.fc1 = nn.Linear(input_dim, hidden_dim1) self.fc2 = nn.Linear(hidden_dim1, hidden_dim2) self.fc3 = nn.Linear(hidden_dim2, output_dim) def forward(self, state): x = F.relu(self.fc1(state)) x = F.relu(self.fc2(x)) x = self.fc3(x) return x
1.2 定義經驗回放
import random
from collections import deque
from itertools import islice


class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions for experience replay.

    Args:
        capacity (int): Maximum number of transitions kept in the buffer.
    """

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        # A deque with maxlen discards its oldest entry once it is full.
        self.buffer = deque(maxlen=self.capacity)

    def push(self, transitions):
        """Append one transition to the buffer.

        Args:
            transitions (tuple): The transition to store; it is kept as-is.
        """
        self.buffer.append(transitions)

    def sample(self, batch_size: int, sequential: bool = False):
        """Draw a batch and return it transposed field by field.

        Args:
            batch_size (int): Requested number of transitions; clamped to the
                current buffer length.
            sequential (bool): If True, take a contiguous run starting at a
                random offset; otherwise sample without replacement.

        Returns:
            A ``zip`` iterator yielding one tuple per transition field.
        """
        batch_size = min(batch_size, len(self.buffer))
        if sequential:
            # randint is inclusive on both ends, so the run always fits.
            start = random.randint(0, len(self.buffer) - batch_size)
            batch = list(islice(self.buffer, start, start + batch_size))
        else:
            batch = random.sample(self.buffer, batch_size)
        return zip(*batch)

    def clear(self):
        """Remove every stored transition."""
        self.buffer.clear()

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)
1.3 定義智能體
import math
import random

import numpy as np
import paddle
import parl


class DQNAgent(parl.Agent):
    """Epsilon-greedy DQN agent that learns from a replay buffer.

    Args:
        algorithm: Algorithm object handed to ``parl.Agent``; this class uses
            its ``predict``, ``learn`` and ``sync_target`` methods through
            ``self.alg``.
        memory: Replay buffer supporting ``len()`` and ``sample(batch_size)``.
        cfg (dict): Must contain ``n_actions``, ``epsilon_start``,
            ``epsilon_end``, ``epsilon_decay`` and ``batch_size``.
    """

    def __init__(self, algorithm, memory, cfg):
        super(DQNAgent, self).__init__(algorithm)
        self.n_actions = cfg['n_actions']
        self.epsilon = cfg['epsilon_start']
        self.sample_count = 0
        self.epsilon_start = cfg['epsilon_start']
        self.epsilon_end = cfg['epsilon_end']
        self.epsilon_decay = cfg['epsilon_decay']
        self.batch_size = cfg['batch_size']
        self.global_step = 0
        # NOTE(review): hard-coded; the 'targe_update_fre' option defined in
        # get_args() is never read here -- confirm which value is intended.
        self.update_target_steps = 600
        self.memory = memory  # replay buffer

    def sample_action(self, state):
        """Pick an action epsilon-greedily and decay epsilon.

        Epsilon decays exponentially from ``epsilon_start`` towards
        ``epsilon_end`` as ``sample_count`` grows, to balance exploration and
        exploitation.
        """
        self.sample_count += 1
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
            math.exp(-1. * self.sample_count / self.epsilon_decay)
        # `random` must be the module here; importing the function
        # (`from random import random`) made `random.random()` fail.
        if random.random() < self.epsilon:
            return np.random.randint(self.n_actions)
        return self.predict_action(state)

    def predict_action(self, state):
        """Return the greedy action (argmax of the predicted Q-values)."""
        state = paddle.to_tensor(state, dtype='float32')
        q_values = self.alg.predict(state)  # self.alg is set by parl.Agent
        # item() works whether argmax() yields a 0-D or a one-element tensor;
        # indexing `.numpy()[0]` fails on a 0-D result.
        return int(q_values.argmax().item())

    def update(self):
        """Run one learning step on a batch sampled from the replay buffer.

        The target network is synchronised every ``update_target_steps``
        learning steps.

        Returns:
            The value returned by ``self.alg.learn`` (the loss), or ``None``
            when the buffer holds fewer than ``batch_size`` transitions.
        """
        if len(self.memory) < self.batch_size:
            # Not enough transitions for a full batch yet: skip the update.
            return None
        if self.global_step % self.update_target_steps == 0:
            self.alg.sync_target()
        self.global_step += 1

        (state_batch, action_batch, reward_batch,
         next_state_batch, done_batch) = self.memory.sample(self.batch_size)

        # Add a trailing axis to the per-sample scalars.
        action_batch = np.expand_dims(action_batch, axis=-1)
        reward_batch = np.expand_dims(reward_batch, axis=-1)
        done_batch = np.expand_dims(done_batch, axis=-1)

        state_batch = paddle.to_tensor(state_batch, dtype='float32')
        action_batch = paddle.to_tensor(action_batch, dtype='int32')
        reward_batch = paddle.to_tensor(reward_batch, dtype='float32')
        next_state_batch = paddle.to_tensor(next_state_batch, dtype='float32')
        done_batch = paddle.to_tensor(done_batch, dtype='float32')

        loss = self.alg.learn(state_batch, action_batch, reward_batch,
                              next_state_batch, done_batch)
        return loss
2、定義訓練
def _reset_env(env):
    """Reset ``env`` and return only the initial observation.

    Handles both Gym API generations: older versions return the observation,
    Gym >= 0.26 returns ``(observation, info)``.
    """
    result = env.reset()
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        return result[0]
    return result


def _step_env(env, action):
    """Step ``env`` and return ``(next_state, reward, done)``.

    Handles both the 4-tuple step result of older Gym versions and the
    5-tuple ``(obs, reward, terminated, truncated, info)`` of Gym >= 0.26.
    """
    result = env.step(action)
    if len(result) == 5:
        next_state, reward, terminated, truncated, _ = result
        return next_state, reward, bool(terminated or truncated)
    next_state, reward, done, _ = result
    return next_state, reward, done


def train(cfg, env, agent):
    """Train ``agent`` on ``env`` for ``cfg['train_eps']`` episodes.

    Args:
        cfg (dict): Reads ``env_name``, ``algo_name``, ``device``,
            ``train_eps`` and ``ep_max_steps``.
        env: Environment exposing ``reset``, ``step`` and ``close``.
        agent: Agent exposing ``sample_action``, ``update``, ``epsilon`` and
            a ``memory`` with ``push``.

    Returns:
        dict: ``episodes`` (a range), ``rewards`` and ``steps`` per episode.

    Note:
        ``env.close()`` is called before returning.
    """
    print("開始訓練!")
    print(f"環境:{cfg['env_name']},演算法:{cfg['algo_name']},設備:{cfg['device']}")
    rewards = []  # total reward of every episode
    steps = []  # number of steps of every episode
    for i_ep in range(cfg["train_eps"]):
        ep_reward = 0
        ep_step = 0
        state = _reset_env(env)
        for _ in range(cfg['ep_max_steps']):
            ep_step += 1
            action = agent.sample_action(state)
            next_state, reward, done = _step_env(env, action)
            # Store the transition before learning so it can be sampled.
            agent.memory.push((state, action, reward, next_state, done))
            state = next_state
            agent.update()
            ep_reward += reward
            if done:
                break
        steps.append(ep_step)
        rewards.append(ep_reward)
        if (i_ep + 1) % 10 == 0:
            print(f"回合:{i_ep+1}/{cfg['train_eps']},獎勵:{ep_reward:.2f},Epislon: {agent.epsilon:.3f}")
    print("完成訓練!")
    env.close()
    return {'episodes': range(len(rewards)), 'rewards': rewards, 'steps': steps}


def test(cfg, env, agent):
    """Evaluate ``agent`` greedily on ``env`` for ``cfg['test_eps']`` episodes.

    Args:
        cfg (dict): Reads ``env_name``, ``algo_name``, ``device``,
            ``test_eps`` and ``ep_max_steps``.
        env: Environment exposing ``reset``, ``step`` and ``close``.
        agent: Agent exposing ``predict_action``.

    Returns:
        dict: ``episodes`` (a range), ``rewards`` and ``steps`` per episode.

    Note:
        ``env.close()`` is called before returning.
    """
    print("開始測驗!")
    print(f"環境:{cfg['env_name']},演算法:{cfg['algo_name']},設備:{cfg['device']}")
    rewards = []  # total reward of every episode
    steps = []  # number of steps of every episode
    for i_ep in range(cfg['test_eps']):
        ep_reward = 0
        ep_step = 0
        state = _reset_env(env)
        for _ in range(cfg['ep_max_steps']):
            ep_step += 1
            action = agent.predict_action(state)
            next_state, reward, done = _step_env(env, action)
            state = next_state
            ep_reward += reward
            if done:
                break
        steps.append(ep_step)
        rewards.append(ep_reward)
        print(f"回合:{i_ep+1}/{cfg['test_eps']},獎勵:{ep_reward:.2f}")
    print("完成測驗!")
    env.close()
    return {'episodes': range(len(rewards)), 'rewards': rewards, 'steps': steps}


# `test` is an evaluation routine, not a unit test: opt out of pytest's
# name-based collection in case this module is imported into a test module.
test.__test__ = False
3、定義環境
OpenAI Gym中其實集成了很多強化學習環境,足夠大家學習了,但是在做強化學習的應用中免不了要自己創建環境,比如在本專案中其實不太好找到Qlearning能學出來的環境,Qlearning實在是太弱了,需要足夠簡單的環境才行,因此本專案寫了一個環境,大家感興趣的話可以看一下。一般環境介面最關鍵的部分就是reset和step。
import gym
import paddle
import numpy as np
import random
import os
from parl.algorithms import DQN


def all_seed(env, seed=1):
    """Seed the environment, NumPy, ``random`` and Paddle with one value.

    Call this right after the environment is created.

    Args:
        env: Environment to seed.
        seed (int, optional): Seed value. Defaults to 1.
    """
    print(f"seed = {seed}")
    if hasattr(env, 'seed'):
        env.seed(seed)  # older Gym API
    else:
        # Gym >= 0.26 removed Env.seed(); seeding goes through reset().
        env.reset(seed=seed)
    np.random.seed(seed)
    random.seed(seed)
    paddle.seed(seed)


def env_agent_config(cfg):
    """Create the environment and the DQN agent described by ``cfg``.

    Args:
        cfg (dict): Reads ``env_name``, ``seed``, ``gamma``, ``lr`` and
            ``memory_capacity``; ``n_states`` and ``n_actions`` are written
            back into it. The dict is also passed on to ``DQNAgent``.

    Returns:
        tuple: ``(env, agent)``.
    """
    env = gym.make(cfg['env_name'])
    if cfg['seed'] != 0:
        # A seed of 0 means "do not seed".
        all_seed(env, seed=cfg["seed"])
    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.n  # action dimension
    print(f"n_states: {n_states}, n_actions: {n_actions}")
    # Publish the dimensions so the agent can read them from cfg.
    cfg.update({"n_states": n_states, "n_actions": n_actions})
    model = MLP(n_states, n_actions)
    algo = DQN(model, gamma=cfg['gamma'], lr=cfg['lr'])
    memory = ReplayBuffer(cfg["memory_capacity"])  # replay buffer
    agent = DQNAgent(algo, memory, cfg)
    return env, agent
4、設定引數
到這裡所有DQN模塊就算完成了,下面需要設定一些參數,方便大家“煉丹”,其中默認值是筆者已經調好的。另外還定義了一個畫圖函式,用來描述獎勵的變化。
import argparse
import seaborn as sns
import matplotlib.pyplot as plt


def get_args():
    """Return the default hyperparameters as a plain dict.

    The parser is run on an empty argument list, so command-line arguments
    are ignored and every option takes its default value.
    """
    parser = argparse.ArgumentParser(description="hyperparameters")
    parser.add_argument('--algo_name', default='DQN', type=str, help="name of algorithm")
    parser.add_argument('--env_name', default='CartPole-v0', type=str, help="name of environment")
    # number of training episodes
    parser.add_argument('--train_eps', default=200, type=int, help="episodes of training")
    # number of testing episodes
    parser.add_argument('--test_eps', default=20, type=int, help="episodes of testing")
    parser.add_argument('--ep_max_steps', default=100000, type=int,
                        help="steps per episode, much larger value can simulate infinite steps")
    # discount factor
    parser.add_argument('--gamma', default=0.99, type=float, help="discounted factor")
    # initial epsilon of the e-greedy policy
    parser.add_argument('--epsilon_start', default=0.95, type=float, help="initial value of epsilon")
    # final epsilon of the e-greedy policy
    parser.add_argument('--epsilon_end', default=0.01, type=float, help="final value of epsilon")
    # decay rate of epsilon in the e-greedy policy
    parser.add_argument('--epsilon_decay', default=200, type=int, help="decay rate of epsilon")
    # capacity of the replay memory
    parser.add_argument('--memory_capacity', default=200000, type=int)
    # warm-up size of the replay memory
    parser.add_argument('--memory_warmup_size', default=200, type=int)
    # number of samples used per training step
    parser.add_argument('--batch_size', default=64, type=int, help="batch size of training")
    # target network update frequency
    parser.add_argument('--targe_update_fre', default=200, type=int,
                        help="frequency of target network update")
    parser.add_argument('--seed', default=10, type=int, help="seed")
    parser.add_argument('--lr', default=0.0001, type=float, help="learning rate")
    parser.add_argument('--device', default='cpu', type=str, help="cpu or gpu")
    args = parser.parse_args([])
    return dict(vars(args))


def smooth(data, weight=0.9):
    """Smooth a curve with an exponential moving average.

    Similar to the smoothing slider in TensorBoard.

    Args:
        data (list): Input values.
        weight (float): Smoothing weight between 0 and 1; a larger value
            gives a smoother curve. 0.9 is a common choice.

    Returns:
        list: Smoothed values, same length as ``data`` (empty for empty input).
    """
    if len(data) == 0:
        # Nothing to smooth; indexing data[0] below would raise IndexError.
        return []
    last = data[0]  # the first value seeds the running average
    smoothed = []
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point
        smoothed.append(smoothed_val)
        last = smoothed_val
    return smoothed


def plot_rewards(rewards, cfg, path=None, tag='train'):
    """Plot the raw and smoothed reward curves on a new figure.

    Args:
        rewards (list): Reward of each episode.
        cfg (dict): Reads ``device``, ``algo_name`` and ``env_name`` for the title.
        path: Not used by this function.
        tag (str): Prefix of the title, e.g. ``'train'`` or ``'test'``.
    """
    sns.set()
    plt.figure()  # a new figure per call so several plots can coexist
    plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algo_name']} for {cfg['env_name']}")
    plt.xlabel('epsiodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    plt.legend()
5、訓練
# 獲取引數 cfg = get_args() # 訓練 env, agent = env_agent_config(cfg) res_dic = train(cfg, env, agent) plot_rewards(res_dic['rewards'], cfg, tag="train") # 測驗 res_dic = test(cfg, env, agent) plot_rewards(res_dic['rewards'], cfg, tag="test") # 畫出結果
seed = 10 n_states: 4, n_actions: 2 開始訓練! 環境:CartPole-v0,演算法:DQN,設備:cpu 回合:10/200,獎勵:10.00,Epislon: 0.062 回合:20/200,獎勵:85.00,Epislon: 0.014 回合:30/200,獎勵:41.00,Epislon: 0.011 回合:40/200,獎勵:31.00,Epislon: 0.010 回合:50/200,獎勵:22.00,Epislon: 0.010 回合:60/200,獎勵:10.00,Epislon: 0.010 回合:70/200,獎勵:10.00,Epislon: 0.010 回合:80/200,獎勵:22.00,Epislon: 0.010 回合:90/200,獎勵:30.00,Epislon: 0.010 回合:100/200,獎勵:20.00,Epislon: 0.010 回合:110/200,獎勵:15.00,Epislon: 0.010 回合:120/200,獎勵:45.00,Epislon: 0.010 回合:130/200,獎勵:73.00,Epislon: 0.010 回合:140/200,獎勵:180.00,Epislon: 0.010 回合:150/200,獎勵:167.00,Epislon: 0.010 回合:160/200,獎勵:200.00,Epislon: 0.010 回合:170/200,獎勵:165.00,Epislon: 0.010 回合:180/200,獎勵:200.00,Epislon: 0.010 回合:190/200,獎勵:200.00,Epislon: 0.010

點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/555933.html
標籤:其他
上一篇:KubeSphere 社區雙周報 | OpenFunction 發布 v1.1.1 | 2023.6.9-6.22
下一篇:返回列表
