In this article we will play with the Mountain Car problem. So first, what is Q-Learning? Q-Learning is a form of learning through an agent acting in an environment without using a model of that environment: there are no model parameters to train, no features to engineer and so on. The idea is that for a given state of the environment our agent has a set of actions available to it, and it learns to choose the one that brings the maximum reward.
We are going to play around with OpenAI’s gym, and specifically with the Mountain Car environment. To get gym: pip install gym.
The list of choices available to our cart is :
- 0 : push left
- 1 : do nothing
- 2 : push right
The goal of our cart is to reach the little flag on the right, and it can only use the commands above to get there in less than 200 steps. If it goes above 200 steps the episode simply ends.
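That 200-step cap is not something we have to code ourselves: the registered MountainCar-v0 environment comes wrapped in a time limit. Depending on your gym version, you should be able to inspect it roughly like this:

import gym

env = gym.make("MountainCar-v0")
# the environment spec carries the per-episode step limit (200 for MountainCar-v0)
print(env.spec.max_episode_steps)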
The flag is on the right, so let’s start with an easy script that tells the car to always push right. Maybe it will manage to climb the mountain?
import gym

# initialize environment
env = gym.make("MountainCar-v0")
env.reset()

# do an action in a loop
done = False
while not done:
    # make the cart go right
    action = 2
    _, _, done, _ = env.step(action)
    env.render()
As you can see, the cart cannot climb the slope like this. It needs to go right, then left to gain momentum, and then right again. Our agent has to figure out when to push left, when to push right and when to do nothing in order to reach the flag in less than 200 steps. Easy, right?
To achieve this our cart will have to use the 3 actions we identified above (the action space), and it needs an “observation space” that describes the current state of the environment.
By performing a step we can retrieve some information from the environment:
import gym

# initialize environment
env = gym.make("MountainCar-v0")
env.reset()

# do an action in a loop
done = False
while not done:
    # make the cart go right
    action = 2
    new_state, reward, done, _ = env.step(action)
    print(reward, new_state)
The output is :
-1.0 [-0.55547931  0.00124806]
-1.0 [-0.55299252  0.0024868 ]
-1.0 [-0.54928555  0.00370696]
-1.0 [-0.54438613  0.00489942]
-1.0 [-0.5383309   0.00605523]
-1.0 [-0.53116522  0.00716568]
-1.0 [-0.5229428   0.00822243]
-1.0 [-0.51372529  0.00921751]
-1.0 [-0.50358183  0.01014347]
-1.0 [-0.4925884   0.01099343]
-1.0 [-0.48082721  0.01176119]
-1.0 [-0.46838593  0.01244128]
-1.0 [-0.45535686  0.01302907]
-1.0 [-0.44183603  0.01352083]
The first number is the reward, which does not change for now: every step costs -1.
The second element is the new_state: the first value is the position on the X axis (horizontal) and the second is the velocity. These two values describe the state of the environment; they are the two elements of our observation space.
Now we will use a Q-table to “model” a Q value for each action in each state. So for each combination of X position and velocity we will have 3 possible actions (left, nothing, right), each with an associated Q value.
By printing the maximum and minimum values of the X position and the velocity we can get an idea of the range of values in our observation space:
print(env.observation_space.high)
print(env.observation_space.low)

#Output :
#[0.6  0.07]
#[-1.2 -0.07]
Now, if we kept all 8 decimals for every combination between -1.2 and 0.6 on the X axis and between -0.07 and 0.07 for the velocity, we would have an enormous number of combinations in our Q table, way too many to be honest. We don’t need that much precision, so we will group the values into discrete buckets instead. Let’s go with 30 buckets per dimension.
The script below determines by how much we need to increment our values to create the 30 buckets:
DISCRETE_OS_SIZE = [30, 30]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE

print(discrete_os_win_size)
#[0.06       0.00466667]
So each bucket is 0.06 wide on the X axis and 0.00466667 wide for the velocity. Alright.
Let’s now build our Q table :
import gym
import numpy as np

# initialize environment
env = gym.make("MountainCar-v0")
env.reset()

DISCRETE_OS_SIZE = [30, 30]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE
print(discrete_os_win_size)

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))
print(q_table.shape)

# do an action in a loop
done = False
while not done:
    # make the cart go right
    action = 2
    new_state, reward, done, _ = env.step(action)
    #print(reward, new_state)
This builds our (30, 30, 3) table: the 30 * 30 part covers every combination of our 30 buckets for the two state variables (X position and velocity), and the * 3 is our 3 possible actions, each with a Q value attached. That is only 30 * 30 * 3 = 2 700 values in total. For now the Q values are randomized between -2 and 0: each step gives a -1 reward and reaching the flag gives 0, so it makes sense to initialize all random Q values as negative.
This table will be the cart’s guideline: for every state the cart will choose the action with the highest associated Q value. But how do we explore, and how do we populate the table with meaningful Q values? We need to add some randomness, try different actions and update the Q values along the way. How? With this formula:
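This is the classic Q-Learning update rule, where \alpha is our LEARNING_RATE and \gamma our DISCOUNT:

Q_{new}(s_t, a_t) = (1 - \alpha) \cdot Q(s_t, a_t) + \alpha \cdot \bigl( r_t + \gamma \cdot \max_{a} Q(s_{t+1}, a) \bigr)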
And this is the formula with less math :
new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
We now need to update the Q values (one value per possible action per unique state).
First let’s discuss the discount: it measures how much we care about FUTURE rewards rather than the immediate reward. In general this value is fairly high, between 0 and 1 (0.95 here). We want it high because the whole point of Q-Learning is to learn a chain of events that ends with the desirable outcome (reaching the flag in this example), so it is natural to put more importance on long-term gains than on short-term ones.
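To get a feel for the numbers: with DISCOUNT = 0.95, a reward collected 10 steps in the future is weighted by 0.95^10 ≈ 0.60, while one collected 100 steps away is weighted by 0.95^100 ≈ 0.006. Distant rewards count for little in a single update, but as described next they still get propagated backwards one step at a time.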
The max_future_q is calculated after we have performed our action (or “step”), and we then update the previous Q value based partially on the best Q value of the next step. Over time, once we have reached the objective at least once, this reward slowly gets back-propagated (hello neural networks!), one step at a time, episode after episode, and the chain of actions attempted by our agent becomes more and more optimal.
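A quick worked example with made-up numbers: suppose the current Q value of the chosen action is -1.2, the step reward is -1 and the best Q value reachable from the new state is -0.5. With LEARNING_RATE = 0.1 and DISCOUNT = 0.95 the update gives:

new_q = (1 - 0.1) * (-1.2) + 0.1 * (-1 + 0.95 * (-0.5))
      = -1.08 + 0.1 * (-1.475)
      = -1.2275

The stored value only moves a little, but it moves towards the (discounted) value of the best follow-up state.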
We will also add an EPISODES variable: how many iterations of the game we want to run.
We will also add a small function to convert the state of the environment into our discrete buckets:
def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low) / discrete_os_win_size
    return tuple(discrete_state.astype(int))
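As a quick sanity check (the exact indices can shift by one because of float rounding), a cart sitting at x = -0.5 with a small positive velocity of 0.01 should land somewhere around bucket (11, 17):

print(get_discrete_state(np.array([-0.5, 0.01])))
# roughly (11, 17): (-0.5 + 1.2) / 0.06 ≈ 11.7 and (0.01 + 0.07) / 0.00466667 ≈ 17.1, truncated to ints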
Next we also replace our action. We had hard-coded action = 2 to always go right, but now we want to choose the best action for the given state and then grab the new discrete state:
action = np.argmax(q_table[discrete_state])
new_discrete_state = get_discrete_state(new_state)
And finally we also want to update the Q-value :
# If simulation did not end yet after last step - update Q table
if not done:

    # Maximum possible Q value in next step (for new state)
    max_future_q = np.max(q_table[new_discrete_state])

    # Current Q value (for current state and performed action)
    current_q = q_table[discrete_state + (action,)]

    # And here's our equation for a new Q value for current state and action
    new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

    # Update Q table with new Q value
    q_table[discrete_state + (action,)] = new_q
Full code of our Agent until this stage :
import gym
import numpy as np

env = gym.make("MountainCar-v0")

LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 25000

DISCRETE_OS_SIZE = [30, 30]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low) / discrete_os_win_size
    # we use this tuple to look up the 3 Q values for the available actions in the q-table
    return tuple(discrete_state.astype(int))


discrete_state = get_discrete_state(env.reset())

done = False
while not done:
    action = np.argmax(q_table[discrete_state])
    new_state, reward, done, _ = env.step(action)
    new_discrete_state = get_discrete_state(new_state)
    env.render()
    #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

    # If simulation did not end yet after last step - update Q table
    if not done:

        # Maximum possible Q value in next step (for new state)
        max_future_q = np.max(q_table[new_discrete_state])

        # Current Q value (for current state and performed action)
        current_q = q_table[discrete_state + (action,)]

        # And here's our equation for a new Q value for current state and action
        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # Update Q table with new Q value
        q_table[discrete_state + (action,)] = new_q

    # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
    elif new_state[0] >= env.goal_position:
        #q_table[discrete_state + (action,)] = reward
        q_table[discrete_state + (action,)] = 0

    discrete_state = new_discrete_state

env.close()
We then add a loop over the number of episodes specified in our EPISODES variable and let our little script play thousands of games. To display only some of them we add a new variable, SHOW_EVERY, that controls the interval between two rendered games.
Here is the loop :
for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False

    while not done:
        action = np.argmax(q_table[discrete_state])
        new_state, reward, done, _ = env.step(action)
        new_discrete_state = get_discrete_state(new_state)

        if episode % SHOW_EVERY == 0:
            env.render()
The problem is... that at this point our agent is not learning anything. We initialized the table randomly and the agent simply picks the biggest Q value, being “greedy” about values that are random and therefore useless. That’s why we need some randomness in the agent’s moves: it is the only way to make it explore new paths, learn from them and actually capitalize on them.
To this end we will use the epsilon value :
# Exploration settings
epsilon = 1  # not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES//2
epsilon_decay_value = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)
We also want to decay the epsilon with time so that the moves are less and less random :
...

    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value

env.close()
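With the constants we will use (EPISODES = 25000, so END_EPSILON_DECAYING = 12500), epsilon_decay_value comes out to 1 / 12499 ≈ 0.00008. Epsilon therefore shrinks linearly and reaches roughly zero around episode 12 500; from that point on the agent acts purely greedily on its Q table.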
And finally we use epsilon in our actions :
while not done:

    if np.random.random() > epsilon:
        # Get action from Q table
        action = np.argmax(q_table[discrete_state])
    else:
        # Get random action
        action = np.random.randint(0, env.action_space.n)
So we randomly pick a value between 0 and 1 and check whether it is bigger than epsilon. If it is, we pick the next action from the Q table for the discrete state we are in; if not, we pick a random action from the action space (0, 1 or 2).
After each episode, once we are past the epsilon decay start episode, we make epsilon smaller and smaller, and thus our moves less and less random. Let’s also add a small message when we actually reach the flag:
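This message lives in the branch that handles the end of an episode, next to the Q value update for reaching the goal (you will see it again in the full code):

elif new_state[0] >= env.goal_position:
    print(f"Flag reached on episode : {episode}")
    q_table[discrete_state + (action,)] = 0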
Here is the full code and let’s try it out :
import gym
import numpy as np

# Setting up the environment
env = gym.make("MountainCar-v0")

# All our variables and constants
# Model
LEARNING_RATE = 0.1
DISCOUNT = 0.95

# Number of episodes for our agent to go through
EPISODES = 25000

# Show an example on the screen every 2000 episodes
SHOW_EVERY = 2000

# Our discrete bucketing of the observation space
DISCRETE_OS_SIZE = [30] * len(env.observation_space.high)
# Our increment for the bucketing
discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE

# The epsilon to pilot the randomness of our actions, to increase exploration and actually enable learning
epsilon = 1
# When do we start to decay epsilon in order to reduce the randomness of the actions of our model
START_EPSILON_DECAYING = 1
# End of the decaying of our epsilon
END_EPSILON_DECAYING = EPISODES // 2
# Value of the decaying of our epsilon, by how much we decrease epsilon after each episode
epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING)

# Creation of our Q_table that will store all the different values of our environment space
q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


# Quick helper-function that converts our environment "state", which contains continuous values that would
# make our Q-table absolutely gigantic and take forever to learn, into a "discrete" state instead:
def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low) / discrete_os_win_size
    return tuple(discrete_state.astype(int))


# Looping code for each episode
for episode in range(EPISODES):

    # We print the episode number if we will show it as an example
    if episode % SHOW_EVERY == 0:
        print(episode)
        # Show it
        render = True
    else:
        # Not show it
        render = False

    # Get the discrete state
    discrete_state = get_discrete_state(env.reset())

    done = False
    while not done:

        # Selection of the action for our cart.
        # Now we just need to use epsilon. We'll use np.random.random() to randomly pick a number between 0 and 1.
        # If np.random.random() is greater than the epsilon value, then we'll go based off the max Q value as usual.
        # Otherwise, we will just move randomly:
        if np.random.random() > epsilon:
            action = np.argmax(q_table[discrete_state])
        else:
            action = np.random.randint(0, env.action_space.n)

        # We perform the step based on the action we determined above and we collect the result
        new_state, reward, done, _ = env.step(action)

        # We have a new discrete state now
        new_discrete_state = get_discrete_state(new_state)

        # If we show the example:
        if render:
            env.render()

        # If simulation did not end yet after last step - we update the Q table
        if not done:

            # The max_future_q is grabbed after we've performed our action already, and then we update our previous
            # values based partially on the next-step's best Q value. Over time, once we've reached the objective once,
            # this "reward" value gets slowly back-propagated, one step at a time, per episode.
            # Super basic concept, but pretty neat how it works!

            # Maximum possible Q value in next step (for new state)
            max_future_q = np.max(q_table[new_discrete_state])

            # Current Q value (for current state and performed action)
            current_q = q_table[discrete_state + (action,)]

            # THE FORMULA FOR ALL Q VALUES (we update the Q with the new value)
            # And here's our equation for a new Q value for current state and action
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

            # Update Q table with new Q value
            q_table[discrete_state + (action,)] = new_q

        # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
        elif new_state[0] >= env.goal_position:
            # We display a message if we reached the flag
            print(f"Flag reached on episode : {episode}")
            q_table[discrete_state + (action,)] = 0

        discrete_state = new_discrete_state

    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value

# Once all episodes are performed we close the environment
env.close()
Here is how it goes :
As we can see our little cart reaches the flag, and as time passes it does so better and better, which is great! It took the cart more than 5800 episodes to get there the first time; by episode 7000 it gets there pretty much every 2 or 3 tries. The further we get into the episodes, the better our Q values become and the less random the choices of our agent, making it more and more reliable. And this is what Q-Learning is about!
And here is a little video about it :
The next step is to display some metrics during training.
So we add a list of per-episode rewards and a new dictionary with a couple of aggregated metrics:
ep_rewards = []
aggr_ep_rewards = {'ep': [], 'avg': [], 'min': [], 'max': []}
We then add, inside our loop, the code that accumulates the reward of the current episode and appends it to the list of episode rewards. Once that is done we can display the metrics every SHOW_EVERY episodes using our aggr_ep_rewards dict.
episode_reward += reward

ep_rewards.append(episode_reward)

if not episode % SHOW_EVERY:
    average_reward = sum(ep_rewards[-SHOW_EVERY:]) / len(ep_rewards[-SHOW_EVERY:])
    aggr_ep_rewards['ep'].append(episode)
    aggr_ep_rewards['avg'].append(average_reward)
    aggr_ep_rewards['min'].append(min(ep_rewards[-SHOW_EVERY:]))
    aggr_ep_rewards['max'].append(max(ep_rewards[-SHOW_EVERY:]))
    print(f"Episode:{episode} avg: {average_reward} min: {min(ep_rewards[-SHOW_EVERY:])} max: {max(ep_rewards[-SHOW_EVERY:])}")
Finally, when the training is done, we can plot the average, max and min reward for each SHOW_EVERY-episode window and get some nice indicators of how the training went:
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label="average rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label="max rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label="min rewards")
plt.legend(loc=4)
plt.show()
We also take out our “Flag reached” message in order to not spam the output.
We indeed have our metrics displayed during the training and when done :
Beautiful, our cart is learning and reaches the flag more and more often, as shown by the reward getting bigger and bigger!
The video :
Full code with the plotting as well:
import gym
import numpy as np
import matplotlib.pyplot as plt

# Setting up the environment
env = gym.make("MountainCar-v0")

# All our variables and constants
# Model
LEARNING_RATE = 0.1
DISCOUNT = 0.95

# Number of episodes for our agent to go through
EPISODES = 10000

# Show an example on the screen every 500 episodes
SHOW_EVERY = 500

# Our discrete bucketing of the observation space
DISCRETE_OS_SIZE = [30] * len(env.observation_space.high)
# Our increment for the bucketing
discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE

# The epsilon to pilot the randomness of our actions, to increase exploration and actually enable learning
epsilon = 1
# When do we start to decay epsilon in order to reduce the randomness of the actions of our model
START_EPSILON_DECAYING = 1
# End of the decaying of our epsilon
END_EPSILON_DECAYING = EPISODES // 2
# Value of the decaying of our epsilon, by how much we decrease epsilon after each episode
epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING)

# Creation of our Q_table that will store all the different values of our environment space
q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))

# Additional objects for reporting purposes
ep_rewards = []
aggr_ep_rewards = {'ep': [], 'avg': [], 'min': [], 'max': []}


# Quick helper-function that converts our environment "state", which contains continuous values that would
# make our Q-table absolutely gigantic and take forever to learn, into a "discrete" state instead:
def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low) / discrete_os_win_size
    return tuple(discrete_state.astype(int))


# Looping code for each episode
for episode in range(EPISODES):

    # Initialisation of the reward for the current episode
    episode_reward = 0

    # We print the episode number if we will show it as an example
    if episode % SHOW_EVERY == 0:
        # Show it
        print(episode)
        render = True
    else:
        # Not show it
        render = False

    # Get the discrete state
    discrete_state = get_discrete_state(env.reset())

    done = False
    while not done:

        # Selection of the action for our cart.
        # Now we just need to use epsilon. We'll use np.random.random() to randomly pick a number between 0 and 1.
        # If np.random.random() is greater than the epsilon value, then we'll go based off the max Q value as usual.
        # Otherwise, we will just move randomly:
        if np.random.random() > epsilon:
            action = np.argmax(q_table[discrete_state])
        else:
            action = np.random.randint(0, env.action_space.n)

        # We perform the step based on the action we determined above and we collect the result
        new_state, reward, done, _ = env.step(action)

        # We increment the reward for this episode
        episode_reward += reward

        # We have a new discrete state now
        new_discrete_state = get_discrete_state(new_state)

        # If we show the example:
        if render:
            env.render()

        # If simulation did not end yet after last step - update Q table
        if not done:

            # The max_future_q is grabbed after we've performed our action already, and then we update our previous
            # values based partially on the next-step's best Q value. Over time, once we've reached the objective once,
            # this "reward" value gets slowly back-propagated, one step at a time, per episode.
            # Super basic concept, but pretty neat how it works!

            # Maximum possible Q value in next step (for new state)
            max_future_q = np.max(q_table[new_discrete_state])

            # Current Q value (for current state and performed action)
            current_q = q_table[discrete_state + (action,)]

            # THE FORMULA FOR ALL Q VALUES (we update the Q with the new value)
            # And here's our equation for a new Q value for current state and action
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

            # Update Q table with new Q value
            q_table[discrete_state + (action,)] = new_q

        # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
        elif new_state[0] >= env.goal_position:
            # We took out the print
            # print(f"Flag reached on episode : {episode}")
            q_table[discrete_state + (action,)] = 0

        discrete_state = new_discrete_state

    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value

    # We append the reward of this episode to the list of rewards per episode
    ep_rewards.append(episode_reward)

    # If we show the metrics
    if not episode % SHOW_EVERY:
        # We calculate the average reward over the last SHOW_EVERY episodes
        average_reward = sum(ep_rewards[-SHOW_EVERY:]) / len(ep_rewards[-SHOW_EVERY:])
        # We append the metrics to our display object
        aggr_ep_rewards['ep'].append(episode)
        aggr_ep_rewards['avg'].append(average_reward)
        aggr_ep_rewards['min'].append(min(ep_rewards[-SHOW_EVERY:]))
        aggr_ep_rewards['max'].append(max(ep_rewards[-SHOW_EVERY:]))
        # We print it
        print(f"Episode:{episode} avg: {average_reward} min: {min(ep_rewards[-SHOW_EVERY:])} max: {max(ep_rewards[-SHOW_EVERY:])}")

# We close the environment once we have finished running all episodes
env.close()

# We plot our final metrics!
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label="average rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label="max rewards")
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label="min rewards")
plt.legend(loc=4)
plt.show()