In [None]:
import gym

import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import json
import numpy as np
import scipy as sp
import scipy.stats as st
import scipy.integrate as integrate
from scipy.stats import multivariate_normal
from sklearn import linear_model
from sklearn.utils.testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning
import statsmodels.api as sm
from matplotlib.colors import LogNorm

sns.set_style("whitegrid")
sns.set_palette("colorblind")
palette = sns.color_palette()
figsize = (15,8)
legend_fontsize = 16

from matplotlib import rc
rc('font',**{'family':'sans-serif'})
rc('text', usetex=True)
rc('text.latex',preamble=r'\usepackage[utf8]{inputenc}')
rc('text.latex',preamble=r'\usepackage[russian]{babel}')
rc('figure', **{'dpi': 300})

## OpenAI Gym

In [None]:
from gym import envs
print("\n".join(["%s" % x for x in envs.registry.all()]))

In [None]:
env = gym.make('FrozenLake-v0')
env.reset()
for _ in range(5):
    env.render()
    env.step(env.action_space.sample()) # take a random action
env.close()

In [None]:
env.env.P

## Policy iteration по уравнениям Беллмана

In [None]:
nS, nA = env.env.nS, env.env.nA
final_states = np.where([ len(env.env.P[x][0]) == 1 and env.env.P[x][0][0][3] == True for x in env.env.P.keys() ])[0]

def get_random_V(env):
    V = np.random.random(nS)
    V[final_states] = 0.0
    return V

def get_random_Q(env):
    Q = np.random.random(size=(nS, nA))
    Q[final_states, :] = 0.0
    return Q

In [None]:
def compute_V_by_policy(env, pi, gamma=1.0):
    V = get_random_V(env)
    while True:
        new_V = np.array([ \
          np.sum([ x[0] * ( x[2] + gamma * V[x[1]] ) for x in env.env.P[cur_state][pi[cur_state]] ]) \
        for cur_state in range(nS) ])
        if np.sum((V - new_V) ** 2) < 0.001:
            break
        V = new_V
    return V

def compute_policy_by_V(env, V, gamma=1.0):
    return np.argmax( np.array([[ \
          np.sum([ x[0] * ( x[2] + gamma * V[x[1]] ) for x in env.env.P[s][a] ]) \
        for a in range(nA) ] for s in range(nS)]), axis=1 )

In [None]:
def compute_V_and_pi(env, gamma=1.0):
    V = get_random_V(env)
    pi = np.random.randint(nA, size=nS)

    while True:
        V = compute_V_by_policy(env, pi, gamma)
        new_pi = compute_policy_by_V(env, V, gamma)
        if np.array_equal(pi, new_pi):
            break
        pi = new_pi
    
    return V, pi

In [None]:
env = gym.make('FrozenLake-v0')
env._max_episode_steps = 10000

num_experiments = 200
num_steps, total_reward = [], []

V, pi = compute_V_and_pi(env)

for _ in range(num_experiments):
    env.reset()
    total_reward.append(0)
    for step in range(1000):
        observation, reward, done, info = env.step(pi[env.env.s])
        total_reward[-1] += reward
        if done:
            num_steps.append(step+1)
            print("Эпизод закончился за %d шагов в состоянии %s, общая награда %d" % (num_steps[-1], env.env.s, total_reward[-1]) )
            break
env.close()

print("\nСредняя награда: %.6f\nСреднее число шагов: %.6f" % (np.mean(total_reward), np.mean(num_steps)))

In [None]:
def conduct_experiments_pi(env, pi, num_experiments=1000):
    num_steps, total_reward = [], []
    for _ in range(num_experiments):
        env.reset()
        num_steps.append(0)
        total_reward.append(0)
        for _ in range(1000):
            observation, reward, done, info = env.step(pi[env.env.s])
            total_reward[-1] += reward
            num_steps[-1] += 1
            if done:
                break
    env.close()
    return np.mean(total_reward), np.mean(num_steps)

def conduct_experiments(env, gamma=1.0, num_experiments=100, num_experiments_pi=10):
    num_steps, total_reward = [], []
    for _ in range(num_experiments):
        V, pi = compute_V_and_pi(env, gamma=gamma)
        cur_steps, cur_reward = conduct_experiments_pi(env, pi, num_experiments=num_experiments_pi)
        num_steps.append(cur_steps)
        total_reward.append(cur_reward)
    return np.mean(num_steps), np.mean(total_reward)

In [None]:
env = gym.make('FrozenLake-v0')
env._max_episode_steps = 10000
results = []
for gamma in np.linspace(0.5, 1.0, 10):
    mean_reward, mean_steps = conduct_experiments(env, gamma, num_experiments=100, num_experiments_pi=10)
    results.append([gamma, mean_reward, mean_steps])
    print("gamma=%.4f, mean reward = %.4f, mean steps = %.4f" % (gamma, mean_reward, mean_steps) )
env.close()

In [None]:
def plot_results(results):
    gammas, rewards, numsteps = [x[0] for x in results], [x[1] for x in results], [x[2] for x in results]
    fig, ax = plt.subplots(1, 1, figsize=(12, 6))
    ax2 = ax.twinx()
    ax2.grid(None)

    line1 = ax.plot(gammas, rewards, label="Средние награды", color="C0")
    line2 = ax2.plot(gammas, numsteps, label="Среднее число шагов", color="C1")

    lines = line1 + line2
    labels = [l.get_label() for l in lines]
    ax.legend(lines, labels, loc=2)
    ax.set_xlim((0.5, 1.0))
    # ax.set_ylim((0.1, 0.8))
    # ax2.set_ylim((10, 45))
    return fig, ax

fig, ax = plot_results(results)

## Value iteration по уравнениям Беллмана

In [None]:
def compute_V_max(env, gamma=1.0):
    V = get_random_V(env)
    while True:
        new_V = np.array([ [ \
          np.sum([ x[0] * ( x[2] + gamma * V[x[1]] ) for x in env.env.P[cur_state][cur_action] ]) \
        for cur_action in range(nA) ] for cur_state in range(nS) ])
        new_V = np.max(new_V, axis=1)
        if np.sum((V - new_V) ** 2) < 0.001:
            break
        V = new_V
    return V

def compute_Q_max(env, gamma=1.0):
    Q = get_random_Q(env)
    while True:
        new_Q = np.array([ [ \
          np.sum([ x[0] * ( x[2] + gamma * np.max(Q[x[1], :]) ) for x in env.env.P[cur_state][cur_action] ]) \
        for cur_action in range(nA) ] for cur_state in range(nS) ])
#         new_V = np.max(new_V, axis=1)
        if np.sum((Q - new_Q) ** 2) < 0.001:
            break
        Q = new_Q
    return Q

def compute_policy_by_Q(env, Q, gamma=1.0):
    return np.argmax( Q, axis=1 )

def conduct_experiments_max(env, gamma, use_Q=False, num_experiments=100, num_experiments_pi=200):
    num_steps, total_reward = [], []
    for _ in range(num_experiments):
        if use_Q:
            Q = compute_Q_max(env, gamma=gamma)
            pi = compute_policy_by_Q(env, Q)
        else:
            V = compute_V_max(env, gamma=gamma)
            pi = compute_policy_by_V(env, V)
        result = conduct_experiments_pi(env, pi, num_experiments=num_experiments_pi)
        num_steps.append(result[0])
        total_reward.append(result[1])
    return np.mean(num_steps), np.mean(total_reward)

In [None]:
env = gym.make('FrozenLake-v0')
env._max_episode_steps = 10000

V = compute_V_max(env)
pi = compute_policy_by_V(env, V, gamma=0.2)
print(pi)

env.close()

In [None]:
env = gym.make('FrozenLake-v0')
env._max_episode_steps = 10000

Q = compute_Q_max(env)
pi = compute_policy_by_Q(env, Q, gamma=0.2)
print(pi)

env.close()

In [None]:
env = gym.make('FrozenLake-v0')
env._max_episode_steps = 10000

results_max = []
for gamma in np.linspace(0.5, 1.0, 20):
    mean_reward, mean_steps = conduct_experiments_max(env, gamma, use_Q=True, num_experiments=20, num_experiments_pi=100)
    results_max.append([gamma, mean_reward, mean_steps])
    print("gamma=%.4f, mean reward = %.4f, mean steps = %.4f" % (gamma, mean_reward, mean_steps) )
env.close()

In [None]:
fig, ax = plot_results(results_max)