The [Project#] series documents how the project (Manipulator Continuous Control with Deep Reinforcement Learning) was built. This post covers the implementation of the DDPG agent.

Everything here continues from the previous post, and I assume you have already read it. (previous post LINK)

1. Creating DDPG_manipulator.py

1.1. Actor class

# Imports required by the code in this file
import copy
import random
from collections import deque

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class Actor(nn.Module):
    def __init__(
        self, 
        state_size, 
        hidden_size, 
        action_size,
        device
        ) -> None:
        
        super(Actor, self).__init__()
        
        self.device = device
        
        self.fc_1 = nn.Linear(in_features=state_size, out_features=hidden_size)
        self.fc_2 = nn.Linear(in_features=hidden_size, out_features=int(hidden_size / 2))
        self.fc_3 = nn.Linear(in_features=int(hidden_size / 2), out_features=action_size)
        
        for m in self.modules():
            if isinstance(m, nn.Linear):
                torch.nn.init.xavier_normal_(m.weight.data)
                m.bias.data.fill_(0.01)
        
    def forward(self, state):
        x = F.relu(self.fc_1(state))
        x = F.relu(self.fc_2(x))
        action = torch.tanh(self.fc_3(x))  # torch.tanh (F.tanh is deprecated); bounds each action to (-1, 1)
        return action
  • The Actor takes a state as input and returns an action; a minimal usage sketch follows.
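
The sketch below just sanity-checks the Actor's input/output shapes. The sizes are assumptions chosen for illustration (state_size=10 and action_size=6 match the Agent class later in this post).

# Minimal sketch: forward a dummy state through the Actor (sizes are assumptions)
import torch

actor = Actor(state_size=10, hidden_size=256, action_size=6, device='cpu')
dummy_state = torch.rand(1, 10)     # a batch containing one 10-dimensional state
action = actor(dummy_state)         # shape (1, 6); each value lies in (-1, 1) because of tanh
print(action.shape)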

1.2. Critic class

class Critic(nn.Module):
    def __init__(
        self, 
        state_size,
        hidden_size, 
        action_size,
        device
        ) -> None:
        
        super(Critic, self).__init__()
        
        self.device = device
        
        self.state_fc_1 = nn.Linear(state_size, hidden_size)
        self.state_fc_2 = nn.Linear(hidden_size, int(hidden_size / 2))
        self.action_fc_1 = nn.Linear(action_size, int(hidden_size / 2))
        self.add_fc_1 = nn.Linear(hidden_size, 1)
        
        for m in self.modules():
            if isinstance(m, nn.Linear):
                torch.nn.init.xavier_normal_(m.weight.data)
                m.bias.data.fill_(0.01)
        
    def forward(self, state, action):
        x_s = F.relu(self.state_fc_1(state))
        x_s = F.relu(self.state_fc_2(x_s))
        
        x_a = F.relu(self.action_fc_1(action))
        
        x = torch.cat((x_s, x_a), dim=1)
        x = torch.tanh(self.add_fc_1(x))  # note: tanh bounds the Q-value estimate to (-1, 1)
        return x
  • The Critic takes both a state and an action as input and returns a Q-value; a minimal usage sketch follows.
  • I am not sure whether 비평가 is the right translation of "Critic"; for now I chose that word in the sense that it evaluates the current state.
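
The sketch below checks the Critic's input/output shapes with the same assumed sizes as before.

# Minimal sketch: score a batch of (state, action) pairs with the Critic (sizes are assumptions)
import torch

critic = Critic(state_size=10, hidden_size=256, action_size=6, device='cpu')
dummy_state = torch.rand(4, 10)      # batch of 4 states
dummy_action = torch.rand(4, 6)      # batch of 4 actions
q_value = critic(dummy_state, dummy_action)
print(q_value.shape)                 # (4, 1); bounded to (-1, 1) by the final tanh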

1.3. OU_Noise class

class OU_Noise():
    def __init__(self, action_dim, mu=0.0, theta=0.15, max_sigma=0.1, min_sigma=0.1, decay_period=100000):
        self.mu = mu
        self.theta = theta
        self.sigma = max_sigma
        self.max_sigma = max_sigma
        self.min_sigma = min_sigma
        self.decay_period = decay_period
        self.action_dim = action_dim
        self.reset()

    def reset(self):
        self.state = np.ones(self.action_dim) * self.mu

    def evolve_state(self):
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(self.action_dim)
        self.state = x + dx
        return self.state

    def get_noise(self, t=0):
        ou_state = self.evolve_state()
        decaying = float(float(t) / self.decay_period)
        self.sigma = max(self.sigma - (self.max_sigma - self.min_sigma) * min(1.0, decaying), self.min_sigma)
        return ou_state
  • The DDPG paper says an Ornstein-Uhlenbeck process was used to generate temporally correlated exploration, which helps exploration efficiency in physical control problems with inertia.
  • However, I am still not sure exactly why OU noise is the right choice; the sketch below at least illustrates the temporal correlation it produces.
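
This small, standalone sketch (not part of the project code) compares the lag-1 autocorrelation of the OU noise above with plain Gaussian noise of the same scale; the OU samples drift smoothly instead of jumping independently.

# Illustrative sketch: OU noise is temporally correlated, white Gaussian noise is not
import numpy as np

ou = OU_Noise(action_dim=1)
ou_samples = np.array([ou.get_noise(t)[0] for t in range(200)])
white_samples = 0.1 * np.random.randn(200)

def lag1_autocorr(x):
    # correlation between consecutive samples
    return np.corrcoef(x[:-1], x[1:])[0, 1]

print("OU    lag-1 autocorrelation:", lag1_autocorr(ou_samples))    # clearly positive
print("white lag-1 autocorrelation:", lag1_autocorr(white_samples)) # close to 0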

1.4. Replay_Buffer class

class Replay_Buffer():
    def __init__(self, max_size):
        self.buffer = deque(maxlen=max_size)

    def push(self, state, action, reward, next_state):
        experience = (state, action, np.array([reward]), next_state)
        self.buffer.append(experience)

    def sample(self, batch_size):
        state_batch = []
        action_batch = []
        reward_batch = []
        next_state_batch = []

        batch = random.sample(self.buffer, batch_size)

        for experience in batch:
            state, action, reward, next_state = experience
            state_batch.append(state)
            action_batch.append(action)
            reward_batch.append(reward)
            next_state_batch.append(next_state)

        return state_batch, action_batch, reward_batch, next_state_batch

    def __len__(self):
        return len(self.buffer)
  • A buffer class that keeps storing state, action, reward, and next_state tuples; a minimal usage sketch follows.
  • I think the biggest advantage of the replay buffer is that it makes the training samples closer to i.i.d.; and because it does not keep learning from states that are too far in the past (the deque has a maxlen), it should also reduce the tendency to learn in the wrong direction.
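
A quick usage sketch with dummy transitions (the shapes are assumptions that match the 10-dimensional state and 6-dimensional action used later).

# Minimal sketch: push a few dummy transitions and sample a mini-batch
import numpy as np

buffer = Replay_Buffer(max_size=1000)
for _ in range(10):
    state = np.random.rand(10)
    action = np.random.rand(6)
    reward = float(np.random.rand())
    next_state = np.random.rand(10)
    buffer.push(state, action, reward, next_state)

states, actions, rewards, next_states = buffer.sample(batch_size=4)
print(len(states), rewards[0].shape)   # 4 (1,)  -- reward is stored as np.array([reward])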

1.5. Agent class

class Agent():
    def __init__(
        self,
        hidden_size=256, 
        actor_learning_rate=1e-4, 
        critic_learning_rate=1e-4,
        gamma=0.99, 
        tau=0.9999,
        tau_actor=9e-4,
        tau_critic=1e-4,
        max_memory_size=50000,
        ou_noise=False,
        n_learn=16,
        device='cpu'
        ):
        
        self.state_size = 10 
        self.action_size = 6 

        self.gamma = gamma
        self.tau = tau
        self.tau_actor = tau_actor
        self.tau_critic = tau_critic
        self.t_step = 0 
        self.ou_noise = ou_noise
        self.n_learn = n_learn
        self.device = device
        
        if self.device == 'cuda':
            self.gamma = torch.FloatTensor([gamma]).to(self.device)
            self.tau = torch.FloatTensor([tau]).to(self.device)
            self.tau_actor = torch.FloatTensor([tau_actor]).to(self.device)
            self.tau_critic = torch.FloatTensor([tau_critic]).to(self.device)
        
        print(f"[device: {self.device}]")
        
        # Initialization of the networks
        self.actor  = Actor(self.state_size, hidden_size, self.action_size, self.device)   # main network Actor
        self.critic = Critic(self.state_size, hidden_size, self.action_size, self.device)  # main network Critic

        self.actor_target = Actor(self.state_size, hidden_size, self.action_size, self.device)
        self.critic_target = Critic(self.state_size, hidden_size, self.action_size, self.device)

        self.actor = self.actor.to(self.device)
        self.critic = self.critic.to(self.device)
        self.actor_target = self.actor_target.to(self.device)
        self.critic_target = self.critic_target.to(self.device)
        
        # print("actor:", self.actor.is_cuda())
        # print("critic:", self.critic.is_cuda())
        # print("actor_target:", self.actor_target.is_cuda())
        # print("critic_target:", self.critic_target.is_cuda())
        
        # Initialization of the target networks as copies of the original networks
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(param.data)

        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(param.data)

        # Initialization memory
        self.memory = Replay_Buffer(max_memory_size)

        self.actor_optimizer  = optim.AdamW(self.actor.parameters(), lr=actor_learning_rate)
        self.critic_optimizer = optim.AdamW(self.critic.parameters(), lr=critic_learning_rate)
        
        if self.ou_noise is True:
            self.noise = OU_Noise(action_dim=self.action_size)

    def load_weights(self, actor_path=None, critic_path=None):
        if actor_path is None or critic_path is None: return
        
        self.actor.load_state_dict(torch.load(actor_path))
        self.critic.load_state_dict(torch.load(critic_path))
    
    def get_action(self, state):
        state_change = torch.tensor(state).float().to(self.device)
        print(f"{state_change[1].cpu().numpy():.10}", state_change.dtype)
        # print(state_change, state_change.dtype)
        
        action = self.actor.forward(state_change)
        
        if self.ou_noise is True:
            noise = torch.from_numpy(copy.deepcopy(self.noise.get_noise(self.t_step))).to(self.device)
            action = torch.clamp(torch.add(action, noise), -3.14159, 3.14159).to(self.device)
            
        action = action.detach().cpu().numpy()
        
        float_action = []
        for float64_action in action:
            float_action.append(float64_action.item())
        
        return float_action
    
    def step_training(self, batch_size):
        self.t_step = self.t_step + 1
        
        if self.memory.__len__() > batch_size:
            for _ in range(self.n_learn):
                self.learn_step(batch_size)
            
            if self.device == 'cuda':
                return self.actor_loss.detach().cpu().numpy(), self.critic_loss.detach().cpu().numpy()
            else:
                return self.actor_loss.detach().numpy(), self.critic_loss.detach().numpy()
        
        return None, None
    
    def learn_step(self, batch_size):
        state, action, reward, next_state = self.memory.sample(batch_size)
        
        state  = np.array(state)
        action = np.array(action)
        reward = np.array(reward)
        next_state = np.array(next_state)
        
        state  = torch.FloatTensor(state).to(self.device)
        action = torch.FloatTensor(action).to(self.device)
        reward = torch.FloatTensor(reward).to(self.device)
        next_state = torch.FloatTensor(next_state).to(self.device)
        
        Q_value = self.critic.forward(state, action)
        next_action = self.actor_target.forward(next_state)
        next_Q_value = self.critic_target.forward(next_state, next_action)
        target_Q_value = reward + self.gamma * next_Q_value.detach()  # detach: no gradient flows into the target networks
        
        loss = nn.MSELoss()
        self.critic_loss = loss(Q_value, target_Q_value)
        self.actor_loss = - self.critic.forward(state, self.actor.forward(state)).mean()
        
        self.actor.zero_grad()
        self.actor_loss.backward()
        self.actor_optimizer.step()
        
        self.critic.zero_grad()
        self.critic_loss.backward()
        self.critic_optimizer.step()
        
        # soft target update: target <- tau * main + (1 - tau) * target
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(param.data * self.tau_actor + target_param.data * (1.0 - self.tau_actor))

        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(param.data * self.tau_critic + target_param.data * (1.0 - self.tau_critic))

    def save_model(self, save_path, file_name=None):
        if file_name is not None:
            torch.save(self.actor.state_dict(), f"{save_path}/actor_{file_name}.pt")
            torch.save(self.critic.state_dict(), f"{save_path}/critic_{file_name}.pt")
        else:
            torch.save(self.actor.state_dict(), f"{save_path}/actor.pt")
            torch.save(self.critic.state_dict(), f"{save_path}/critic.pt")
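
For reference, learn_step above follows the standard DDPG update from the original paper (Lillicrap et al.): the critic is regressed onto a bootstrapped target, the actor is updated with the deterministic policy gradient, and the target networks are updated softly. In symbols (no done mask is used, matching the code above):

$y_i = r_i + \gamma \, Q'(s_{i+1}, \mu'(s_{i+1}))$,  $L_{critic} = \frac{1}{N}\sum_i \big(Q(s_i, a_i) - y_i\big)^2$

$L_{actor} = -\frac{1}{N}\sum_i Q(s_i, \mu(s_i))$,  $\theta' \leftarrow \tau\,\theta + (1 - \tau)\,\theta'$  (with $\tau$ = tau_actor or tau_critic)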

2. Modifying main_rl_environment.py

2.1. Modifying the get_space (state_space_funct) function

def get_space(self, step=0):
    try:
        reward = self.calculate_reward_funct(step)

        state = [
            self.goal_0,
            self.joint_1_pos, self.joint_2_pos, self.joint_3_pos, self.joint_4_pos, self.joint_5_pos, self.joint_6_pos,
            self.robot_x, self.robot_y, self.robot_z
            ]
    except:
        self.get_logger().info('-------node not ready yet, Still getting values------------------')
        return
    else:
        return state, reward
  • The state contains the current goal flag (True/False), the angle of each joint, and the coordinates of the end_point; a sketch of the resulting layout follows.
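
This gives a 10-element state vector, matching state_size = 10 in the Agent class. The sketch below is only an illustration of the layout (dummy values; the unit assumptions are mine).

# Illustrative layout of the 10-dimensional state vector (dummy values)
goal_0 = False                  # index 0: goal-reached flag
joints = [0.0] * 6              # indices 1-6: the six joint angles (radians, assumed)
end_point = [0.3, 0.0, 0.5]     # indices 7-9: end-effector x, y, z (meters, assumed)

state = [goal_0, *joints, *end_point]
assert len(state) == 10         # matches self.state_size = 10 in the Agent class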

2.2. Modifying the calculate_reward_funct function

def calculate_reward_funct(self, step=0):
    try:
        robot_end_position = np.array((self.robot_x, self.robot_y, self.robot_z))
        target_point_position_0 = np.array((self.pos_sphere_0_x, self.pos_sphere_0_y, self.pos_sphere_0_z))

        reward_0 = 0

    except:
        self.get_logger().info('could not calculate the distance yet, trying again...')
        return

    else:
        distance_0 = np.linalg.norm(robot_end_position - target_point_position_0)

        reward_d = -0.1 * (distance_0 ** np.e)
        if reward_d < -5: reward_d = -5

        if distance_0 < 0.05 and step > 3:
            self.get_logger().info('Goal Reached')
            self.goal_0 = True
        else:
            self.goal_0 = False

        if self.goal_0 is True: reward_0 = 5

        return reward_d + reward_0

  • The total reward is the sum of two terms.
  • The first term is a distance penalty: the farther the end_point is from the goal position, the more negative it becomes (clipped at -5, as in the code above):

    → $\text{reward}_d = \max(-0.1 \cdot d^{e},\ -5)$

  • The second term adds +5 if the goal has been reached and 0 otherwise; a small sketch that evaluates the combined reward follows.
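
A standalone re-implementation of the reward shaping, evaluated at a few distances (for simplicity it ignores the step > 3 condition used in the node):

# Illustrative re-implementation of the two reward terms (not the node code itself)
import numpy as np

def shaped_reward(distance, goal_reached):
    reward_d = max(-0.1 * distance**np.e, -5.0)   # distance penalty, clipped at -5
    reward_0 = 5.0 if goal_reached else 0.0       # sparse goal bonus
    return reward_d + reward_0

for d in [0.02, 0.3, 1.0, 5.0]:
    print(f"d={d:4.2f}  reward={shaped_reward(d, d < 0.05):+.4f}")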

3. Creating train_ddpg.py

from .DDPG_manipulator import Agent
from .main_rl_environment import MyRLEnvironmentNode

import matplotlib.pyplot as plt
import numpy as np
import time
import datetime
import random
import rclpy
import traceback

def main(args=None):

    rclpy.init(args=args)
    env = MyRLEnvironmentNode()
    rclpy.spin_once(env)
    
    
    try:
        EPISODE = 25
        EPISODE_STEP = 1000
        batch_size = 256
        save_dir = "/home/dndqodqks/ros2_ws/src/robotic_arm_environment/my_environment_pkg"
        
        agent = Agent(ou_noise=True,
                    critic_learning_rate=0.00001,
                    actor_learning_rate=0.00001, 
                    max_memory_size=1000000,
                    hidden_size=512,
                    device='cuda') # 4:26, 2:56
        
        agent.load_weights(save_dir+"/model/actor_755_h_512_best.pt", save_dir+"/model/critic_755_h_512_best.pt")

        rewards = []
        actor_loss_history = []   # renamed from actor_loss so the per-step loss returned below does not shadow it
        critic_loss_history = []
        
        avg_rewards = []
        avg_actor_loss = []
        avg_critic_loss = []
        goal_time_step = []
        
        total_start_time = time.time()
        
        for episode in range(EPISODE):
            episode_start_time = time.time()
            
            env.reset_environment_request()
            agent.noise.reset()
            
            state, _ = env.get_space()
            episode_reward = 0
            episode_actor_loss = np.array([])
            episode_critic_loss = np.array([])
            
            for step in range(EPISODE_STEP):
                print (f'----------------Episode:{episode+1} Step:{step+1}--------------------')
                
                if state[1] < -1.570795 or state[1] > 1.570795:
                    state[1] = random.uniform( -1.570795, 1.570795)
                
                action = agent.get_action(state)
                env.action_step_service(action_values=action, second=0.0001)
                new_state, reward  = env.get_space(step)
                agent.memory.push(state, action, reward, new_state)
                actor_loss, critic_loss = agent.step_training(batch_size)
                
                state = new_state
                episode_reward += reward
                
                if actor_loss is not None and critic_loss is not None:
                    # += on an empty array drops the values; append them instead
                    episode_actor_loss = np.append(episode_actor_loss, actor_loss)
                    episode_critic_loss = np.append(episode_critic_loss, critic_loss)
                
                goal_time_step.append(0)
                
                if actor_loss is not None and critic_loss is not None:
                    print(f'Episode/step: {episode+1}/{step+1}\treward: {reward:.6f}\tactor_loss: {actor_loss:.6f}\tcritic_loss: {critic_loss:.6f}')
                else:
                    print(f'Episode/step: {episode+1}/{step+1}\treward: {reward:.6f}\tactor_loss: {actor_loss}\tcritic_loss: {critic_loss}')
                
                if new_state[0] == True:
                    # new_state[0] is the goal flag: the end-effector reached the goal, so the episode ends and the environment is reset
                    print ("----------------Goal Reach----------------")
                    print(f"episode: {episode+1}, step: {step+1} reward: {np.round(episode_reward, decimals=2)}, average _reward: {np.mean(rewards[-10:])}")
                    goal_time_step.append(step+1)
                    agent.save_model(save_dir+"/ckp", f"{episode}_{step}")
                    break
                
                # now_time = time.time()
                # if now_time - episode_start_time > 20:
                #     print("----------------Time Out----------------")
                #     break
            
            rewards.append(episode_reward)
            avg_rewards.append(np.mean(rewards[-10:]))
            
            if actor_loss is not None and critic_loss is not None:
                actor_loss_history = np.append(actor_loss_history, episode_actor_loss)
                critic_loss_history = np.append(critic_loss_history, episode_critic_loss)

                avg_actor_loss = np.append(avg_actor_loss, np.mean(actor_loss_history[-10:]))
                avg_critic_loss = np.append(avg_critic_loss, np.mean(critic_loss_history[-10:]))
            
            episode_end_time = time.time()
            episode_sec = episode_end_time - episode_start_time
            episode_time = str(datetime.timedelta(seconds=episode_sec)).split(".")[0]
            
            print(f'Episode {episode+1} Ended')
            print('Episode Time: ', episode_time)
            
        agent.save_model(save_dir+"/model")
        
        total_end_time = time.time()
        total_sec = total_end_time - total_start_time
        total_time = str(datetime.timedelta(seconds=total_sec)).split(".")[0]
        print("Total num of episode completed, Exiting ....")
        print("Total Time:", total_time)
        
        now = datetime.datetime.now()
        
        ep = [i for i in range(EPISODE)]
        plt.figure(figsize=(15, 10))
        
        plt.subplot(2, 2, 1)
        plt.grid(True)
        plt.plot(ep, avg_rewards, label='avg_reward')
        plt.xlabel('Episode')
        plt.ylabel('Reward')
        plt.legend()
        
        reward_txt_ndarray = np.loadtxt(save_dir+"/data/reward.txt", delimiter=' ')
        avg_rewards = np.concatenate([reward_txt_ndarray, avg_rewards], 0)
        np.savetxt(save_dir+"/data/reward.txt", avg_rewards, fmt="%.6e")
        
        if actor_loss is not None and critic_loss is not None:
            plt.subplot(2, 2, 2)
            plt.grid(True)
            plt.plot(range(len(avg_actor_loss)), avg_actor_loss, label='avg_actor_loss')
            plt.xlabel('Episode')
            plt.ylabel('Actor Loss')
            plt.legend()
            
            plt.subplot(2, 2, 3)
            plt.grid(True)
            plt.plot(range(len(avg_critic_loss)), avg_critic_loss, label='avg_critic_loss')
            plt.xlabel('Episode')
            plt.ylabel('Critic Loss')
            plt.legend()
            
            actor_loss_txt_ndarray = np.loadtxt(save_dir+"/data/actor_loss.txt", delimiter=' ')
            avg_actor_loss = np.concatenate([actor_loss_txt_ndarray, avg_actor_loss], 0)
            np.savetxt(save_dir+"/data/actor_loss.txt", avg_actor_loss, fmt="%.6e")
            
            critic_loss_txt_ndarray = np.loadtxt(save_dir+"/data/critic_loss.txt", delimiter=' ')
            avg_critic_loss = np.concatenate([critic_loss_txt_ndarray, avg_critic_loss], 0)
            np.savetxt(save_dir+"/data/critic_loss.txt", avg_critic_loss, fmt="%.6e")

        plt.subplot(2, 2, 4)
        plt.grid(True)
        plt.stem(range(len(goal_time_step)), goal_time_step, label='goal')
        plt.xlabel('Time Step')
        plt.ylabel('Goal')
        plt.legend()
        
        plt.savefig(f'{save_dir+"/image"}/ddpg_{now.year}_{now.month}_{now.day}_{now.hour}_{now.minute}_{now.second}.png', dpi=300, facecolor='#eeeeee', bbox_inches='tight')
    
    except:
        # the bare except also catches Ctrl-C, so the current weights are saved before exiting
        agent.save_model(save_dir+"/model")
        print("try-except save model")
        traceback.print_exc()
       
    rclpy.shutdown()

if __name__ == '__main__':
    main()
    
    print("Train Completed!!!")
  • After training, the averaged reward, actor_loss, and critic_loss are saved as a plot so they can be reviewed; a small re-plotting sketch follows.
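
Because the script also appends the averaged curves to data/reward.txt, data/actor_loss.txt, and data/critic_loss.txt, the accumulated history can be re-plotted later. A minimal sketch, assuming the files already exist under the same save_dir used above:

# Minimal sketch: re-plot the accumulated average reward across training runs
import numpy as np
import matplotlib.pyplot as plt

save_dir = "/home/dndqodqks/ros2_ws/src/robotic_arm_environment/my_environment_pkg"
avg_rewards = np.loadtxt(save_dir + "/data/reward.txt")

plt.plot(avg_rewards, label='avg_reward (all runs)')
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.legend()
plt.show()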

4. Execution

# terminal 1
ros2 launch my_environment_pkg my_environment.launch.py

# terminal 2
ros2 run my_environment_pkg train_ddpg

5. References