[Reinforcement Learning] Flappy Bird Code Walkthrough

  • Agent — the CNN built by createNetwork(), whose output readout gives one Q-value per action
  • Environment — the game itself, wrapped as game_state (wrapped_flappy_bird)
  • Action — a_t, a one-hot vector of length ACTIONS (do nothing / flap)
  • Reward — r_t, the score returned by game_state.frame_step(a_t)
  • State — s_t, a stack of the four most recent preprocessed 80*80 frames

Logical structure of the code

1. Initialize the game image data and convert the frames into an 80*80*4 matrix — the State, s_t.

    # initialization
    # build the 80*80*4 input matrix
    do_nothing = np.zeros(ACTIONS)
    do_nothing[0] = 1
    x_t, r_0, terminal = game_state.frame_step(do_nothing)
    # resize the frame to 80*80 and convert it to grayscale
    x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_BGR2GRAY)
    # binarize the image
    ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
    # stack the frame four times to form the 4-channel state
    s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)
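The same preprocessing can be tried in isolation. The sketch below runs it on a randomly generated dummy frame (the frame size chosen here is arbitrary, not the game's actual resolution) just to confirm the shapes the network expects:

    import cv2
    import numpy as np

    # dummy RGB frame standing in for the image returned by game_state.frame_step()
    frame = np.random.randint(0, 256, (512, 288, 3), dtype=np.uint8)

    x_t = cv2.cvtColor(cv2.resize(frame, (80, 80)), cv2.COLOR_BGR2GRAY)  # 80*80 grayscale
    ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)             # black/white only
    s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)                         # repeat the first frame 4 times

    print(x_t.shape)  # (80, 80)
    print(s_t.shape)  # (80, 80, 4)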

Phase 1: the loop begins
2. Feed the State s_t into the Agent (the CNN) to get its output readout_t (one value for each of the two possible actions), and derive the Action a_t from readout_t.

        # feed the current state into the CNN
        readout_t = readout.eval(feed_dict={s: [s_t]})[0]
        a_t = np.zeros([ACTIONS])
        action_index = 0
        if t % FRAME_PER_ACTION == 0:
            if random.random() <= epsilon:
                print("----------Random Action----------")
                action_index = random.randrange(ACTIONS)
                a_t[action_index] = 1  # mark the sampled random action
            else:
                action_index = np.argmax(readout_t)
                a_t[action_index] = 1
        else:
            a_t[0] = 1  # do nothing

3. Feed the Action a_t into the Environment (the game_state object) to obtain the Reward r_t, the next state s_t1, and the terminal flag.

        # then execute the chosen action and record the returned state and reward
        x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
        x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY)
        ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
        x_t1 = np.reshape(x_t1, (80, 80, 1))
        s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)
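The last line implements a sliding window over the four most recent frames: the new frame is prepended and the oldest channel is dropped. A toy check with dummy arrays (illustration only) makes the channel order explicit:

    import numpy as np

    s_t = np.zeros((80, 80, 4))                    # pretend these are the previous four frames
    x_t1 = np.ones((80, 80, 1))                    # the newest preprocessed frame
    s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)  # prepend newest, drop the oldest channel

    print(s_t1.shape)  # (80, 80, 4)
    print(s_t1[0, 0])  # [1. 0. 0. 0.] -> the newest frame sits in channel 0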

4. Store this experience tuple.

D.append((s_t, a_t, r_t, s_t1, terminal))
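In the full listing below, D is a plain deque that is trimmed with popleft() once it grows past REPLAY_MEMORY. An equivalent construction, shown here only as a side note, is a bounded deque that discards the oldest transition automatically:

    from collections import deque

    REPLAY_MEMORY = 50000
    D = deque(maxlen=REPLAY_MEMORY)  # oldest entries are dropped once the buffer is full
    # dummy transition just to show the interface; the real code appends (s_t, a_t, r_t, s_t1, terminal)
    D.append((None, None, 0.0, None, False))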

These are the first 10,000 iterations (the OBSERVE phase). When deriving the Action from readout_t, a random factor is injected so that with probability epsilon the Agent picks a random Action. In this early phase there is no training step at all; its purpose is simply to accumulate experience data.

    # scale down epsilon
    if epsilon > FINAL_EPSILON and t > OBSERVE:
        epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE
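As a quick sanity check on the annealing rate: the per-step decrement is (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE. With the constants in this listing, INITIAL_EPSILON and FINAL_EPSILON are both 0.0001, so the decrement is 0 and epsilon simply stays at 0.0001; those values are suited to running an already-trained network. If one were to train from scratch with, say, INITIAL_EPSILON = 0.1 (an illustrative value, not taken from this listing), the decrement would be (0.1 - 0.0001) / 2,000,000 ≈ 5e-8 per step, so epsilon would only reach FINAL_EPSILON after the full 2,000,000 EXPLORE frames.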

In the later iterations, as training progresses, the probability that the Agent picks a random Action is gradually reduced. Only from this point on does the reinforcement (training) step actually run.

Phase 2: the loop continues
2. Feed the State s_t into the Agent (the CNN) to get readout_t, and derive the Action a_t from it.
3. Feed the Action a_t into the Environment (game_state) to obtain the Reward r_t, the next state s_t1, and the terminal flag.
4. Store the experience tuple: D.append((s_t, a_t, r_t, s_t1, terminal))
5. Sample a minibatch of BATCH experience tuples from D.

minibatch = random.sample(D, BATCH)
# randomly sample transitions (Markov tuples) from the replay memory D
s_j_batch = [d[0] for d in minibatch]
a_batch = [d[1] for d in minibatch]
r_batch = [d[2] for d in minibatch]
s_j1_batch = [d[3] for d in minibatch]

6. This is the key step. y_batch holds the target (label) values: if the game ends at the next step (terminal), the reward alone is used as the target; otherwise the target is the reward plus GAMMA times the largest Q-value the model predicts for the next state.

y_batch = []
readout_j1_batch = readout.eval(feed_dict={s: s_j1_batch})
for i in range(0, len(minibatch)):
    terminal = minibatch[i][4]
    # if terminal, only equals reward
    if terminal:
        y_batch.append(r_batch[i])
    else:
        y_batch.append(r_batch[i] + GAMMA * np.max(readout_j1_batch[i]))
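Written out, this is the standard Q-learning (DQN) target, with readout_j1_batch[i] playing the role of Q(s_{j+1}, ·) as estimated by the same network:

    y_j = r_j                                   if s_{j+1} is terminal
    y_j = r_j + GAMMA * max_a' Q(s_{j+1}, a')   otherwise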

7. The learning step. Gradient descent (here, the Adam optimizer) drives the model toward the targets by minimizing the squared difference between the target value y and the model's predicted Q-value for the action that was actually taken.

    a = tf.placeholder("float", [None, ACTIONS])
    y = tf.placeholder("float", [None])
    readout_action = tf.reduce_sum(tf.multiply(readout, a), reduction_indices=1)
    cost = tf.reduce_mean(tf.square(y - readout_action))
    train_step = tf.train.AdamOptimizer(1e-6).minimize(cost)

    # perform gradient step
    train_step.run(feed_dict={
        y: y_batch,
        a: a_batch,
        s: s_j_batch}
    )
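To see what readout_action computes, take a single sample (numbers chosen purely for illustration): if the network outputs readout = [1.2, -0.4] and the stored one-hot action is a = [0, 1], then tf.multiply gives [0.0, -0.4] and the row sum is -0.4, i.e. the Q-value of the action that was actually taken. cost is then the mean of (y - (-0.4))^2 over the BATCH samples.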

In summary, that is the main framework of this model.

The full source code follows:

#!/usr/bin/env python
from __future__ import print_function

import tensorflow as tf
import cv2
import sys
sys.path.append("game/")
import wrapped_flappy_bird as game
import random
import numpy as np
from collections import deque

GAME = 'bird'  # the name of the game being played for log files
ACTIONS = 2  # number of valid actions
GAMMA = 0.99  # decay rate of past observations
OBSERVE = 10000.  # timesteps to observe before training
EXPLORE = 2000000.  # frames over which to anneal epsilon
FINAL_EPSILON = 0.0001  # final value of epsilon
INITIAL_EPSILON = 0.0001  # starting value of epsilon
REPLAY_MEMORY = 50000  # number of previous transitions to remember
BATCH = 32  # size of minibatch
FRAME_PER_ACTION = 1


# CNN model
# weight initialization
def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev=0.01)
    return tf.Variable(initial)


# bias initialization
def bias_variable(shape):
    initial = tf.constant(0.01, shape=shape)
    return tf.Variable(initial)


# convolution
def conv2d(x, W, stride):
    return tf.nn.conv2d(x, W, strides=[1, stride, stride, 1], padding="SAME")


# max pooling, 2*2 kernel, stride 2
def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")


# build the network
def createNetwork():
    # network weights
    W_conv1 = weight_variable([8, 8, 4, 32])
    b_conv1 = bias_variable([32])

    W_conv2 = weight_variable([4, 4, 32, 64])
    b_conv2 = bias_variable([64])

    W_conv3 = weight_variable([3, 3, 64, 64])
    b_conv3 = bias_variable([64])

    W_fc1 = weight_variable([1600, 512])
    b_fc1 = bias_variable([512])

    W_fc2 = weight_variable([512, ACTIONS])
    b_fc2 = bias_variable([ACTIONS])

    # input layer: 80*80*4
    s = tf.placeholder("float", [None, 80, 80, 4])

    # hidden layers
    # first conv layer + one pooling layer
    h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)

    # second conv layer
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2)
    # h_pool2 = max_pool_2x2(h_conv2)

    # third conv layer
    h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3)
    # h_pool3 = max_pool_2x2(h_conv3)

    # flatten
    # h_pool3_flat = tf.reshape(h_pool3, [-1, 256])
    h_conv3_flat = tf.reshape(h_conv3, [-1, 1600])

    # first fully connected layer
    h_fc1 = tf.nn.relu(tf.matmul(h_conv3_flat, W_fc1) + b_fc1)

    # readout layer
    readout = tf.matmul(h_fc1, W_fc2) + b_fc2

    return s, readout, h_fc1


def trainNetwork(s, readout, h_fc1, sess):
    # define the cost function
    a = tf.placeholder("float", [None, ACTIONS])
    y = tf.placeholder("float", [None])
    readout_action = tf.reduce_sum(tf.multiply(readout, a), reduction_indices=1)
    cost = tf.reduce_mean(tf.square(y - readout_action))
    train_step = tf.train.AdamOptimizer(1e-6).minimize(cost)

    # open up a game state to communicate with emulator
    game_state = game.GameState()

    # store the previous observations in replay memory
    D = deque()

    # printing
    a_file = open("logs_" + GAME + "/readout.txt", 'w')
    h_file = open("logs_" + GAME + "/hidden.txt", 'w')

    # initialization
    # build the 80*80*4 input matrix
    do_nothing = np.zeros(ACTIONS)
    do_nothing[0] = 1
    x_t, r_0, terminal = game_state.frame_step(do_nothing)
    # resize to 80*80 and convert to grayscale
    x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_BGR2GRAY)
    # binarize the image
    ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
    # stack into 4 channels
    s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)

    # saving and loading networks
    saver = tf.train.Saver()
    sess.run(tf.initialize_all_variables())
    checkpoint = tf.train.get_checkpoint_state("saved_networks")
    if checkpoint and checkpoint.model_checkpoint_path:
        saver.restore(sess, checkpoint.model_checkpoint_path)
        print("Successfully loaded:", checkpoint.model_checkpoint_path)
    else:
        print("Could not find old network weights")

    # start training
    epsilon = INITIAL_EPSILON
    t = 0
    while "flappy bird" != "angry bird":
        # choose an action epsilon greedily
        # feed the current state into the CNN
        readout_t = readout.eval(feed_dict={s: [s_t]})[0]
        a_t = np.zeros([ACTIONS])
        action_index = 0
        if t % FRAME_PER_ACTION == 0:
            if random.random() <= epsilon:
                print("----------Random Action----------")
                action_index = random.randrange(ACTIONS)
                a_t[action_index] = 1  # mark the sampled random action
            else:
                action_index = np.argmax(readout_t)
                a_t[action_index] = 1
        else:
            a_t[0] = 1  # do nothing

        # scale down epsilon
        if epsilon > FINAL_EPSILON and t > OBSERVE:
            epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE

        # then execute the chosen action and record the returned state and reward
        x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
        x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY)
        ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
        x_t1 = np.reshape(x_t1, (80, 80, 1))
        s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)

        # store the transition (a Markov tuple) in the replay memory D
        D.append((s_t, a_t, r_t, s_t1, terminal))
        # (s_t, a_t, r_t, s_t1, terminal) are:
        # the state at time t, s_t
        # the action taken, a_t
        # the reward received, r_t
        # the resulting next state, s_t1
        # whether the game ended, terminal
        # if the replay memory exceeds its maximum length, drop the oldest transition
        if len(D) > REPLAY_MEMORY:
            D.popleft()

        # train only after the observation phase; t is the step counter
        if t > OBSERVE:
            minibatch = random.sample(D, BATCH)
            # randomly sample transitions from the replay memory D
            s_j_batch = [d[0] for d in minibatch]
            a_batch = [d[1] for d in minibatch]
            r_batch = [d[2] for d in minibatch]
            s_j1_batch = [d[3] for d in minibatch]

            y_batch = []
            readout_j1_batch = readout.eval(feed_dict={s: s_j1_batch})
            for i in range(0, len(minibatch)):
                terminal = minibatch[i][4]
                if terminal:
                    y_batch.append(r_batch[i])
                else:
                    y_batch.append(r_batch[i] + GAMMA * np.max(readout_j1_batch[i]))

            train_step.run(feed_dict={
                y: y_batch,
                a: a_batch,
                s: s_j_batch}
            )

        s_t = s_t1
        t += 1

        # save progress every 10000 iterations
        if t % 10000 == 0:
            saver.save(sess, 'saved_networks/' + GAME + '-dqn', global_step=t)

        # print info
        state = ""
        if t <= OBSERVE:
            state = "observe"
        elif t > OBSERVE and t <= OBSERVE + EXPLORE:
            state = "explore"
        else:
            state = "train"

        print("TIMESTEP", t, "/ STATE", state, \
              "/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t, \
              "/ Q_MAX %e" % np.max(readout_t))
        # write info to files
        '''
        if t % 10000 <= 100:
            a_file.write(",".join([str(x) for x in readout_t]) + '\n')
            h_file.write(",".join([str(x) for x in h_fc1.eval(feed_dict={s:[s_t]})[0]]) + '\n')
            cv2.imwrite("logs_tetris/frame" + str(t) + ".png", x_t1)
        '''


def playGame():
    sess = tf.InteractiveSession()
    s, readout, h_fc1 = createNetwork()
    trainNetwork(s, readout, h_fc1, sess)


def main():
    playGame()


if __name__ == "__main__":
    main()

