In this article we will play with the Mountain Car problem. So first, what is Q-Learning ? Q-Learning is a form of model-free learning : an agent acts in an environment without a model of it and without parameters or features to train in the usual sense. For a given state of the environment our agent has a set of choices available to it, and it learns to choose the one that brings the maximum reward.
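
As a tiny illustration of the idea (the Q values below are made up, just to show the lookup) :

q_values = {"push_left": -1.3, "do_nothing": -1.8, "push_right": -0.7}
# the agent simply picks the action with the highest Q value for the current state
best_action = max(q_values, key=q_values.get)
print(best_action)  # push_right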

We are going to play around with OpenAI’s gym and specifically with the Mountain Car environment. To get gym : pip install gym.

The list of choices available to our cart is :

  • 0 : push left
  • 1 : do nothing
  • 2 : push right

The goal of our cart will be to reach the little flag on the right, and it can only use the commands above to reach it in less than 200 steps. If it goes above 200 steps the episode simply ends.
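
If you want to check that limit yourself, the environment spec exposes it (the exact attribute can depend on your gym version) :

import gym

env = gym.make("MountainCar-v0")
# MountainCar-v0 is wrapped in a TimeLimit that ends the episode after a fixed number of steps
print(env.spec.max_episode_steps)  # 200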

The flag is on the right, so let’s start with an easy script that tells the car to always go right, maybe it will manage to climb the mountain ?

import gym

#initialize environment
env = gym.make("MountainCar-v0")
env.reset()

#do an action in a loop
done = False
while not done:
    #make the cart go right
    action = 2
    # step returns (new_state, reward, done, info) ; we grab done so the loop can stop
    _, _, done, _ = env.step(action)
    env.render()

As you can see the cart cannot climb the slope as is : it needs to go right, then left to gain momentum, then right again. Our agent has to figure out when to push left, when to push right and when to do nothing in order to reach the flag in less than 200 steps. Easy right ?

To achieve this our cart will have to use the 3 actions that we have identified above (the action space) and it will need an “observation space” that describes the current state of the environment.
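
A quick way to inspect both spaces (the exact printed format depends on the gym version) :

import gym

env = gym.make("MountainCar-v0")
print(env.action_space)       # Discrete(3) -> the 3 actions listed above
print(env.action_space.n)     # 3
print(env.observation_space)  # a 2-dimensional Box -> position and velocity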

By performing a step we can retrieve some information from the environment :

import gym

#initialize environment
env = gym.make("MountainCar-v0")
env.reset()

#do an action in a loop
done = False
while not done:
    #make the cart go right
    action = 2
    new_state, reward, done, _ = env.step(action)
    print(reward, new_state)

The output is :

-1.0 [-0.55547931  0.00124806]
-1.0 [-0.55299252  0.0024868 ]
-1.0 [-0.54928555  0.00370696]
-1.0 [-0.54438613  0.00489942]
-1.0 [-0.5383309   0.00605523]
-1.0 [-0.53116522  0.00716568]
-1.0 [-0.5229428   0.00822243]
-1.0 [-0.51372529  0.00921751]
-1.0 [-0.50358183  0.01014347]
-1.0 [-0.4925884   0.01099343]
-1.0 [-0.48082721  0.01176119]
-1.0 [-0.46838593  0.01244128]
-1.0 [-0.45535686  0.01302907]
-1.0 [-0.44183603  0.01352083]

The first number is the reward, which does not change for now : each step gives -1.

The second element is the new_state : the first value is the position on the X axis (horizontal) and the second is the velocity. These 2 values describe the state of the environment, they are the 2 elements of our observation space.

Now we will use a Q-table to “model” a Q value for each action in each state. So for each combination of X position and velocity we will have 3 possible actions (left, nothing and right) and a Q value associated with each of them.

By printing the maximum and minimum values of the X position and the velocity we can get an idea of the range of values in our observation space :

print(env.observation_space.high)
print(env.observation_space.low)

#Output :
#[0.6  0.07]
#[-1.2  -0.07]

Now, if we used all 8 decimals for every combination between -1.2 and 0.6 on the X axis and -0.07 to 0.07 for the velocity, we would have far too many combinations in our Q table. We don’t need that much precision, so we will group the values into discrete buckets, let’s go with 30 buckets per dimension.

The script below determines by how much we need to increment our values to create the 30 buckets :

DISCRETE_OS_SIZE = [30, 30]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE


print(discrete_os_win_size)

#[0.06       0.00466667]

So each bucket spans 0.06 on the X axis and 0.00466667 on the velocity. Alright.
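
For example, an arbitrary state at position -0.5 with velocity 0.01 would fall in the following buckets (a quick sanity check, not part of the agent yet) :

import numpy as np

# arbitrary example state : position -0.5, velocity 0.01
state = np.array([-0.5, 0.01])
low = np.array([-1.2, -0.07])        # observation_space.low from above
win = np.array([0.06, 0.00466667])   # bucket size computed above

print(((state - low) / win).astype(int))  # [11 17] -> bucket 11 on X, bucket 17 on velocity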

Let’s now build our Q table :

import gym
import numpy as np

#initialize environment
env = gym.make("MountainCar-v0")
env.reset()


DISCRETE_OS_SIZE = [30, 30]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE
print(discrete_os_win_size)

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))

print(q_table.shape)
#do an action in a loop
done = False
while not done:
    #make the cart go right
    action = 2
    new_state, reward, done, _ = env.step(action)
    #print(reward, new_state)

This builds our (30, 30, 3) table : the 30 * 30 part covers every combination of our 30 buckets for the two state variables (X position and velocity), and the * 3 part is our 3 possible actions, each with a Q value attached. We initialize the Q values randomly between -2 and 0 : each step gives a -1 reward and reaching the flag gives 0, so it makes sense to start with negative values.
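
To make the lookup concrete, here is a small standalone sketch (the bucket indices are arbitrary) of how one cell of such a table is read :

import numpy as np

# same shape as our Q table : 30 x 30 states, 3 actions
q_table = np.random.uniform(low=-2, high=0, size=(30, 30, 3))

some_state = (5, 12)                   # (position bucket, velocity bucket), arbitrary example
print(q_table[some_state])             # the 3 Q values (one per action) for that state
print(np.argmax(q_table[some_state]))  # the greedy action : 0, 1 or 2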

This table will be the guideline for our cart : in every state it will choose the action with the highest Q value. But how do we explore and populate the table with meaningful Q values ? We need to add randomness, try different actions and update the Q values along the way ! How ? With this formula :
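
Written out, with α being our LEARNING_RATE and γ our DISCOUNT :

new_Q(s, a) = (1 - α) · Q(s, a) + α · ( reward + γ · max over a' of Q(s', a') )

where s is the current state, a the action we just took and s' the state we land in.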

And this is the formula with less math :

new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

We now need to update the Q values (one value per possible action per unique state).

First let’s discuss the discount : it measures how much we care about FUTURE reward rather than immediate reward. It is a value between 0 and 1 and is generally set fairly high, because the principle of Q-Learning is to learn a chain of events that ends with the desirable outcome (reaching the flag in this example). So it is pretty natural to put more weight on long term gains than on short term ones.
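
To get a feel for it, here is a quick sketch of how much weight a reward keeps once it has been propagated back over n steps with DISCOUNT = 0.95 :

DISCOUNT = 0.95
for n in (1, 10, 50, 100):
    # weight of a reward that lies n steps in the future
    print(n, round(DISCOUNT ** n, 3))
# 1 0.95
# 10 0.599
# 50 0.077
# 100 0.006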

The max_future_q is taken after we have performed our action (or “step”) : we then update the previous state’s value partially based on the next step’s best Q value. Over time, once we have reached the objective at least once, this “reward” slowly gets back-propagated (hello neural networks!), one step at a time, episode after episode, and the chain of actions attempted by our agent becomes more and more “optimal”.

We will also add a variable for the number of episodes, i.e. how many games we want to run.

We also add a small function to convert the state of the environment into our discrete buckets :

def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(int))

Next we also replace our action. We had set action = 2 to always go right, but now we want to choose the best action for a given state, and then grab the new discrete state :

action = np.argmax(q_table[discrete_state])
new_discrete_state = get_discrete_state(new_state)

And finally we also want to update the Q-value :

    # If simulation did not end yet after last step - update Q table
    if not done:

        # Maximum possible Q value in next step (for new state)
        max_future_q = np.max(q_table[new_discrete_state])

        # Current Q value (for current state and performed action)
        current_q = q_table[discrete_state + (action,)]

        # And here's our equation for a new Q value for current state and action
        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # Update Q table with new Q value
        q_table[discrete_state + (action,)] = new_q

Full code of our agent at this stage :

import gym
import numpy as np

env = gym.make("MountainCar-v0")

LEARNING_RATE = 0.1

DISCOUNT = 0.95
EPISODES = 25000

DISCRETE_OS_SIZE = [30, 30]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(int))  # we use this tuple to look up the 3 Q values for the available actions in the q-table


discrete_state = get_discrete_state(env.reset())
done = False
while not done:

    action = np.argmax(q_table[discrete_state])
    new_state, reward, done, _ = env.step(action)

    new_discrete_state = get_discrete_state(new_state)

    env.render()
    #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

    # If simulation did not end yet after last step - update Q table
    if not done:

        # Maximum possible Q value in next step (for new state)
        max_future_q = np.max(q_table[new_discrete_state])

        # Current Q value (for current state and performed action)
        current_q = q_table[discrete_state + (action,)]

        # And here's our equation for a new Q value for current state and action
        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # Update Q table with new Q value
        q_table[discrete_state + (action,)] = new_q


    # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
    elif new_state[0] >= env.goal_position:
        #q_table[discrete_state + (action,)] = reward
        q_table[discrete_state + (action,)] = 0

    discrete_state = new_discrete_state


env.close()

We then add a loop over the number of episodes specified in our EPISODES variable and let our little script run through thousands of games. To display only some of them we add a new variable, SHOW_EVERY, that controls the interval between two rendered games.

Here is the loop :

SHOW_EVERY = 2000  # render one game every 2000 episodes

for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False

    while not done:

        action = np.argmax(q_table[discrete_state])
        new_state, reward, done, _ = env.step(action)

        new_discrete_state = get_discrete_state(new_state)

        if render:
            env.render()

The problem is.. that at this point our agent is not learning anything useful. We initialized the table randomly and the agent simply picks the biggest Q value, being “greedy” on values that are still random and therefore useless. That’s why we need some randomness in the moves of our agent : learning from random moves is the only way to make it explore new paths and actually capitalize on them.

To this end we will use the epsilon value :

# Exploration settings
epsilon = 1  # not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES//2
epsilon_decay_value = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)

We also want to decay the epsilon with time so that the moves are less and less random :

    ...
    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value


env.close()

And finally we use epsilon in our actions :

    while not done:

        if np.random.random() > epsilon:
            # Get action from Q table
            action = np.argmax(q_table[discrete_state])
        else:
            # Get random action
            action = np.random.randint(0, env.action_space.n)

So we pick a random value between 0 and 1 and check whether it is bigger than epsilon. If it is, we pick the next action from the Q table based on the discrete state we are in. If not, we pick a random action from the action space (0, 1 or 2).

After each episode, if we are past the epsilon decay start episode, we make epsilon smaller and smaller, so our moves become less and less random. Let’s also add a small message when we actually reach the flag :
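
The message is just a print in the branch that handles the end of the episode (you will see it in context in the full code below) :

    # Simulation ended - if goal position is achieved, we update the Q value with the reward directly
    elif new_state[0] >= env.goal_position:
        print(f"Flag reached on episode : {episode}")
        q_table[discrete_state + (action,)] = 0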

Here is the full code and let’s try it out :

import gym
import numpy as np

#Setting up the environment
env=gym.make("MountainCar-v0")

#All our variables and constants
#Model
LEARNING_RATE=0.1
DISCOUNT=0.95
#Number of episodes for our agent to go through
EPISODES=25000
#Show an example on the screen every 2000 episodes
SHOW_EVERY = 2000
#Our discrete bucketing of the observation space
DISCRETE_OS_SIZE=[30]*len(env.observation_space.high)
#Our increment for the bucketing
discrete_os_win_size=(env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE
#The epsilon to pilot the randomness of our actions to increase exploration to actually enable learning
epsilon = 1
#When do we start to decay epsilon in order to reduce randomness of the actions of our model
START_EPSILON_DECAYING = 1
#End of the decaying of our epsilon
END_EPSILON_DECAYING = EPISODES // 2
#Value of the decaying of our epsilon, by how much we decrease epsilon after each episode
epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING)
#Creation of our Q_table that will store all the different values of our environment space
q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


#Quick helper-function that will convert our environment "state," which currently contains continuous 
#values that would wind up making our Q-Table absolutely gigantic and take forever to learn.... to a "discrete" state instead:
def get_discrete_state(state):
	discrete_state = (state-env.observation_space.low)/discrete_os_win_size
	return tuple(discrete_state.astype(int))

#Looping code for each episode
for episode in range(EPISODES):
	#We print the episode number if we will show it as an example
	if episode%SHOW_EVERY ==0:
		print(episode)
		#Show it
		render=True
	else: 
		#not show it
		render=False

	#get the discrete_state
	discrete_state=get_discrete_state(env.reset())

	done=False
	while not done:
		#Selection of the action for our Cart 
		#Now we just need to use epsilon. We'll use np.random.random() to randomly pick a number 0 to 1. 
		#If np.random.random() is greater than the epsilon value, then we'll go based off the max q value as usual. Otherwise, we will just move randomly:
		if np.random.random() > epsilon:
			action = np.argmax(q_table[discrete_state])
		else:
			action = np.random.randint(0, env.action_space.n)

		#We perform the step based on the action we determined in the previous lines of code and we collect the result
		new_state, reward, done, _ = env.step(action)
		#We have a new discrete state now
		new_discrete_state=get_discrete_state(new_state)
		#If we show the example :
		if render: 
			env.render()

		# If simulation did not end yet after last step - we update Q table
		if not done:
			#The max_future_q is grabbed after we've performed our action already, and then we update our previous values based 
			#partially on the next-step's best Q value. Over time, once we've reached the objective once, this "reward" value gets 
			#slowly back-propagated, one step at a time, per episode. Super basic concept, but pretty neat how it works!
			# Maximum possible Q value in next step (for new state)
			max_future_q=np.max(q_table[new_discrete_state])
			# Current Q value (for current state and performed action)
			current_q=q_table[discrete_state+(action, )]
			#THE FORMULA FOR ALL Q VALUES (we update Q with the new value)
			# And here's our equation for a new Q value for current state and action
			new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
			# Update Q table with new Q value
			q_table[discrete_state+(action,)]=new_q

		 # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
		elif new_state[0] >= env.goal_position:
			#we display message if we arrived to the flag
			print(f"Flag reached on episode : {episode}")
			q_table[discrete_state + (action,)] = 0

		discrete_state=new_discrete_state

	# Decaying is being done every episode if episode number is within decaying range	
	if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
		epsilon -= epsilon_decay_value

#Once all episodes performed we close the environment
env.close()

Here is how it goes : our little cart reaches the flag, and as time passes it does so better and better, which is great ! It took the cart more than 5800 episodes to get there the first time ; by episode 7000 it reaches it pretty much every 2 or 3 tries. The further we get into the episodes, the better our Q values become and the less random the choices of our agent, making it more and more reliable. And this is what Q-Learning is about !

The next step is to display some metrics during the training.

So we add a list and a dictionary to hold a few additional metrics :

ep_rewards = []
aggr_ep_rewards = {'ep':[], 'avg' : [], 'min' : [], 'max' : []}

We then add, inside our loop, the code to accumulate the reward of the current episode and append it to the list of episode rewards. Once that is done we can display the metrics every SHOW_EVERY episodes using our aggr_ep_rewards dict.

# at the start of each episode :
episode_reward = 0

# inside the step loop, right after env.step() :
episode_reward += reward

# after the while loop, once the episode is over :
ep_rewards.append(episode_reward)

if not episode % SHOW_EVERY:
	average_reward = sum(ep_rewards[-SHOW_EVERY:]) / len(ep_rewards[-SHOW_EVERY:])
	aggr_ep_rewards['ep'].append(episode)
	aggr_ep_rewards['avg'].append(average_reward)
	aggr_ep_rewards['min'].append(min(ep_rewards[-SHOW_EVERY:]))
	aggr_ep_rewards['max'].append(max(ep_rewards[-SHOW_EVERY:]))

	print(f"Episode:{episode} avg: {average_reward} min: {min(ep_rewards[-SHOW_EVERY:])} max: {max(ep_rewards[-SHOW_EVERY:])}")

Finally, when we are all done with the training we can plot the average, the max and the min for every reporting interval and get some nice indicators of how the training went :

plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label="average rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label="max rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label="min rewards")
plt.legend(loc=4)
plt.show()

We also take out our “Flag reached” message in order to not spam the output.

We indeed have our metrics displayed during the training and a plot once it is done :

Beautiful, our cart is learning and reaches the flag more and more often, as shown by the reward getting bigger and bigger !

Full code with the plotting as well :

import gym
import numpy as np
import matplotlib.pyplot as plt

#Setting up the environment
env=gym.make("MountainCar-v0")

#All our variables and constants
#Model
LEARNING_RATE=0.1
DISCOUNT=0.95
#Number of episodes for our agent to go through
EPISODES=10000
#Show an example on the screen every 500 episodes
SHOW_EVERY = 500
#Our discrete bucketing of the observation space
DISCRETE_OS_SIZE=[30]*len(env.observation_space.high)
#Our increment for the bucketing
discrete_os_win_size=(env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE
#The epsilon to pilot the randomness of our actions to increase exploration to actually enable learning
epsilon = 1
#When do we start to decay epsilon in order to reduce randomness of the actions of our model
START_EPSILON_DECAYING = 1
#End of the decaying of our epsilon
END_EPSILON_DECAYING = EPISODES // 2
#Value of the decaying of our epsilon, by how much we decrease epsilon after each episode
epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING)
#Creation of our Q_table that will store all the different values of our environment space
q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))

#Additional objects for reporting purposes
ep_rewards = []
aggr_ep_rewards = {'ep':[], 'avg' : [], 'min' : [], 'max' : []}


#Quick helper-function that will convert our environment "state," which currently contains continuous 
#values that would wind up making our Q-Table absolutely gigantic and take forever to learn.... to a "discrete" state instead:
def get_discrete_state(state):
	discrete_state = (state-env.observation_space.low)/discrete_os_win_size
	return tuple(discrete_state.astype(int))

#Looping code for each episode
for episode in range(EPISODES):
	#Initialisation of the reward for current episode
	episode_reward=0
	#We print the episode number if we will show it as an example
	if episode%SHOW_EVERY ==0:
		#Show it
		print(episode)
		render=True
	else:
		#not show it
		render=False
	#get the discrete_state
	discrete_state=get_discrete_state(env.reset())

	done=False

	while not done:
		#Selection of the action for our Cart 
		#Now we just need to use epsilon. We'll use np.random.random() to randomly pick a number 0 to 1. 
		#If np.random.random() is greater than the epsilon value, then we'll go based off the max q value as usual. Otherwise, we will just move randomly:
		if np.random.random() > epsilon:
			action = np.argmax(q_table[discrete_state])
		else:
			action = np.random.randint(0, env.action_space.n)

		#We perform the step based on the action we determined in the previous lines of code and we collect the result
		new_state, reward, done, _ = env.step(action)
		#We increment the reward for this episode
		episode_reward += reward
		#We have a new discrete state now
		new_discrete_state=get_discrete_state(new_state)
		#If we show the example :		
		if render: 
			env.render()

		# If simulation did not end yet after last step - update Q table
		if not done:
			#The max_future_q is grabbed after we've performed our action already, and then we update our previous values based 
			#partially on the next-step's best Q value. Over time, once we've reached the objective once, this "reward" value gets 
			#slowly back-propagated, one step at a time, per episode. Super basic concept, but pretty neat how it works!
			# Maximum possible Q value in next step (for new state)
			max_future_q=np.max(q_table[new_discrete_state])
			# Current Q value (for current state and performed action)
			current_q=q_table[discrete_state+(action, )]
			#THE FORMULA FOR ALL Q VALUES (we update Q with the new value)
			# And here's our equation for a new Q value for current state and action
			new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
			# Update Q table with new Q value
			q_table[discrete_state+(action,)]=new_q

		 # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
		elif new_state[0] >= env.goal_position:
			#We took off the print
			#print(f"Flag reached on episode : {episode}")
			q_table[discrete_state + (action,)] = 0

		discrete_state=new_discrete_state

	# Decaying is being done every episode if episode number is within decaying range	
	if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
		epsilon -= epsilon_decay_value

	#We append the reward of this episode to the list of rewards/episode
	ep_rewards.append(episode_reward)

	#If we show the metrics
	if not episode%SHOW_EVERY:
		#We calculate the average reward so far
		average_reward= sum(ep_rewards[-SHOW_EVERY:])/len(ep_rewards[-SHOW_EVERY:])
		#We append the metrics to our display object
		aggr_ep_rewards['ep'].append(episode)
		aggr_ep_rewards['avg'].append(average_reward)
		aggr_ep_rewards['min'].append(min(ep_rewards[-SHOW_EVERY:]))
		aggr_ep_rewards['max'].append(max(ep_rewards[-SHOW_EVERY:]))

		#We print it
		print(f"Episode:{episode} avg: {average_reward} min: {min(ep_rewards[-SHOW_EVERY:])} max: {max(ep_rewards[-SHOW_EVERY:])}")

#We close it once we finished to run all episodes
env.close()

#We plot our final metrics ! 
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label="average rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label="max rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label="min rewards")
plt.legend(loc=4)
plt.show()
