As seen in our previous article, our agent is able to imitate an autopilot, at least a little bit. Driving around on autopilot sure is fun, but what would be the next step to enhance our agent? Let’s try to make him learn via reinforcement learning this time.

What is Reinforcement learning?

We can describe reinforcement learning as teaching agents to take actions in an environment in order to maximize a cumulative reward. Reinforcement learning does not need any labelled input/output data to be passed to the model; the main challenge is rather to find the right balance between exploration (often based on random actions) and exploitation (the knowledge that the agent has already developed).

I initially explored the idea of using Q-learning, as we did in the mountain car article. Q-learning is in essence a lightweight approach to reinforcement learning. It relies on maintaining a Q-table that lists all the possible states of our environment, associates to each of those states all the actions our agent can take, and stores for each state-action couple a Q-value describing how good that action looks given the potential reward. The problem is that for our challenge, the number of different states is…phenomenal! As a first step we can describe the current state of our environment with the RGB values of all the pixels captured by the RGB camera sensor (and this could be extended to other sensors). It is clear that, considering all the values and all the combinations they can represent, the table would simply be too big to handle.
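
To make the Q-table idea concrete, here is a tiny, purely illustrative sketch (a toy environment with only ten discrete states and our three future actions; the names and values are made up for the example) of a Q-table stored as a dictionary together with the classic Q-learning update rule:

#Toy Q-table: one entry per discrete state, one Q-value per action (0 = left, 1 = straight, 2 = right)
q_table = {state: [0.0, 0.0, 0.0] for state in range(10)}  #only 10 states here, hence a tiny table

LEARNING_RATE = 0.1  #arbitrary values for this sketch
DISCOUNT = 0.99

def update_q(state, action, reward, new_state):
    #Classic Q-learning update: Q(s,a) <- Q(s,a) + lr * (reward + gamma * max_a' Q(s',a') - Q(s,a))
    max_future_q = max(q_table[new_state])
    current_q = q_table[state][action]
    q_table[state][action] = current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q - current_q)

With the raw camera image as the state, every possible combination of pixel values would need its own entry in that dictionary, which is exactly why the table explodes and why we look for something else.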

The second option would be a Deep Q Learning model. In this situation we ditch the Q-table and implement a deep neural network in its place. We use this neural network to approximate the Q-value function: the state is given as input and the Q-values of all possible actions are generated as output. Past experiences are stored in the model’s replay memory and the next action is determined by the maximum Q-value in the output.

Here are the different steps involved in a Deep Q Learning model:

  1. Gather and pre-process the image feed from Carla SIM (the RGB camera) and feed it to the DQN, which returns the Q-values of all the different possible actions (steer, throttle, brake)
  2. Pick an action, either the one with the highest Q-value or a random one to promote exploration, controlled by a parameter epsilon
  3. The agent performs the action (our car would either turn, accelerate or brake), transitions from the current state to a new state and receives a reward. The new state is the pre-processed image of the next frame captured by the RGB camera. All this information together is called the “transition” (current state, action, reward, new state).
  4. Randomly sample batches of transitions from the memory (or replay buffer) and calculate the loss
  5. The loss is calculated as the squared difference between the target Q and the predicted Q (see the sketch after this list)
  6. Perform gradient descent to minimize the loss
  7. Every X steps or episodes, update your target model with the new weights
  8. Rinse and Repeat
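
To make steps 4 to 6 a bit more concrete, here is a minimal sketch (illustrative only, with model and target_model standing for the two networks mentioned in step 7 and DISCOUNT being the usual gamma factor) of how the target Q and the loss are computed for a single transition:

import numpy as np

#One transition sampled from the replay buffer: (state, action, reward, new_state, done)
current_qs = model.predict(state[None, ...] / 255)[0]            #predicted Q-values for the current state
future_qs = target_model.predict(new_state[None, ...] / 255)[0]  #Q-values of the new state, from the target network

if not done:
    target_q = reward + DISCOUNT * np.max(future_qs)  #Bellman target
else:
    target_q = reward                                 #terminal state: no future reward to add

#The loss for this sample is the squared difference between the target Q and the predicted Q
loss = (target_q - current_qs[action]) ** 2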

This seems to be a plausible solution for our problem.

In the context of Carla

Spoiler alert: I have not achieved a functional reinforcement learning agent yet; I am still working on it, so if you are looking for a fully functional agent I recommend that you look somewhere else. I am just going to explain the concept and share some bits of code here and there.

Alright so first, here is our class to spawn the agent, attach an RGB camera to it and perform a “step”:

import math
import random
import time

import carla
import cv2
import numpy as np

#SHOW_PREVIEW, IM_WIDTH, IM_HEIGHT and SECONDS_PER_EPISODE are constants defined earlier in the full script

class CarEnv: 
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None 

    def __init__(self):
        #Env initialization
        self.client = carla.Client("localhost",2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        #Agent initialisation
        self.collision_hist = []
        self.actor_list = []
       
        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)
        
        self.rgb_cam = self.blueprint_library.find("sensor.camera.rgb")
        self.rgb_cam.set_attribute("image_size_x",f"{self.im_width}")
        self.rgb_cam.set_attribute("image_size_y",f"{self.im_height}")
        self.rgb_cam.set_attribute("fov",f"110")
        
        transform=carla.Transform(carla.Location(x=2.5,z=0.7))
        self.sensor= self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        #Just to make it start recording, apparently passing an empty command makes it react
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0,brake=0.0))
        time.sleep(4)

        colsensor=self.blueprint_library.find("sensor.other.collision")
        self.colsensor=self.world.spawn_actor(colsensor,transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None: 
            time.sleep(0.01)

        #Everything is set, we can start the episode

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0,brake=0.0))

        return self.front_camera
    #Is there a collision ?
    def collision_data(self,event):
        self.collision_hist.append(event)

    #Function to process the image retrieved by the sensor
    def process_img(self, image):
        i = np.array(image.raw_data)
        #RGBA = 4 (get the alpha)
        #reshape the image to get a picture
        i2= i.reshape((self.im_height,self.im_width, 4))
        #take everything but only the RGB not the Alpha
        i3=i2[:,:,:3]
        if self.SHOW_CAM:
            #Show the image
            cv2.imshow("",i3)
            cv2.waitKey(1)
        #Store the latest frame (normalization by 255 happens later, at training time)
        self.front_camera=i3

    #Go left, straight, right
    def step(self,action):
        if action==0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action==1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action ==2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v=self.vehicle.get_velocity()
        #Convert velocity into km/h

        kmh=int(3.6*math.sqrt(v.x**2+v.y**2+v.z**2))

        #if there has been a collision
        if len(self.collision_hist) !=0:
            #We stop
            done=True
            #Penalty
            reward=-200
        elif kmh < 50:
            done = False
            #Small penalty if we drive below 50 km/h
            reward = -1
        else:
            done=False
            reward=1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done=True

        return self.front_camera, reward, done, None

So, the main thing to discuss here is the step function, as the rest was already explained in the imitation learning tutorial. We pass an action to the step function. In our case we are no longer using throttle, steering and brake as actions; we use left/straight/right and assume those are the only actions we want our agent to consider while driving. If there is a collision the episode stops and the reward takes a big malus; if we drive under 50 km/h we also get a small malus. And finally we return the camera frame, the reward and the done variable. A minimal usage sketch is shown below.
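
Here is a minimal usage sketch of this environment (with a random policy standing in for the agent, just to show how reset and step fit together; this is not the actual training loop):

import random

env = CarEnv()
current_state = env.reset()        #spawns the car and the sensors, returns the first camera frame
done = False
episode_reward = 0

while not done:
    action = random.randint(0, 2)  #0 = left, 1 = straight, 2 = right
    new_state, reward, done, _ = env.step(action)
    episode_reward += reward
    current_state = new_state

#Clean up the spawned actors once the episode is over
for actor in env.actor_list:
    actor.destroy()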

Then I tried to build a model and an agent. The concept is that at each point in time we have an environment state based on what is streamed from the RGB camera; at every frame this environment state changes, and by taking into account the value of each pixel our agent has to take a decision in the action space (go left, go right or keep straight). The goal of the algorithm is to choose the right action for the given input by selecting the highest Q-value among the 3 different actions for a given state. The first iterations of our agent should be random in order to promote exploration, and then the randomness is slowly taken out. The concept is quite easy to grasp, but the implementation is rather difficult for a beginner like myself. Even guided by material found online (this article is based on the tutorial by sentdex), I struggled to create the model. We reuse the Xception model from the imitation learning article.

The model takes our RGB camera data as input and spits out the predictions that will be used to drive the car, with 3 different outputs (the action space).

import random
import time
from collections import deque

import numpy as np
import tensorflow as tf
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.models import Model
from keras.optimizers import Adam

#The hyperparameters (REPLAY_MEMORY_SIZE, MIN_REPLAY_MEMORY_SIZE, MINIBATCH_SIZE, PREDICTION_BATCH_SIZE,
#TRAINING_BATCH_SIZE, UPDATE_TARGET_EVERY, DISCOUNT, MODEL_NAME) and the ModifiedTensorBoard callback
#are defined elsewhere in the full script

class DQNAgent:
    def __init__(self):

        self.model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights()) #one model is trained, the other is kept same for x episodes to predict against and then update every x episodes
        #As we train, we train from randomly selected data from our replay memory:
        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE) #Memory of previous actions
        #Reporting of the metrics
        #For the same reasons as before (the RL tutorial), we will be modifying TensorBoard: -> we do not want it to create a log at each episode
        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
        self.target_update_counter = 0 # will track when it's time to update the target model
        self.graph = tf.compat.v1.get_default_graph()
        self.terminate = False # Should we quit?
        self.last_logged_episode = 0
        self.training_initialized = False # waiting for TF to get rolling

    def create_model(self):
        base_model= Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH,3))
        x = base_model.output
        x = GlobalAveragePooling2D()(x)
        #3 actions, 3 predictions left, right, straight
        predictions = Dense(3, activation="linear")(x)
        model = Model(inputs = base_model.input, outputs = predictions)
        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"],run_eagerly=False)
        return model

    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        #We need a quick method in our DQNAgent for updating replay memory:
        self.replay_memory.append(transition)

    # Trains main network every step during episode
    def train(self):
        #We need enough memory to actually train and not just do random stuff over and over again with high epsilon
        #To begin, we only want to train if we have a bare minimum of samples in replay memory:
        # Start training only if certain number of samples is already saved
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        #If we don't have enough samples, we'll just return and be done. If we do, then we will begin our training. First, we need to grab a random minibatch:
        # Get a minibatch of random samples from memory replay table
        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        #Once we have our minibatch, we want to grab our current and future q values
        # Get current states from minibatch, then query NN model for Q values
        current_states = np.array([transition[0] for transition in minibatch])/255 

        with self.graph.as_default():
            current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)
        # Get future states from minibatch, then query NN model for Q values
        # When using target network, query it, otherwise main network should be queried
        new_current_states = np.array([transition[3] for transition in minibatch])/255 
        with self.graph.as_default():
            future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)

        y = []
        X = []

        # Now we need to enumerate our batches
        for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
            # If not a terminal state, get new q from future states, otherwise set it to 0
            # almost like with Q Learning, but we use just part of equation here
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward 

            # Update Q value for given state
            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            # And append to our training data
            X.append(current_state)
            y.append(current_qs)

        log_this_step = False

        if self.tensorboard.step > self.last_logged_episode:
            log_this_step = True
            self.last_logged_episode = self.tensorboard.step

        # Fit on all samples as one batch, log only on terminal state
        with self.graph.as_default():
            self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)

        #Next, we want to continue tracking for logging:
        if log_this_step:
            self.target_update_counter +=1

        #Finally, we'll check to see if it's time to update our target_model:
        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

    # Queries main network for Q values given current observation space (environment state)
    def get_qs(self, state):
        #First, we need a method to get Q values (basically to make a prediction)
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]

    def train_in_loop(self):
        #Finally, we just need to actually do training:
        X = np.random.uniform(size = (1, IM_HEIGHT, IM_WIDTH,3)).astype(np.float32)
        y = np.random.uniform(size=(1,3)).astype(np.float32)

        with self.graph.as_default():
            self.model.fit(X,y,verbose=False,batch_size=1)

        self.training_initialized = True
        #To start, we use some random data like above to initialize, then we begin our infinite loop:
        while True:
            if self.terminate:
                return

            self.train()
            time.sleep(0.01)

At every step that the agent performs, we want to update Q-values, but we also want to predict from our model. Initially the goal is to let our agent take random actions in order to explore the different situations in his environment; we reward him when he performs well and penalize him when he does not, in our implementation for instance when there is a collision. This is standard practice in Q-learning and reinforcement learning in general, as this is the only way for an agent to explore its environment and try new things to see if they work. Without randomness the agent would just stick to the highest Q-value for a given state and not learn anything new. The randomness is controlled by an epsilon parameter and a decay parameter that reduces epsilon (the randomness) as the episodes progress. The more our agent trains, the less random his actions get.
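
As a rough sketch of what that epsilon-greedy choice and its decay could look like inside the episode loop (epsilon, EPSILON_DECAY and MIN_EPSILON are hyperparameters with arbitrary values here; agent and current_state come from the classes above):

import numpy as np

epsilon = 1.0          #start fully random
EPSILON_DECAY = 0.95   #arbitrary values for this sketch
MIN_EPSILON = 0.001

#Inside the episode loop:
if np.random.random() > epsilon:
    #Exploitation: take the action with the highest predicted Q-value
    action = np.argmax(agent.get_qs(current_state))
else:
    #Exploration: take a random action
    action = np.random.randint(0, 3)

#After each episode, reduce the randomness a little
epsilon = max(MIN_EPSILON, epsilon * EPSILON_DECAY)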

All this randomness and constant updating leads to a lot of fluctuation, which can be very confusing for a model. One way to stabilize that, on top of the replay memory, is to introduce the concept of having actually two models: a main model that is constantly evolving while training, and a target network that is only updated every N steps or episodes and that is used to predict against every frame captured by our agent.

To train the model we randomly grab samples from the replay memory, bundle them into a batch and then start training; if we do not have enough samples we simply do nothing. Once the batch is created, we fetch our current and future Q-values. In this implementation the transition is stored as (current state, action, reward, new state, done), as mentioned in the previous section. From this we create our inputs X and outputs y for our deep Q-network and train the model with a model.fit. Notice the normalization of the X input (division by 255). We then check if we should update the target_model, which we are supposed to update every n iterations and which we use to .predict.

Then we train in a loop. Before training on real data, we first do a fit on random inputs to initialize the model, and then enter an infinite loop in which the train function is called and uses the real values captured by the camera.

In practice, while the concept is understandable, I never achieved a fully operational algorithm that implements Deep Q learning for the agent. All the pieces are figured out and laying in front of us like Legos in a box; all we have to do is pick them up and actually combine everything into a single solution.

The environment

  • Initialization (world, agent, sensors)
  • Collision data
  • Frames collection and processing
  • Step function

The DQN Agent

  • Initialization
  • Creation of the model
  • Replay memory
  • Training
  • Q values retrieval (predictions)

And finally “the glue” to make it all work: threading, creation of multiple concurrent agents, running the training and reporting the metrics (a minimal sketch of that glue follows below). In the end, I was able to run the training, but it was so demanding from a computing point of view that I could not do it on my personal machine and therefore kind of put this subject on hold right there.
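
To give an idea, here is roughly what the simplest version of that glue could look like (a single agent, no metrics reporting; EPISODES and the epsilon handling from the earlier sketch are assumed), with the training running in its own thread while the main loop plays episodes:

from threading import Thread
import time

agent = DQNAgent()
env = CarEnv()

#The training loop runs in its own thread so episodes keep being played while the network fits on batches
trainer_thread = Thread(target=agent.train_in_loop, daemon=True)
trainer_thread.start()

#Wait until the model has done its dummy fit on random data and is ready to train for real
while not agent.training_initialized:
    time.sleep(0.01)

for episode in range(EPISODES):
    #Play one episode: choose actions epsilon-greedily as sketched earlier, call env.step(action)
    #and push every transition with agent.update_replay_memory((current_state, action, reward, new_state, done))
    ...

#Once all episodes are done, stop the trainer thread cleanly
agent.terminate = True
trainer_thread.join()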

Next steps

I might revisit this subject in the future and deliver something that is fully functional. But the most interesting part was to actually understand the concept of the two models, the one being trained and the one being used to predict, and the concept of the reward. We will come back to reinforcement learning as this is simply awesome, and I would really enjoy developing a functional algorithm in the context of a game. The Halite challenge seems quite interesting, or the game of Go. In the meantime I really recommend the following content:

