前言
Flappy Bird簡介
《Flappy Bird》是一款由來自越南的獨立游戲開發者Dong Nguyen所開發的作品,游戲于2013年5月24日上線,并在2014年2月突然暴紅,2014年2月,《Flappy Bird》被開發者本人從蘋果及谷歌應用商店撤下,2014年8月份正式回歸APP STORE,正式加入Flappy迷們期待已久的多人對戰模式,游戲中玩家必須控制一只小鳥,跨越由各種不同長度水管所組成的障礙,

簡而言之,這是一款既簡單又困難的游戲,游戲的操作方式很簡單,但是想要獲得非常高的分數還是一件很有挑戰性的任務,如果讓人類來獲得一個比較高的分數,這幾乎是不可能的事情,但是使用DQN來玩FlappyBird并通過上百萬次的訓練,拿到一個較高的分數甚至不死還是可以實作的,
為此,本人借助FlappyBird的原始碼進行一定程度的改寫,簡化了游戲機制,小鳥死亡后會立即開始下一輪游戲,并用Tensorflow基于DQN來實作AI玩FlappyBird,
DQN簡介
Deep Q-Learning(DQN),通過在探索的程序中訓練網路,最后所達到的目標就是將當前狀態輸入,得到的輸出就是對應它的動作值函式,也即 f(s)=q(s,a),這個f就是訓練的網路,
DQN有兩個特性,Frozen Target Network和Experience Replay,大體框架可以理解為下圖所示:

在EvaluationNet中進行訓練,每進行多次訓練以后,將訓練后的權值等引數賦給TargetNet,所以在搭建targetNet網路時,不需要計算Loss和考慮Train程序,在EvaluationNet中的Loss計算方法為

我們在學習的程序中,會設定一個Memory空間,這個空間會記錄好每一次的MDP程序,也即 <s,a,r,s’> .在一開始時,Memory會先收集記錄,當記錄達到一定數量時,開始學習,每次從memory中隨機選擇一個適當的大小記憶塊,這些記憶塊中包含了經驗(experience)也即MDP程序,并且是隨機選擇的,所以解決了記錄相關性的問題,將這些經驗中的s作為輸入,傳入到EvaluationNet計算出q_evaluation,將s’傳入TargetNet得到q_next,之后再將EvaluationNet的引數賦給TargetNet,賦值完成以后,通過q_next來計算下一步的最大動作值,從而計算Loss,繼而優化EvaluationNet,
- DQN偽代碼

- DQN代碼翻譯與分析
初始化記憶體D中的記憶N
初始化隨機權重θaction值的函式Q(Q估計)
初始化權重θ-=θ target-action值的函式^Q(Q現實)
回圈:
初始化第一個場景s1=x1并且預處理場景s1對應的場景處理函式Φ
回圈:
根據可能性ε選擇一個隨機動作at,or
或者選擇一個最大值at從函式Q中在場景st下
執行動作a在模擬器中并且獲取一個獎勵rt和下一個場景xt+1
令st+1=st,at,xt+1并且處理Φt+1=Φ(st+1)
將(Φt,at,rt,Φt+1)存盤在D中
采樣一個隨機的小批訓練在D中
設定yj值:
如果 下一個場景yj+1是中止:則只回傳rj
否則 回傳rj+ (gamma ^Q(Φj+1,a,θ)函式最大a值的值)
#思路還是和Q-learning一樣,如果有獎勵則激勵權重θ,如果每獎勵則由gamma值來衰減權重θ
執行一個(Q現實-Q估計)平方梯度回歸來更新權重θ
每執行多少步就執行一個^Q=Q(Q現實=Q估計,主要是權重拷貝)
專案源代碼
游戲本體
FlappyBird的游戲本體的代碼已經撰寫好了,是現成的,這里作者只提供游戲本體的源代碼,下載即可,無需做過多的解釋,
1.首先下載(提取碼:BigG)所需的游戲本體等檔案夾,然后將下載好的五個檔案夾(assets、game、images、logs_bird、saved_networks)放到你的專案目錄下,并確保這些檔案夾和你的py原始碼檔案是在同一目錄下,
2.下載的檔案夾中有一個名為saved_networks,這里保存著已經訓練好的資料(訓練次數為292萬次),如果你想體驗從零開始,也可以清空這個檔案夾里面的資料只保留檔案夾本身,
(PS:博主采用的VS2019開發環境,Python為3.8版本)
FlappyBird.py
上述作業完成后,就可以撰寫源代碼來實作DQN玩FlappyBird了,程式所依靠的各類第三方庫需要提前準備好,不會安裝的可以自行百度pip命令來安裝,
#!/usr/bin/env python
from __future__ import print_function
#import tensorflow as tf
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import cv2
import sys
sys.path.append("game/")
import wrapped_flappy_bird as game
import random
import numpy as np
from collections import deque
GAME = 'bird' #日志檔案的名字
ACTIONS = 2 #有效運算元
GAMMA = 0.99 #衰減率
OBSERVE = 100000. #前OBSERVE輪次,不對網路進行訓練,只收集資料存到記憶庫中
#第OBSERVE到OBSERVE+EXPLORE輪次中,對網路進行訓練,且對epsilon進行退火,逐漸減小epsilon至FINAL_EPSILON
#當到達EXPLORE輪次時,epsilon達到最終值FINAL_EPSILON,不再對其進行更新
EXPLORE = 2000000. #上限
FINAL_EPSILON = 0.0001 #EPSILON的最終值
INITIAL_EPSILON = 0.0001 #EPSILON的初始值
REPLAY_MEMORY = 50000 #記憶庫
BATCH = 32 #訓練批次
FRAME_PER_ACTION = 1 #每隔FRAME_PER_ACTION輪次,就會有epsilon的概率進行探索
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev = 0.01)
return tf.Variable(initial)
def bias_variable(shape):
initial = tf.constant(0.01, shape = shape)
return tf.Variable(initial)
def conv2d(x, W, stride):
return tf.nn.conv2d(x, W, strides = [1, stride, stride, 1], padding = "SAME")
def max_pool_2x2(x):
return tf.nn.max_pool(x, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = "SAME")
def createNetwork():
#定義深度神經網路的引數和配置
W_conv1 = weight_variable([8, 8, 4, 32])
b_conv1 = bias_variable([32])
W_conv2 = weight_variable([4, 4, 32, 64])
b_conv2 = bias_variable([64])
W_conv3 = weight_variable([3, 3, 64, 64])
b_conv3 = bias_variable([64])
W_fc1 = weight_variable([1600, 512])
b_fc1 = bias_variable([512])
W_fc2 = weight_variable([512, ACTIONS])
b_fc2 = bias_variable([ACTIONS])
#輸入層
s = tf.placeholder("float", [None, 80, 80, 4])
#隱藏層
h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2)
#h_pool2 = max_pool_2x2(h_conv2)
h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3)
#h_pool3 = max_pool_2x2(h_conv3)
#h_pool3_flat = tf.reshape(h_pool3, [-1, 256])
h_conv3_flat = tf.reshape(h_conv3, [-1, 1600])
h_fc1 = tf.nn.relu(tf.matmul(h_conv3_flat, W_fc1) + b_fc1)
#輸出層
readout = tf.matmul(h_fc1, W_fc2) + b_fc2
return s, readout, h_fc1
def trainNetwork(s, readout, h_fc1, sess):
#定義損失函式
a = tf.placeholder("float", [None, ACTIONS])
y = tf.placeholder("float", [None])
readout_action = tf.reduce_sum(tf.multiply(readout, a), reduction_indices=1)
cost = tf.reduce_mean(tf.square(y - readout_action))
train_step = tf.train.AdamOptimizer(1e-6).minimize(cost)
#開啟游戲模擬器,打開一個模擬器的視窗,實時顯示游戲的資訊
game_state = game.GameState()
#創建一個雙端佇列存放replay memory
D = deque()
#寫入檔案
a_file = open("logs_" + GAME + "/readout.txt", 'w')
h_file = open("logs_" + GAME + "/hidden.txt", 'w')
#設定游戲的初始狀態,設定動作為不執行跳躍,修改初始狀態為80*80*4大小
do_nothing = np.zeros(ACTIONS)
do_nothing[0] = 1
x_t, r_0, terminal = game_state.frame_step(do_nothing)
x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_BGR2GRAY)
ret, x_t = cv2.threshold(x_t,1,255,cv2.THRESH_BINARY)
s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)
#加載或保存網路引數
saver = tf.train.Saver()
sess.run(tf.initialize_all_variables())
checkpoint = tf.train.get_checkpoint_state("saved_networks")
if checkpoint and checkpoint.model_checkpoint_path:
saver.restore(sess, checkpoint.model_checkpoint_path)
print("Successfully loaded:", checkpoint.model_checkpoint_path)
else:
print("Could not find old network weights")
#開始訓練
epsilon = INITIAL_EPSILON
t = 0
while "flappy bird" != "angry bird":
#使用epsilon貪心策略選擇一個動作
readout_t = readout.eval(feed_dict={s : [s_t]})[0]
a_t = np.zeros([ACTIONS])
action_index = 0
if t % FRAME_PER_ACTION == 0:
#執行一個隨即動作
if random.random() <= epsilon:
print("----------Random Action----------")
action_index = random.randrange(ACTIONS)
a_t[random.randrange(ACTIONS)] = 1
#由神經網路計算的Q(s,a)值選擇對應的動作
else:
action_index = np.argmax(readout_t)
a_t[action_index] = 1
else:
a_t[0] = 1 #不執行跳躍動作
#隨著游戲的進行,不斷降低epsilon,減少隨即動作
if epsilon > FINAL_EPSILON and t > OBSERVE:
epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE
#執行選擇的動作,并獲得下一狀態及回報
x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY)
ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
x_t1 = np.reshape(x_t1, (80, 80, 1))
#s_t1 = np.append(x_t1, s_t[:,:,1:], axis = 2)
s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)
#將狀態轉移程序存盤到D中,用于更新引數時采樣
D.append((s_t, a_t, r_t, s_t1, terminal))
if len(D) > REPLAY_MEMORY:
D.popleft()
#過了觀察期,才會進行網路引數的更新
if t > OBSERVE:
#從D中隨機采樣,用于引數更新
minibatch = random.sample(D, BATCH)
#分別將當前狀態、采取的動作、獲得的回報、下一狀態分組存放
s_j_batch = [d[0] for d in minibatch]
a_batch = [d[1] for d in minibatch]
r_batch = [d[2] for d in minibatch]
s_j1_batch = [d[3] for d in minibatch]
#計算Q(s,a)的新值
y_batch = []
readout_j1_batch = readout.eval(feed_dict = {s : s_j1_batch})
for i in range(0, len(minibatch)):
terminal = minibatch[i][4]
#如果游戲結束,則只有反饋值
if terminal:
y_batch.append(r_batch[i])
else:
y_batch.append(r_batch[i] + GAMMA * np.max(readout_j1_batch[i]))
#使用梯度下降更新網路引數
train_step.run(feed_dict = {
y : y_batch,
a : a_batch,
s : s_j_batch}
)
#狀態發生改變,用于下次回圈
s_t = s_t1
t += 1
#每進行10000次迭代,保留一下網路引數
if t % 10000 == 0:
saver.save(sess, 'saved_networks/' + GAME + '-dqn', global_step = t)
#列印游戲資訊
state = ""
if t <= OBSERVE:
state = "observe"
elif t > OBSERVE and t <= OBSERVE + EXPLORE:
state = "explore"
else:
state = "train"
print("TIMESTEP", t, "/ STATE", state, \
"/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t, \
"/ Q_MAX %e" % np.max(readout_t))
#寫入檔案
'''
if t % 10000 <= 100:
a_file.write(",".join([str(x) for x in readout_t]) + '\n')
h_file.write(",".join([str(x) for x in h_fc1.eval(feed_dict={s:[s_t]})[0]]) + '\n')
cv2.imwrite("logs_tetris/frame" + str(t) + ".png", x_t1)
'''
def playGame():
sess = tf.InteractiveSession()
s, readout, h_fc1 = createNetwork()
trainNetwork(s, readout, h_fc1, sess)
def main():
playGame()
main()
訓練結果
因為CSDN好像不能直接上傳視頻,只能把視頻轉換為gif了,湊乎看吧~
5萬次
PS:這只笨鳥只會一直往上飛

10萬次
PS:10萬次后,似乎略有進步,不會一直總是往上飛

20萬次
PS:有了大致的方向,嘗試越過第一個柱子

30萬次
PS:基本可以正確地找到第一個柱子間隙的方位并嘗試越過

40萬次
PS:已經有很高的幾率過第一個柱子,并且有一定幾率過第二個柱子

50萬次
PS:過多個柱子的幾率更高了

100萬次
PS:已經達到了普通玩家的正常水平,能順利通過5~8個柱子

200萬次
PS:幾乎無敵了,失敗是極低概率才會發生

(更高次數的訓練請等待作者更新…)
注:本文是博主機器學習實體的總結,不支持任何商用,轉載請注明出處!如果你也對機器學習有一定的興趣和理解,歡迎隨時找博主交流~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/258647.html
標籤:AI
