[Project#] 시리즈는 프로젝트(Trash Throwing Simulation with Deep Reinforcement Learning) 제작 과정을 설명한다. 본 포스팅은 RL 파일 코드 분석에 대한 내용이다.

모든 내용은 이전 글과 이어진다. 이전 글을 오류 없이 모두 진행했다고 가정하고 글을 작성하였다. (이전 글 LINK)

1. run_environment.py

1.1. 전체 코드

import time
import rclpy
from .main_rl_environment import MyRLEnvironmentNode

def main(args=None):

	rclpy.init(args=args)
 
	run_env_node = MyRLEnvironmentNode()
	rclpy.spin_once(run_env_node)
	
	num_episodes = 100
	episonde_horizont = 10

	for episode in range (num_episodes):

		run_env_node.reset_environment_request()					
		time.sleep(2.0)
		step = 0
		
		for step in range (episonde_horizont):
			print (f'----------------Episode:{episode+1} Step:{step+1}--------------------')

			action = run_env_node.generate_action_funct() # generate a sample action vector
			run_env_node.action_step_service(action) # take the action		
			reward, done  = run_env_node.calculate_reward_funct()
			state  = run_env_node.state_space_funct()

			if done == True: 
				# if done is TRUE means the end-effector reach to goal and environmet will reset
				print (f'Goal Reach, Episode ends after {step+1} steps')
				break

			time.sleep(1.0)
			
		print (f'Episode {episode+1} Ended')
		

	print ("Total num of episode completed, Exiting ....")
	rclpy.shutdown()
	

if __name__ == '__main__':
	main()

1.2. Import

import time
import rclpy
from .main_rl_environment import MyRLEnvironmentNode
  • time: 간단한 시간 데이터를 처리하기 위해서 사용되는 파이썬의 내장 모듈이다.
  • rclpy: ‘ROS Client Library for the Python language’의 약자로 ROS 2 시스템에서 노드(Node)와 토픽(Topic), 서비스(Service), 액션(Action) 등을 작성하고 실행하기 위한 인터페이스를 제공한다.
  • main_rl_environment.py 파일에서 MyRLEnvironmentNode 클래스를 가져온다.

1.3. main 함수

  1. rclpy 초기화

     rclpy.init(args=args)
    
    • rclpy.init() 는 ROS 2 노드를 실행하기 전에 ROS 2 시스템을 초기화 한다.
    • argsrclpy.init() 함수에 전달되는 매개변수로, 일반적으로 프로그램 실행 시 사용자로부터 받은 인자(argument)를 전달한다.
  2. spin once

     run_env_node = MyRLEnvironmentNode()
     rclpy.spin_once(run_env_node)
    
    • MyRLEnvironmentNode 객체를 생성하고, rclpy.spin_once() 함수를 활용하여 노드를 한 번만 실행한다. rclpy.spin() 함수를 사용하면 계속해서 노드를 실행할 수 있다.
  3. hyper parameters 선언

     num_episodes = 100
     episonde_horizont = 10
    
    • num_episodes: 전체 에피소드 수를 설정한다.
    • episonde_horizont: 한 에피소드 마다 몇 번의 스텝을 진행할지 설정한다.
  4. 학습 시작

     for episode in range (num_episodes):
        
     		run_env_node.reset_environment_request()					
     		time.sleep(2.0)
     		step = 0
        		
     		for step in range (episonde_horizont):
     			print (f'----------------Episode:{episode+1} Step:{step+1}--------------------')
        
     			action = run_env_node.generate_action_funct() # generate a sample action vector
     			run_env_node.action_step_service(action) # take the action		
     			reward, done  = run_env_node.calculate_reward_funct()
     			state  = run_env_node.state_space_funct()
        
     			if done == True: 
     				# if done is TRUE means the end-effector reach to goal and environmet will reset
     				print (f'Goal Reach, Episode ends after {step+1} steps')
     				break
        
     			time.sleep(1.0)
        			
     		print (f'Episode {episode+1} Ended')
        		
        
     	print ("Total num of episode completed, Exiting ....")
     	rclpy.shutdown()
    
    • run_env_node.reset_environment_request(): 구체와 로봇팔의 초기 위치를 설정한다.
    • time.sleep(2.0): 매 에피소드마다 2초씩 기다린다. 아마 Gazebo 환경을 초기화하는 시간 때문인 것 같다.
    • step = 0: step 변수를 왜 초기화 하는지 모르겠다.
    • run_env_node.generate_action_funct(): 랜덤으로 로봇팔의 각도를 받는다. 조인트가 6개이므로 총 6개의 각도를 리스트 형태로 받는다.
    • run_env_node.action_step_service(action): action을 매개변수로 입력하여 로봇팔의 궤적을 생성하고 구동한다.
    • reward, done = run_env_node.calculate_reward_funct(): reward를 계산하고 반환한다.
    • state = run_env_node.state_space_funct(): state를 반환한다. state는 리스트의 형태이며, 로봇팔 끝점의 좌표, 각 조인트의 각도, 구체의 좌표를 포함한다.
    • if done == True: 로봇팔 끝점이 구체의 좌표와 일정 거리 이내라면 done은 True이다. 즉, 로봇팔 궤적 생성을 성공한 것이다. 이후 해당 에피소드를 종료한다.
    • rclpy.shutdown(): Node를 종료한다.

1.4. name

if __name__ == '__main__':
	main()
  • __name__ 은 파이썬 내장 변수이다. 파일을 실행하면 __name__ 에는 __main__ 이 저장된다. 그러나 import 해서 내장 함수를 실행하면, 모듈명이 저장된다. 즉, 개별적으로 실행해야 한다는 파일을 의미할 때 해당 코드를 작성한다.

2. main_rl_environment

2.1. 전체 코드

import os
import sys
import time
import rclpy
import random
import numpy as np
import message_filters
from rclpy.node import Node
from sensor_msgs.msg import JointState
from gazebo_msgs.msg import ModelStates
from gazebo_msgs.srv import SetEntityState

import tf2_ros 
from tf2_ros import TransformException

from rclpy.action        import ActionClient
from trajectory_msgs.msg import JointTrajectoryPoint
from control_msgs.action import FollowJointTrajectory

from rclpy.duration import Duration

class MyRLEnvironmentNode(Node):

	def __init__ (self):

		super().__init__('node_main_rl_environment')

		print ("initializing.....")
		
		# end-effector transformation
		self.tf_buffer   = tf2_ros.Buffer()
		self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self)

		# --------------------------Client for reset the sphere position --------------------------#
		self.client_reset_sphere = self.create_client(SetEntityState,'/gazebo/set_entity_state')
		while not self.client_reset_sphere.wait_for_service(timeout_sec=1.0):
			self.get_logger().info('sphere reset-service not available, waiting...')
		self.request_sphere_reset = SetEntityState.Request()

		# ------------------------- Action-client to change joints position -----------------------#
		self.trajectory_action_client = ActionClient (self, FollowJointTrajectory, '/joint_trajectory_controller/follow_joint_trajectory')

		# --------------------------Subcribers topics --------------------------------------------#

		# Subcribe topic with the joints states
		self.joint_state_subscription = message_filters.Subscriber(self, JointState, '/joint_states')
		
		# Subcribe topic with the sphere position
		self.target_point_subscription = message_filters.Subscriber(self, ModelStates, '/gazebo/model_states')

		# Create the message filter (if a msg is detected for each subcriber, do the callback)
		#self.ts = message_filters.TimeSynchronizer([self.joint_state_subscription, self.target_point_subscription], queue_size=30)
		self.ts = message_filters.ApproximateTimeSynchronizer([self.joint_state_subscription, self.target_point_subscription], queue_size=10, slop=0.1, allow_headerless=True)
		self.ts.registerCallback(self.initial_callback)

	def initial_callback(self, joint_state_msg, target_point_msg):

		# Seems that the order that the joint values arrive is: ['joint2', 'joint3', 'joint1', 'joint4', 'joint5', 'joint6']
		
		# Position of each joint:
		self.joint_1_pos = joint_state_msg.position[2]
		self.joint_2_pos = joint_state_msg.position[0]
		self.joint_3_pos = joint_state_msg.position[1]
		self.joint_4_pos = joint_state_msg.position[3]
		self.joint_5_pos = joint_state_msg.position[4]
		self.joint_6_pos = joint_state_msg.position[5]

		# Velocity of each joint:
		self.joint_1_vel =  joint_state_msg.velocity[2]
		self.joint_2_vel =  joint_state_msg.velocity[0]
		self.joint_3_vel =  joint_state_msg.velocity[1]
		self.joint_4_vel =  joint_state_msg.velocity[3]
		self.joint_5_vel =  joint_state_msg.velocity[4]
		self.joint_6_vel =  joint_state_msg.velocity[5]

		# Determine the sphere position in Gazebo wrt world frame
		sphere_index = target_point_msg.name.index('my_sphere') # Get the corret index for the sphere
		self.pos_sphere_x = target_point_msg.pose[sphere_index].position.x 
		self.pos_sphere_y = target_point_msg.pose[sphere_index].position.y 
		self.pos_sphere_z = target_point_msg.pose[sphere_index].position.z 

		# Determine the pose(position and location) of the end-effector w.r.t. world frame
		self.robot_x, self.robot_y, self.robot_z = self.get_end_effector_transformation()

	def get_end_effector_transformation(self):

		# Determine the pose(position and location) of the end effector w.r.t. world frame

		try:
			now = rclpy.time.Time()	
			self.reference_frame = 'world'
			self.child_frame     = 'link6'
			trans = self.tf_buffer.lookup_transform(self.reference_frame, self.child_frame, now) # This calculate the position of the link6 w.r.t. world frame
			
		except TransformException as ex:
			self.get_logger().info(f'Could not transform {self.reference_frame} to {self.child_frame}: {ex}')
			return

		else:
			# Traslation 
			ef_robot_x = trans.transform.translation.x
			ef_robot_y = trans.transform.translation.y
			ef_robot_z = trans.transform.translation.z
			#print ('Translation: [',round (ef_robot_x,3), round(ef_robot_x,3), round(ef_robot_x,3),']')
			
			# Rotation
			ef_qx = trans.transform.rotation.x
			ef_qy = trans.transform.rotation.y
			ef_qz = trans.transform.rotation.z
			ef_qw = trans.transform.rotation.w
			#print ('Rotation: in Quaternion [',round (ef_qx,3), round(ef_qy,3), round(ef_qz,3), round(ef_qw,3),']')

			return round (ef_robot_x,3), round(ef_robot_y,3), round(ef_robot_z,3)

	
	def reset_environment_request(self):

		# Every time this function is called a request to the Reset the Environment is sent 
		# i.e. Move the robot to home position and change the
		# sphere location and waits until get response/confirmation

		# -------------------- reset sphere position------------------#

		# For now the sphere's position will be inside a 1x1x1 workspace in front of the robot 
		sphere_position_x = random.uniform( 0.05, 1.05)
		sphere_position_y = random.uniform( -0.5, 0.5)
		sphere_position_z = random.uniform( 0.05, 1.05)

		self.request_sphere_reset.state.name = 'my_sphere'
		self.request_sphere_reset.state.reference_frame = 'world'
		self.request_sphere_reset.state.pose.position.x = sphere_position_x
		self.request_sphere_reset.state.pose.position.y = sphere_position_y
		self.request_sphere_reset.state.pose.position.z = sphere_position_z
		
		self.future_sphere_reset = self.client_reset_sphere.call_async(self.request_sphere_reset)

		self.get_logger().info('Reseting sphere to new position...')

		rclpy.spin_until_future_complete(self, self.future_sphere_reset)

		sphere_service_response = self.future_sphere_reset.result()
		
		if sphere_service_response.success:
			self.get_logger().info("Sphere Moved to a New Possiton Success")
		else:
			self.get_logger().info("Sphere Reset Request failed")

		
		#---------------------reset robot position-------------------#

		home_point_msg = JointTrajectoryPoint()
		home_point_msg.positions     = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
		home_point_msg.velocities    = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
		home_point_msg.accelerations = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
		home_point_msg.time_from_start = Duration(seconds=2).to_msg()

		joint_names   = ['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6']
		home_goal_msg = FollowJointTrajectory.Goal()
		home_goal_msg.goal_time_tolerance    = Duration(seconds=1).to_msg()
		home_goal_msg.trajectory.joint_names = joint_names
		home_goal_msg.trajectory.points      = [home_point_msg]
		
		self.trajectory_action_client.wait_for_server()  # waits for the action server to be available
		
		send_home_goal_future = self.trajectory_action_client.send_goal_async(home_goal_msg) # Sending home-position request
		
		rclpy.spin_until_future_complete(self, send_home_goal_future) # Wait for goal status
		goal_reset_handle = send_home_goal_future.result()

		if not goal_reset_handle.accepted:
			self.get_logger().info(' Home-Goal rejected ')
			return
		self.get_logger().info('Moving robot to home position...')

		get_reset_result = goal_reset_handle.get_result_async()
		rclpy.spin_until_future_complete(self, get_reset_result)  # Wait for response

		if get_reset_result.result().result.error_code == 0:
			self.get_logger().info('Robot in Home position without problems')
		else:
			self.get_logger().info('There was a problem with the action')

	def action_step_service(self, action_values):
		
		# Every time this function is called, it passes the action vector (desire position of each joint) 
		# to the action-client to execute the trajectory
		
		points = []

		point_msg = JointTrajectoryPoint()
		point_msg.positions     = action_values
		point_msg.velocities    = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
		point_msg.accelerations = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
		point_msg.time_from_start = Duration(seconds=2.0).to_msg() # be careful about this time 
		points.append(point_msg) 

		joint_names = ['joint1','joint2','joint3','joint4','joint5','joint6']
		goal_msg    = FollowJointTrajectory.Goal()
		goal_msg.goal_time_tolerance = Duration(seconds=1).to_msg() # goal_time_tolerance allows some freedom in time, so that the trajectory goal can still
															        # succeed even if the joints reach the goal some time after the precise end time of the trajectory.
															
		goal_msg.trajectory.joint_names = joint_names
		goal_msg.trajectory.points      = points

		self.get_logger().info('Waiting for action server to move the robot...')
		self.trajectory_action_client.wait_for_server() # waits for the action server to be available

		self.get_logger().info('Sending goal-action request...')
		self.send_goal_future = self.trajectory_action_client.send_goal_async(goal_msg) 

		self.get_logger().info('Checking if the goal is accepted...')
		rclpy.spin_until_future_complete(self, self.send_goal_future ) # Wait for goal status

		goal_handle = self.send_goal_future.result()

		if not goal_handle.accepted:
			self.get_logger().info(' Action-Goal rejected ')
			return
		self.get_logger().info('Action-Goal accepted')

		self.get_logger().info('Checking the response from action-service...')
		self.get_result = goal_handle.get_result_async()
		rclpy.spin_until_future_complete(self, self.get_result ) # Wait for response

		if self.get_result.result().result.error_code == 0:
			self.get_logger().info('Action Completed without problem')
		else:
			self.get_logger().info('There was a problem with the accion')

	def generate_action_funct(self):

		# This is a continuous action space
		# This function generates random values in radians for each joint
		# These values range were tested in advance to make sure that there were not internal collisions 

		angle_j_1 = random.uniform( -3.14159, 3.14159)   # values in degrees [ -180, 180]
		angle_j_2 = random.uniform( -0.57595, 0.57595)	 # values in degrees [  -33,  33]
		angle_j_3 = random.uniform( -2.51327, 2.51327)   # values in degrees [ -144, 144]
		angle_j_4 = random.uniform( -3.14159, 3.14159)   # values in degrees [ -180, 180]
		angle_j_5 = random.uniform( -3.14159, 3.14159)   # values in degrees [ -180, 180]
		angle_j_6 = random.uniform( -3.14159, 3.14159)   # values in degrees [ -180, 180]

		return [angle_j_1, angle_j_2, angle_j_3, angle_j_4, angle_j_5, angle_j_6]

	def calculate_reward_funct(self):

		# I aim with this function to get the reward value. For now, the reward is based on the distance
		# i.e. Calculate the euclidean distance between the link6 (end effector) and sphere (target point)
		# and each timestep the robot receives -1 but if it reaches the goal (distance < 0.05) receives +10

		try:
			robot_end_position    = np.array((self.robot_x, self.robot_y, self.robot_z))
			target_point_position = np.array((self.pos_sphere_x, self.pos_sphere_y, self.pos_sphere_z))
			
		except: 
			self.get_logger().info('could not calculate the distance yet, trying again...')
			return

		else:
			distance = np.linalg.norm(robot_end_position - target_point_position)
			
			if distance <= 0.05:
				self.get_logger().info('Goal Reached')
				done = True
				reward_d = 10
			else:
				done = False
				reward_d = -1

			return reward_d, done

	def state_space_funct(self):

		# This function creates the state state vector and returns the current value of each variable
		# i.e end-effector position, each joint value, target (sphere position) 

		try:
			state = [
					self.robot_x, self.robot_y, self.robot_z, 
					self.joint_1_pos, self.joint_2_pos, self.joint_3_pos, self.joint_4_pos, self.joint_5_pos, self.joint_6_pos, 
					self.pos_sphere_x, self.pos_sphere_y, self.pos_sphere_z
					]			 	
		except:
			
			self.get_logger().info('-------node not ready yet, Still getting values------------------')
			return 
		else:
			return state

2.2. Import

import os
import sys
import time
import rclpy
import random
import numpy as np
import message_filters
from rclpy.node import Node
from sensor_msgs.msg import JointState
from gazebo_msgs.msg import ModelStates
from gazebo_msgs.srv import SetEntityState

import tf2_ros 
from tf2_ros import TransformException

from rclpy.action        import ActionClient
from trajectory_msgs.msg import JointTrajectoryPoint
from control_msgs.action import FollowJointTrajectory

from rclpy.duration import Duration
  • message_filters: ROS 2에서 메시지 필터링과 동기화를 수행하는 데 사용되는 패키지이다. 이 패키지는 다수의 토픽에서 수신된 메시지를 동시에 처리하고, 메시지 간의 시간적 또는 순서적 동기화를 제공하는 기능을 제공한다.
  • from sensor_msgs.msg import JointState: sensor_msgs 패키지에서 JointState 메시지 타입을 가져온다. 정확한 메시지 타입은 아래와 같다.

      string[] name
      float64[] position
      float64[] velocity
      float64[] effort
    
  • from gazebo_msgs.msg import ModelStates: gazebo_msgs 패키지에서 ModelStates 메시지 타입을 가져온다. 정확한 메시지 타입은 아래와 같다.

      string[] name
      geometry_msgs/Pose[] pose
      geometry_msgs/Twist[] twist
    
  • from gazebo_msgs.srv import SetEntityState: gazebo_msgs 패키지에서 SetEntityState 서비스 타입을 가져오는 코드이다. SetEntityState를 구글링했지만 정확한 정보를 찾지 못하였다. ros에서는 메시지나 서비스 등의 타입을 터미널에서 확인할 수 있다.

      ros2 interface show gazebo_msgs/srv/SetEntityState
        
      > gazebo_msgs/EntityState state   # Entity state to set to.
      >                                 # Be sure to fill all fields, values of zero have meaning.
      > ---
      > bool success                    # Return true if setting state was successful.
    

    gazebo_msgs/EntityState state 를 입력으로 하고, bool success 를 반환한다. gazebo_msgs/EntityState 는 어떤 형태일까? 아래와 같은 형태를 갖는다.

      # Holds an entity's pose and twist
      string name                 # Entity's scoped name.
                                  # An entity can be a model, link, collision, light, etc.
                                  # Be sure to use gazebo scoped naming notation (e.g. [model_name::link_name])
      geometry_msgs/Pose pose     # Pose in reference frame.
      geometry_msgs/Twist twist   # Twist in reference frame.
      string reference_frame      # Pose/twist are expressed relative to the frame of this entity.
                                  # Leaving empty or "world" defaults to inertial world frame.
    

    geometry_msgs/Posegeometry_msgs/Twist 에 대해서도 확인해보자.

    • geometry_msgs/Pose

        geometry_msgs/Point position
        geometry_msgs/Quaternion orientation
      
      • geometry_msgs/Point

          float64 x
          float64 y
          float64 z
        
      • geometry_msgs/Quaternion

          float64 x
          float64 y
          float64 z
          float64 w
        
    • geometry_msgs/Twist

        geometry_msgs/Vector3 linear
        geometry_msgs/Vector3 angular
      
      • geometry_msgs/Vector3

          float64 x
          float64 y
          float64 z
        
  • import tf2_ros: ROS 변환(tf) 시스템을 위한 기능을 제공한다. ROS 변환(tf) 시스템은 로봇의 다양한 프레임 간의 변환 관계를 추적하고 관리하는데 사용된다. 로봇 시스템에서는 여러 개의 프레임이 존재한다. 예를 들어 로봇의 기준 프레임, 센서 프레임, 도구 프레임 등이 있을 수 있다. 이러한 프레임 간의 변환 관계를 추적하면 로봇의 위치, 자세, 모션 제어 등 다양한 작업을 수행할 수 있다.
  • from tf2_ros import TransformException: tf2_ros 패키지에서 TransformException 예외를 가져온다. TransformException 는 예외 상황을 처리하기 위해 사용된다. 예를 들어, 특정 프레임 간의 변환 정보를 요청할 때 해당 변환 정보를 찾을 수 없는 경우, TransformException 예외가 발생하여 예외 처리를 수행할 수 있다. 이를 통해 예외 상황에 대응하고 적절한 조치를 취할 수 있다.

      std_msgs/Header header
      string[] joint_names
      trajectory_msgs/JointTrajectoryPoint[] points
    
    • std_msgs/Header

        uint32 seq
        time stamp
        string frame_id
      
    • trajectory_msgs/JointTrajectoryPoint

        float64[] positions
        float64[] velocities
        float64[] accelerations
        float64[] effort
        duration time_from_start
      
  • from rclpy.action import ActionClient: 액션 클라이언트를 나타낸다. 액션 클라이언트는 액션 목표를 액션 서버로 전송하고 피드백과 결과를 수신하는 컴포넌트입니다. 액션 클라이언트는 액션 서버와의 상호작용을 통해 작업의 진행 상황을 모니터링하고 결과를 받아올 수 있습니다.
  • from trajectory_msgs.msg import JointTrajectoryPoint: trajectory_msgs.msg 모듈에서 JointTrajectoryPoint 메시지 타입을 가져온다. trajectory_msgs 패키지는 시간에 따른 상태 또는 위치의 순서를 나타내는 경로(trajectory)를 표현하는 메시지 타입을 제공한다. JointTrajectoryPoint 메시지 타입은 특히 관절을 제어하는 경로 상의 한 점을 나타낸다.

      float64[] positions
      float64[] velocities
      float64[] accelerations
      float64[] effort
      duration time_from_start
    
  • from control_msgs.action import FollowJointTrajectory: control_msgs.action 모듈에서 FollowJointTrajectory 액션을 가져온다. control_msgs 패키지는 로봇이나 시스템을 제어하기 위한 메시지와 액션 타입을 제공한다. FollowJointTrajectory 액션은 특히 로봇이 관절 위치의 경로를 따르도록 명령하는 액션을 나타낸다.

      trajectory_msgs/JointTrajectory trajectory
          Header header
              uint32 seq
              time stamp
              string frame_id
          string[] joint_names
          trajectory_msgs/JointTrajectoryPoint[] points
              float64[] positions
              float64[] velocities
              float64[] accelerations
              duration time_from_start
      JointTolerance[] path_tolerance
          string name
          float64 position
          float64 velocity
          float64 acceleration
      JointTolerance[] goal_tolerance
          string name
          float64 position
          float64 velocity
          float64 acceleration
      duration goal_time_tolerance
    
  • from rclpy.duration import Duration: Duration은 ROS 2에서 시간 간격을 나타내는 클래스이다. 시간 간격은 특정 작업이나 이벤트의 지속 시간을 나타내는 데 사용된다. Duration 클래스는 시간 간격을 초 단위로 표현하며, 정수형이나 부동소수점 형태로 값을 설정하고 비교하거나 계산하는 등의 작업을 수행할 수 있다.

2.3. init 함수

def __init__ (self):

	super().__init__('node_main_rl_environment')

	print ("initializing.....")
	
	# end-effector transformation
	self.tf_buffer   = tf2_ros.Buffer()
	self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self)

	# --------------------------Client for reset the sphere position --------------------------#
	self.client_reset_sphere = self.create_client(SetEntityState,'/gazebo/set_entity_state')
	while not self.client_reset_sphere.wait_for_service(timeout_sec=1.0):
		self.get_logger().info('sphere reset-service not available, waiting...')
	self.request_sphere_reset = SetEntityState.Request()

	# ------------------------- Action-client to change joints position -----------------------#
	self.trajectory_action_client = ActionClient (self, FollowJointTrajectory, '/joint_trajectory_controller/follow_joint_trajectory')

	# --------------------------Subcribers topics --------------------------------------------#

	# Subcribe topic with the joints states
	self.joint_state_subscription = message_filters.Subscriber(self, JointState, '/joint_states')
	
	# Subcribe topic with the sphere position
	self.target_point_subscription = message_filters.Subscriber(self, ModelStates, '/gazebo/model_states')

	# Create the message filter (if a msg is detected for each subcriber, do the callback)
	#self.ts = message_filters.TimeSynchronizer([self.joint_state_subscription, self.target_point_subscription], queue_size=30)
	self.ts = message_filters.ApproximateTimeSynchronizer([self.joint_state_subscription, self.target_point_subscription], queue_size=10, slop=0.1, allow_headerless=True)
	self.ts.registerCallback(self.initial_callback)
  • super().__init__('node_main_rl_environment'): 이 Class(=Node)가 가질 이름을 정해줍니다.

  • end-effector transformation
    • self.tf_buffer = tf2_ros.Buffer(): ROS의 변환(transform) 시스템에서 변환 정보를 관리하는 역할을 한다. 변환 정보는 객체나 좌표계 간의 상호 작용을 표현하며, 로봇의 위치, 자세, 센서 데이터 등을 변환하기 위해 사용된다. tf2_ros.Buffer 객체는 시간에 따라 변화하는 변환 정보를 저장하고, 필요할 때 변환 정보를 검색하고 제공하는 기능을 제공한다. 이를 통해 다른 좌표계 간의 변환을 수행하거나 로봇의 위치 추정, 자세 제어, 센서 데이터 정합 등의 작업을 수행할 수 있다.
    • self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self): tf2_ros.TransformListener는 변환 정보를 수신하고 변환 정보를 기반으로 좌표 변환을 수행하는 역할을 한다. 이 객체는 tf2_ros.Buffer를 활용하여 변환 정보를 요청하고, 수신한 변환 정보를 사용하여 좌표계 변환을 수행할 수 있다.
  • Client for reset the sphere position
    • self.create_client(SetEntityState,'/gazebo/set_entity_state'):

    self.create_client() 함수는 ROS 2에서 제공하는 메소드로, 주어진 서비스 타입과 서비스 주제(topic)를 기반으로 클라이언트를 생성한다. SetEntityStategazebo_msgs.srv 모듈에 정의된 서비스 타입으로, Gazebo 시뮬레이터에서 엔티티(예: 로봇, 객체)의 상태를 재설정하기 위해 사용된다.

    self.client_reset_sphere 변수에 클라이언트를 할당함으로써, 이 클라이언트를 사용하여 '/gazebo/set_entity_state' 주제를 가진 SetEntityState 서비스를 호출할 수 있다. 이를 통해 Gazebo 시뮬레이터에서 구체(sphere) 엔티티의 상태를 재설정하는 요청을 보낼 수 있다.

    • wait_for_service(timeout_sec=1.0): 클라이언트가 서비스를 기다리는 기능을 제공한다. 이 함수는 지정된 시간(여기서는 1초) 동안 서비스가 이용 가능할 때까지 대기한다. 서비스가 이용 가능하면 True를 반환하고, 그렇지 않으면 False를 반환합니다. 즉, while문은 클라이언트가 서비스를 기다리는 동안 다른 작업을 수행하거나 대기 상태에 있을 수 있도록 한다. 서비스가 이용 가능해질 때가지 기다린다.
    • SetEntityState.Request(): SetEntityState 서비스 요청 메시지 타입인 Request의 인스턴스를 생성한다. SetEntityStategazebo_msgs.srv 모듈에 정의된 서비스 타입으로, Gazebo 시뮬레이터에서 엔티티의 상태를 재설정하기 위한 요청을 보낼 때 사용한다.
  • Action-client to change joints position
    • ActionClient (self, FollowJointTrajectory, '/joint_trajectory_controller/follow_joint_trajectory'): 액션 클라이언트를 생성한다. 현재 클래스 또는 노드 객체에서 FollowJointTrajectory 라는 액션 서비스 타입의 변수를 생성하고, '/joint_trajectory_controller/follow_joint_trajectory'는 액션 서비스의 topic을 나타낸다.
  • Subcribers topics
    • message_filters.Subscriber(self, JointState, '/joint_states'): message_filters.Subscriber는 ROS 2에서 메시지 필터링을 수행하기 위해 사용되는 서브스크라이버 클래스이다. self는 현재 클래스 또는 노드 객체를 나타내며, JointState는 서브스크라이버가 수신하는 메시지 타입을 나타내는 변수이다. '/joint_states'는 서브스크라이버가 구독하는 주제(topic)를 나타낸다. 즉, /joint_states topic로부터 수신되는 JointState 메시지를 필터링할 수 있습니다.
    • message_filters.Subscriber(self, ModelStates, '/gazebo/model_states'): /gazebo/model_states topic으로부터 수신되는 ModelStates 메시지를 필터링할 수 있다.
    • self.ts = message_filters.ApproximateTimeSynchronizer([self.joint_state_subscription, self.target_point_subscription], queue_size=10, slop=0.1, allow_headerless=True): 메시지 필터링을 위한 시간 동기화(Time Synchronization) 객체를 생성하는 코드이다. 이 객체는 self.joint_state_subscriptionself.target_point_subscription 서브스크라이버로부터 수신되는 메시지를 시간에 따라 동기화하여 처리하기 위해 사용된다.

      queue_size=10은 시간 동기화 시 사용되는 큐(queue)의 크기를 나타냅니다. 큐는 동기화할 메시지를 저장하는 버퍼 역할을 합니다. slop=0.1은 메시지 동기화에 허용되는 시간 차이를 나타냅니다. 이 값보다 작은 시간 차이는 동기화된 메시지로 간주됩니다. allow_headerless=True는 메시지에 헤더 정보가 없어도 동기화를 허용하는지를 나타냅니다.

      시간 동기화 객체를 생성하고 변수에 할당함으로써, self.joint_state_subscriptionself.target_point_subscription로부터 수신되는 메시지를 시간에 따라 동기화하여 처리할 수 있습니다.

    • self.ts.registerCallback(self.initial_callback): 간 동기화 객체에 초기 콜백 함수를 등록하는 코드이다. 등록된 콜백 함수는 시간에 따라 동기화된 메시지가 수신될 때 호출된다.

2.4. initial_callback 함수

def initial_callback(self, joint_state_msg, target_point_msg):

	# Seems that the order that the joint values arrive is: ['joint2', 'joint3', 'joint1', 'joint4', 'joint5', 'joint6']
	
	# Position of each joint:
	self.joint_1_pos = joint_state_msg.position[2]
	self.joint_2_pos = joint_state_msg.position[0]
	self.joint_3_pos = joint_state_msg.position[1]
	self.joint_4_pos = joint_state_msg.position[3]
	self.joint_5_pos = joint_state_msg.position[4]
	self.joint_6_pos = joint_state_msg.position[5]

	# Velocity of each joint:
	self.joint_1_vel =  joint_state_msg.velocity[2]
	self.joint_2_vel =  joint_state_msg.velocity[0]
	self.joint_3_vel =  joint_state_msg.velocity[1]
	self.joint_4_vel =  joint_state_msg.velocity[3]
	self.joint_5_vel =  joint_state_msg.velocity[4]
	self.joint_6_vel =  joint_state_msg.velocity[5]

	# Determine the sphere position in Gazebo wrt world frame
	sphere_index = target_point_msg.name.index('my_sphere') # Get the corret index for the sphere
	self.pos_sphere_x = target_point_msg.pose[sphere_index].position.x 
	self.pos_sphere_y = target_point_msg.pose[sphere_index].position.y 
	self.pos_sphere_z = target_point_msg.pose[sphere_index].position.z 

	# Determine the pose(position and location) of the end-effector w.r.t. world frame
	self.robot_x, self.robot_y, self.robot_z = self.get_end_effector_transformation()
  • callback 함수가 호출될 때마다 joint_state_msg의 위치, 속도를 각 변수에 저장한다.
  • sphere_index를 찾고 sphere의 좌표를 각 변수에 저장한다.
  • self.get_end_effector_transformation() 함수를 호출하면, end_effector의 끝점을 반환한다.

2.5. get_end_effector_transformation 함수

def get_end_effector_transformation(self):

		# Determine the pose(position and location) of the end effector w.r.t. world frame

		try:
			now = rclpy.time.Time()	
			self.reference_frame = 'world'
			self.child_frame     = 'link6'
			trans = self.tf_buffer.lookup_transform(self.reference_frame, self.child_frame, now) # This calculate the position of the link6 w.r.t. world frame
			
		except TransformException as ex:
			self.get_logger().info(f'Could not transform {self.reference_frame} to {self.child_frame}: {ex}')
			return

		else:
			# Traslation 
			ef_robot_x = trans.transform.translation.x
			ef_robot_y = trans.transform.translation.y
			ef_robot_z = trans.transform.translation.z
			#print ('Translation: [',round (ef_robot_x,3), round(ef_robot_x,3), round(ef_robot_x,3),']')
			
			# Rotation
			ef_qx = trans.transform.rotation.x
			ef_qy = trans.transform.rotation.y
			ef_qz = trans.transform.rotation.z
			ef_qw = trans.transform.rotation.w
			#print ('Rotation: in Quaternion [',round (ef_qx,3), round(ef_qy,3), round(ef_qz,3), round(ef_qw,3),']')

			return round (ef_robot_x,3), round(ef_robot_y,3), round(ef_robot_z,3)
  • try/except/else 문법이므로 try를 시도하고, 에러가 발생하지 않는다면 else를 실행한다.
  • end effector의 좌표를 받는다.

2.6. reset_environment_request 함수

sphere_position_x = random.uniform( 0.05, 1.05)
sphere_position_y = random.uniform( -0.5, 0.5)
sphere_position_z = random.uniform( 0.05, 1.05)

self.request_sphere_reset.state.name = 'my_sphere'
self.request_sphere_reset.state.reference_frame = 'world'
self.request_sphere_reset.state.pose.position.x = sphere_position_x
self.request_sphere_reset.state.pose.position.y = sphere_position_y
self.request_sphere_reset.state.pose.position.z = sphere_position_z
  • 랜덤으로 구체의 위치를 설정한다.
self.future_sphere_reset = self.client_reset_sphere.call_async(self.request_sphere_reset)
  • '/gazebo/set_entity_state' 서비스 서버에 대한 클라이언트(self.client_reset_sphere)를 사용하여 self.request_sphere_reset를 비동기적으로 호출하고, 호출 결과에 대한 응답을 추적하기 위해 self.future_sphere_reset 미래 객체를 생성하는 역할을 한다.
rclpy.spin_until_future_complete(self, self.future_sphere_reset)

sphere_service_response = self.future_sphere_reset.result()
  • rclpy.spin_until_future_complete: self.future_sphere_reset에 대한 서비스 요청이 완료될 때까지 루프를 실행한다. 이는 비동기적으로 발송된 서비스 요청(self.future_sphere_reset)에 대한 응답이 도착할 때까지 루프를 반복하면서 ROS 2 시스템을 계속 실행하는 것을 의미한다.
  • 서비스 요청의 결과를 저장한다.
if sphere_service_response.success:
	self.get_logger().info("Sphere Moved to a New Possiton Success")
else:
	self.get_logger().info("Sphere Reset Request failed")
  • sphere_service_response.success: 서비스 요청을 받았으면 True, 못 받으면 False를 반환한다.
home_point_msg = JointTrajectoryPoint()
home_point_msg.positions     = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
home_point_msg.velocities    = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
home_point_msg.accelerations = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
home_point_msg.time_from_start = Duration(seconds=2).to_msg()
  • JointTrajectoryPoint는 아래와 같은 메시지 형식을 갖고 있다. 위 코드는 각각 알맞은 초기값을 저장한 것이다.

      float64[] positions
      float64[] velocities
      float64[] accelerations
      float64[] effort
      duration time_from_start
    
  • Duration(seconds=2).to_msg(): 2초 동안의 시간 간격을 나타내는 Duration 객체를 메시지 형식으로 변환하는 것이다.

joint_names   = ['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6']
home_goal_msg = FollowJointTrajectory.Goal()
home_goal_msg.goal_time_tolerance    = Duration(seconds=1).to_msg()
home_goal_msg.trajectory.joint_names = joint_names
home_goal_msg.trajectory.points      = [home_point_msg]
  • FollowJointTrajectory 액션의 목표를 저장한 변수를 선언한다.
  • goal_time_tolerance: 목표 동작의 시간 허용 범위를 설정한다. 위 코드에선 1초 내에 목표 동작을 완료해야 함을 나타낸다.
  • trajectory.joint_names: 각 관절의 이름을 설정한다.
  • trajectory.points: 관절 경로에서 목표로 하는 포인트를 설정하는 것으로, 롭소이 도달해야 할 초기 위치, 속도, 가속도, 시간 간격을 나타낸다.
  • 따라서, 위의 코드는 로봇의 관절 제어에 필요한 목표 동작을 설정하는 역할을 한다. 이를 통해 로봇은 목표 동작을 수행하고 지정된 관절 경로를 따라 움직일 수 있다.
self.trajectory_action_client.wait_for_server()

send_home_goal_future = self.trajectory_action_client.send_goal_async(home_goal_msg)
		
rclpy.spin_until_future_complete(self, send_home_goal_future)
goal_reset_handle = send_home_goal_future.result()
  • wait_for_server(): 액션 서버가 실행될 때까지 기다린다. 액션 서버가 준비되면 다음 동작을 수행한다.
  • send_goal_async(home_goal_msg): 목표로 하는 액션 목표를 비동기적으로 보낸다. 이를 통해 로봇은 지정된 관절 경로를 따라 움직이는 목표를 수행하게 된다. send_home_goal_future 변수에는 액션 목표의 전송 결과를 나타내는 Future 객체가 저장된다.
  • rclpy.spin_until_future_complete(self, send_home_goal_future): send_home_goal_future 가 완료될 때까지 노드가 계속 실행되도록 한다. 액션 목표가 완료될 때까지 노드는 액션 서버와 통신을 처리할 수 있다.
if not goal_reset_handle.accepted:
	self.get_logger().info(' Home-Goal rejected ')
	return
self.get_logger().info('Moving robot to home position...')
  • accepted: 액션 목표가 액션 서버에 성공적으로 수락되었는지를 나타내는 불리언 값이다. 즉, 액션 목표가 수락되지 않았을 때 if 문 내의 코드 블록이 실행된다.
  • 위의 코드는 초기 위치로 이동하는 액션 목표가 성공적으로 수락됐는지 확인하고, 로봇이 초기 위치로 이동하는 동안 해당 상태를 로그로 기록하는 역할을 한다.
get_reset_result = goal_reset_handle.get_result_async()
rclpy.spin_until_future_complete(self, get_reset_result)  # Wait for response

if get_reset_result.result().result.error_code == 0:
	self.get_logger().info('Robot in Home position without problems')
else:
	self.get_logger().info('There was a problem with the action')
  • get_result_async(): 액션 목표의 결과를 비동기적으로 가져오는 핸들인 get_reset_result를 생성합니다. 이 핸들은 액션의 결과를 나타내는 ActionResult 객체를 반환합니다.
  • rclpy.spin_until_future_complete() : 비동기 결과를 기다린다. 액션 결과가 도착할 때까지 루프를 실행하며, 액션 서버로부터 결과를 수신하기 위해 사용된다.
  • error_code : 액션 실행 중 발생한 오류를 나타낸다. 오류가 없을 때 0을 나타낸다.

2.7. action_step_service 함수

points = []

point_msg = JointTrajectoryPoint()
point_msg.positions     = action_values
point_msg.velocities    = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
point_msg.accelerations = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
point_msg.time_from_start = Duration(seconds=2.0).to_msg() # be careful about this time 
points.append(point_msg) 

joint_names = ['joint1','joint2','joint3','joint4','joint5','joint6']
goal_msg    = FollowJointTrajectory.Goal()
goal_msg.goal_time_tolerance = Duration(seconds=1).to_msg() # goal_time_tolerance allows some freedom in time, so that the trajectory goal can still
													        # succeed even if the joints reach the goal some time after the precise end time of the trajectory.
													
goal_msg.trajectory.joint_names = joint_names
goal_msg.trajectory.points      = points

self.get_logger().info('Waiting for action server to move the robot...')
self.trajectory_action_client.wait_for_server() # waits for the action server to be available
  • reset_environment_request() 함수의 내용과 비슷하다. 함수의 파라미터로 action_values를 받고 이 지점으로 가기 위한 궤적을 생성한다.
  • 이후 액션 서버가 실행될 때까지 기다린다.
self.get_logger().info('Sending goal-action request...')
self.send_goal_future = self.trajectory_action_client.send_goal_async(goal_msg) 

self.get_logger().info('Checking if the goal is accepted...')
rclpy.spin_until_future_complete(self, self.send_goal_future ) # Wait for goal status

goal_handle = self.send_goal_future.result()

if not goal_handle.accepted:
	self.get_logger().info(' Action-Goal rejected ')
	return
self.get_logger().info('Action-Goal accepted')
  • send_goal_async: 목표로 하는 액션 목표를 비동기적으로 보낸다. 이를 통해 로봇은 지정된 관절 경로를 따라 움직이는 목표를 수행하게 된다. send_home_goal_future 변수에는 액션 목표의 전송 결과를 나타내는 Future 객체가 저장된다.
  • spin_until_future_complete: send_goal_future액션 결과가 도착할 때까지 루프를 실행하며, 액션 서버로부터 결과를 수신하기 위해 사용된다.
  • send_goal_future 결과의 성공 여부를 확인한다.
self.get_logger().info('Checking the response from action-service...')
self.get_result = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, self.get_result ) # Wait for response

if self.get_result.result().result.error_code == 0:
	self.get_logger().info('Action Completed without problem')
else:
	self.get_logger().info('There was a problem with the accion')
  • 이전에 설명한 코드와 비슷한 형식이다. goal_handle 액션의 결과를 나타내는 객체를 생성한다.
  • get_result 액션의 결과를 기다리고 성공 여부를 확인한다.

2.8. generate_action_funct 함수

angle_j_1 = random.uniform( -3.14159, 3.14159)   # values in degrees [ -180, 180]
angle_j_2 = random.uniform( -0.57595, 0.57595)	 # values in degrees [  -33,  33]
angle_j_3 = random.uniform( -2.51327, 2.51327)   # values in degrees [ -144, 144]
angle_j_4 = random.uniform( -3.14159, 3.14159)   # values in degrees [ -180, 180]
angle_j_5 = random.uniform( -3.14159, 3.14159)   # values in degrees [ -180, 180]
angle_j_6 = random.uniform( -3.14159, 3.14159)   # values in degrees [ -180, 180]

return [angle_j_1, angle_j_2, angle_j_3, angle_j_4, angle_j_5, angle_j_6]
  • 6개의 조인트 각도를 랜덤으로 반환한다.

2.9. calculate_reward_funct 함수

try:
	robot_end_position    = np.array((self.robot_x, self.robot_y, self.robot_z))
	target_point_position = np.array((self.pos_sphere_x, self.pos_sphere_y, self.pos_sphere_z))
	
except: 
	self.get_logger().info('could not calculate the distance yet, trying again...')
	return

else:
	distance = np.linalg.norm(robot_end_position - target_point_position)
	
	if distance <= 0.05:
		self.get_logger().info('Goal Reached')
		done = True
		reward_d = 10
	else:
		done = False
		reward_d = -1

	return reward_d, done
  • robot_end_position, target_point_position: 각 위치 좌표를 numpy 배열로 생성한다. 두 좌표의 거리를 쉽게 계산하기 위해 numpy 배열로 생성한 것 같다.
  • np.linalg.norm(): norm을 계산하는 numpy 모듈의 함수이다. 두 위치 벡터를 뺀 후 norm을 계산하여 두 벡터의 거리를 계산할 수 있다.
  • 거리가 0.05 보다 작다면 목표 지점에 성공한 것이므로 10점의 점수를 준다. 그렇지 않다면 -1점을 준다.
  • 계산한 보상과 성공 여부를 반환한다.

2.10. state_space_funct 함수

try:
	state = [
			self.robot_x, self.robot_y, self.robot_z, 
			self.joint_1_pos, self.joint_2_pos, self.joint_3_pos, self.joint_4_pos, self.joint_5_pos, self.joint_6_pos, 
			self.pos_sphere_x, self.pos_sphere_y, self.pos_sphere_z
			]			 	
except:
	
	self.get_logger().info('-------node not ready yet, Still getting values------------------')
	return 
else:
	return state
  • 현재 state를 반환하며, end effector 끝점의 좌표, 각 조인트의 좌표, 구체의 좌표를 포함한다.

3. 참고 문헌