ROS2 基础 (三) - 节点间通信:参数与动作
1. 参数(Parameters)
在开发中,有很多参数和设置需要后期调整,如果每次都要修改代码并重新编译,效率会非常低。ROS提供了参数这一通信机制,允许我们在运行时动态地设置和获取参数。
参数由键值对组成,每个键值对包含一个名字和一个数值。参数值类型可以是bool、int64、float64、string和它们的数组,以及字节数组byte[]。
打开三个终端A、B,C:
终端A:运行turtlesim节点
# 终端A
ros2 run turtlesim turtlesim_node
终端B:运行键盘控制节点
# 终端B
ros2 run turtlesim turtle_teleop_key
终端C:运行参数常用命令
- 查看参数列表
ros2 param list --param-type
--param-type选项使参数同时显示其数据类型
- 查看参数描述
# ros2 param describe <node_name> <param_name>
ros2 param describe /turtlesim background_b
可看到turtlesim节点的background_b参数的详情:类型为整型,描述为海龟模拟器背景颜色的蓝色分量
- 获取参数值
# ros2 param get <node_name> <param_name>
ros2 param get /turtlesim background_b
ros2 param get /turtlesim background_g
ros2 param get /turtlesim background_r
可得知背景颜色的RGB值为(69,86,255),即蓝色分量为255,绿色分量为86,红色分量为69
- 设置参数值
# ros2 param set <node_name> <param_name> <value>
ros2 param set /turtlesim background_r 44
ros2 param set /turtlesim background_g 156
ros2 param set /turtlesim background_b 10
设置背景颜色为RGB(44,156,10),最终背景颜色变为绿色
- 保存参数
ros2 param dump /turtlesim > turtlesim_params.yaml
# ros2 param dump /turtlesim 以yaml格式输出turtlesim节点的参数列表,使用">"重定向到turtlesim_params.yaml文件中
- 加载参数
重启海龟模拟器节点,背景颜色变为默认的蓝色
加载之前保存的参数文件
# ros2 param load <node_name> <parameter_file>
ros2 param load /turtlesim turtlesim_params.yaml
背景颜色变为之前设置的绿色
也可以在启动节点时直接加载参数文件
# ros2 run <package_name> <executable_name> --ros-args --params-file <parameter_file>
ros2 run turtlesim turtlesim_node --ros-args --params-file turtlesim_params.yaml
2. 参数编程
- 创建功能包
# 创建工作空间
mkdir -p example_param
cd example_param
# 创建ros功能包
ros2 pkg create example_parameters_rclpy \
--build-type ament_python \
--dependencies rclpy \
--destination-directory src \
--node-name parameters_basic \
--maintainer-name "internetsb" \
--maintainer-email "internetsb@foxmail.com" \
--license "MIT"
- 编写节点代码
路径:example_param/src/example_parameters_rclpy/example_parameters_rclpy/parameters_basic.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
class ParameterBasicNode(Node):
"""
创建一个ParametersBasicNode节点,并在初始化时输出一个话
"""
def __init__(self, name):
super().__init__(name)
self.get_logger().info(f"节点已启动:{name}!")
# 声明参数
self.declare_parameter('rcl_log_level', 0)
# 获取参数
log_level = self.get_parameter("rcl_log_level").value
# 设置参数
self.get_logger().set_level(log_level)
# 定时修改
self.timer = self.create_timer(0.5, self.timer_callback)
def timer_callback(self):
"""定时器回调函数"""
# 轮询获取参数
log_level = self.get_parameter("rcl_log_level").value
# 重新设置参数
self.get_logger().set_level(log_level)
print(f"========================{log_level}=============================")
self.get_logger().debug("我是DEBUG级别的日志,我被打印出来了!")
self.get_logger().info("我是INFO级别的日志,我被打印出来了!")
self.get_logger().warn("我是WARNING级别的日志,我被打印出来了!")
self.get_logger().error("我是ERROR级别的日志,我被打印出来了!")
self.get_logger().fatal("我是FATAL级别的日志,我被打印出来了!")
def main(args=None):
rclpy.init(args=args) # 初始化rclpy
node = ParameterBasicNode("parameters_basic") # 新建一个节点
rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
rclpy.shutdown() # 关闭rclpy
节点含有一个定时器,每0.5秒获取参数"rcl_log_level"并设置为当前日志记录器logger的日志级别
其中日志级别由低到高依次为:
- DEBUG:调试信息,最详细的日志级别,适用于开发和调试阶段,
- INFO:一般信息,适用于正常运行时的日志记录
- WARNING:警告信息,表示可能出现问题的情况,但不影响程序的正常运行
- ERROR:错误信息,表示程序出现了错误,可能会影响程序的正常运行
- FATAL:致命错误,表示程序出现了严重错误,可能会导致程序崩溃
当日志级别被设置为某个级别时,只有该级别及以上的日志才会被打印出来。例如,当日志级别设置为INFO时,DEBUG级别的日志不会被打印出来。
- 编译测试
colcon build --packages-select example_parameters_rclpy
source install/setup.bash
ros2 run example_parameters_rclpy parameters_basic
- 指定参数
静态指定参数运行:
ros2 run example_parameters_rclpy parameters_basic --ros-args -p rcl_log_level:=10
动态指定参数:
# 先运行节点
ros2 run example_parameters_rclpy parameters_basic
# 在另一个终端动态修改参数值,观察日志输出
ros2 param set /parameters_basic rcl_log_level 10 # DEBUG
ros2 param set /parameters_basic rcl_log_level 20 # INFO
ros2 param set /parameters_basic rcl_log_level 30 # WARNING
ros2 param set /parameters_basic rcl_log_level 40 # ERROR
ros2 param set /parameters_basic rcl_log_level 50 # FATAL
同时可以在RQt中动态查看和更改参数:
# 新建终端
rqt
在菜单栏选择Plugins->Configuration->Dynamic Reconfigure,在右上角刷新一下,选中parameters_basic节点,可在右侧看到"rcl_log_level"参数,更改该参数后控制台日志输出随之改变
3. 动作(Actions)
动作有三个组成部分:
- 目标(Goal):客户端发送给服务端的请求,包含要执行的任务信息,服务端针对该目标要有响应
- 反馈(Feedback):服务端在执行目标过程中发送给客户端的状态更新,包含当前执行进度、结果等信息
- 结果(Result):服务端完成目标执行后发送给客户端的最终结果,
动作可看做服务+话题的组合:
- 发送目标服务:客户端向服务端发送目标请求,服务端接收请求并响应,开始执行目标
- 获取目标状态服务:客户端向服务端查询目标状态,服务端接收请求并响应,返回任务反馈
- 取消目标服务:客户端向服务端发送取消请求,服务端接收请求并响应,停止执行目标
- 反馈话题:服务端在执行过程中反馈当前状态
开启三个终端A、B、C:
终端A:运行turtlesim节点
ros2 run turtlesim turtlesim_node
终端B:运行键盘控制节点
ros2 run turtlesim turtle_teleop_key
终端提示:
Reading from keyboard
---------------------------
Use arrow keys to move the turtle.
Use g|b|v|c|d|e|r|t keys to rotate to absolute orientations. 'f' to cancel a rotation.
'q' to quit.
箭头方向键可以控制乌龟前进、后退、相对旋转,是通过发布/turtle1/cmd_vel话题来控制乌龟移动的 而F和它周围的九个按键可以调整乌龟的绝对朝向,是通过动作/turtle1/rotate_absolute来控制的
终端C:运行常用action命令
- 查看动作列表
ros2 action list -t
# -t选项使动作同时显示其接口类型
- 查看动作信息
ros2 action info /turtle1/rotate_absolute
- 发送目标请求
首先查看接口信息
ros2 interface show turtlesim_msgs/action/RotateAbsolute
- theta为请求字段,表示乌龟要旋转到的绝对朝向,单位为弧度
- delta为结果字段,记录与起点的相对旋转角度,单位为弧度
- remaining为反馈字段,表示乌龟与目标朝向之间的剩余角度,单位为弧度
# ros2 action send_goal <action_name> <action_type> <goal_request> [--feedback]
ros2 action send_goal /turtle1/rotate_absolute turtlesim_msgs/action/RotateAbsolute "{theta: 1.5}" --feedback
# 发送一个目标请求,要求乌龟旋转到绝对朝向1.5弧度,并显示反馈信息
服务端接收请求并响应(Goal accepted)、在执行过程中反馈(Feedback remaining),最终完成目标并返回结果(Result delta)和状态(Goal finished with status SUCCEEDED)
4. 动作编程
(1) 创建动作所需接口
- 创建接口功能包
# 创建新工作空间
mkdir -p example_action
cd example_action
# 创建接口功能包
ros2 pkg create robot_control_interfaces \
--build-type ament_cmake \
--destination-directory src \
--maintainer-name "internetsb" \
--maintainer-email "internetsb@foxmail.com" \
--license "MIT"
- 创建动作接口文件
mkdir -p src/robot_control_interfaces/action
touch src/robot_control_interfaces/action/MoveRobot.action
- 添加依赖
# package.xml
<depend>rosidl_default_generators</depend>
<member_of_group>rosidl_interface_packages</member_of_group>
# CMakeLists.txt
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"action/MoveRobot.action"
)
- 编写动作接口文件
# MoveRobot.action
# Goal: 要移动的距离
float32 distance
---
# Result: 最终的位置
float32 pose
---
# Feedback: 中间反馈的位置和状态
float32 pose
uint32 status
uint32 STATUS_MOVING = 3
uint32 STATUS_STOP = 4
- 编译接口功能包
colcon build --packages-select robot_control_interfaces
(2) 创建动作节点
- 创建功能包
# 工作空间 example_action 下
ros2 pkg create example_action_rclpy \
--build-type ament_python \
--dependencies rclpy robot_control_interfaces \
--destination-directory src \
--node-name action_robot \
--maintainer-name "internetsb" \
--maintainer-email "internetsb@foxmail.com" \
--license "MIT"
# 再另外创建节点文件action_control.py
touch src/example_action_rclpy/example_action_rclpy/action_control.py
# 创建类文件robot.py
touch src/example_action_rclpy/example_action_rclpy/robot.py
声明入口
# setup.py
entry_points={
'console_scripts': [
'action_robot = example_action_rclpy.action_robot:main',
'action_control = example_action_rclpy.action_control:main'
],
},
2. 编写节点代码
- 类文件robot.py
from robot_control_interfaces.action import MoveRobot
import math
class Robot():
"""机器人类,模拟一个机器人"""
def __init__(self) -> None:
self.current_pose_ = 0.0
self.target_pose_ = 0.0
self.move_distance_ = 0.0
self.status_ = MoveRobot.Feedback.STATUS_STOP
def get_status(self):
"""获取状态"""
return self.status_
def get_current_pose(self):
"""获取当前位置"""
return self.current_pose_
def close_goal(self):
"""是否已接近目标"""
return math.fabs(self.target_pose_ - self.current_pose_) < 0.01 # 距离绝对值是否小于0.01
def stop_move(self):
"""停止移动"""
self.status_ = MoveRobot.Feedback.STATUS_STOP
def move_step(self):
"""移动一小步"""
direct = self.move_distance_ / math.fabs(self.move_distance_) # 移动方向
step = direct * math.fabs(self.target_pose_ - self.current_pose_) * 0.1 # 单步移动距离
self.current_pose_ += step # 移动一步
print(f"移动了:{step}当前位置:{self.current_pose_}")
return self.current_pose_
def set_goal(self, distance):
"""设置目标"""
self.move_distance_ = distance # 移动距离
self.target_pose_ += distance # 更新目标位置
if self.close_goal(): # 若已接近目标,不更新
self.stop_move()
return False
# 未接近目标,设置移动
self.status_ = MoveRobot.Feedback.STATUS_MOVING # 更新状态为移动
return True
Robot类模拟了一个机器人及其移动功能,通过在其它节点中调用Robot来实现对机器人的控制
- 服务端action_robot.py
#!/usr/bin/env python3
import time
# 导入rclpy相关库
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle
# 导入接口
from robot_control_interfaces.action import MoveRobot
# 导入机器人类
from example_action_rclpy.robot import Robot
class ActionRobot(Node):
"机器人端Action服务 "
def __init__(self, name):
super().__init__(name)
self.get_logger().info(f"节点已启动:{name}!")
self.robot_ = Robot() # 创建一个Robot对象
self.action_server_ = ActionServer(
node=self,
action_type=MoveRobot,
action_name='move_robot',
execute_callback=self.execute_callback
) # 创建一个ActionServer,指定节点、接口类型、动作名称和执行回调函数
def execute_callback(self, goal_handle: ServerGoalHandle):
"处理接收到的Goal的回调函数"
self.get_logger().info("执行移动机器人")
feedback_msg = MoveRobot.Feedback()
self.robot_.set_goal(goal_handle.request.distance) # 设置Robot目标为请求目标的distance
while rclpy.ok() and not self.robot_.close_goal(): # 当系统正常且任务未完成(未接近目的地)
# 移动
self.robot_.move_step()
# 反馈
feedback_msg.pose = self.robot_.get_current_pose()
feedback_msg.status = self.robot_.get_status()
goal_handle.publish_feedback(feedback_msg)
# 检查是否有取消请求
if goal_handle.is_cancel_requested:
result = MoveRobot.Result()
result.pose = self.robot_.get_current_pose()
return result
time.sleep(0.5)
# 任务完成,设置状态为完成
goal_handle.succeed()
# 返回结果
result = MoveRobot.Result()
result.pose = self.robot_.get_current_pose()
return result
def main(args=None):
"主函数"
rclpy.init(args=args)
action_robot = ActionRobot("action_robot")
rclpy.spin(action_robot)
rclpy.shutdown()
服务端节点ActionRobot创建了一个ActionServer,监听/move_robot动作。
当接收到目标请求时,调用execute_callback函数处理请求,在该函数中通过调用Robot类的方法来移动机器人,并在移动过程中不断反馈当前状态和位置,直到任务完成或收到取消请求,最终返回结果(当前位置)。
- 客户端action_control.py
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
# 导入接口
from robot_control_interfaces.action import MoveRobot
class ActionControl(Node):
"Action客户端"
def __init__(self, name):
super().__init__(name)
self.get_logger().info(f"节点已启动:{name}!")
self.action_client_ = ActionClient(
node=self,
action_type=MoveRobot,
action_name='move_robot'
) # 创建一个ActionClient,指定节点、接口类型和动作名称
self.send_goal_timer_ = self.create_timer(1, self.send_goal) # 定时器:1秒后执行
def send_goal(self):
"""发送目标"""
self.send_goal_timer_.cancel() # 取消定时器:只发送一次目标
goal_msg = MoveRobot.Goal() # 构建Goal请求
goal_msg.distance = 5.0 # 移动5米
self.action_client_.wait_for_server() # 等待服务器上线
# 发送请求
self._send_goal_future = self.action_client_.send_goal_async(
goal=goal_msg,
feedback_callback=self.feedback_callback # 设置反馈的回调函数
)
self._send_goal_future.add_done_callback(self.goal_response_callback) # 设置动作请求的回调函数
def goal_response_callback(self, future):
"""处理请求响应的回调函数"""
goal_handle = future.result() # send_goal的结果
if not goal_handle.accepted: # 若服务端拒绝请求
self.get_logger().info('Goal rejected')
return
# 服务端接受请求
self.get_logger().info('Goal accepted')
self._get_result_future = goal_handle.get_result_async() # 从'请求'的结果里获取'执行'的task
self._get_result_future.add_done_callback(self.get_result_callback) # 任务执行完成后回调get_result_callback
def get_result_callback(self, future):
"""获取动作执行结果的回调函数"""
result = future.result().result
self.get_logger().info(f"Result: {result.pose}")
def feedback_callback(self, feedback_msg):
"""获取执行过程中的反馈的回调函数"""
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback.pose}')
def main(args=None):
"主函数"
rclpy.init(args=args)
action_robot = ActionControl("action_control")
rclpy.spin(action_robot)
rclpy.shutdown()
客户端节点ActionControl创建了一个ActionClient,连接/move_robot动作。在send_goal函数中构建一个目标请求,等待服务器上线后发送请求,并设置反馈和结果的回调函数。
过程中有三个回调函数:
goal_response_callback处理服务端对目标请求的响应feedback_callback处理服务端在执行过程中发送的反馈信息get_result_callback处理服务端完成目标执行后的结果
有两次Future回调:一次是发送目标请求后的回调,一次是获取结果后的回调
(3) 编译测试
编译
colcon build
新建两个终端,分别运行服务端和客户端节点
# 终端A:运行服务端节点
source install/setup.bash
ros2 run example_action_rclpy action_robot
# 终端B:运行客户端节点
source install/setup.bash
ros2 run example_action_rclpy action_control
服务端接收到请求后先响应请求(Goal accepted),然后开始执行目标,在执行过程中不断反馈当前状态
最终完成目标并返回结果(Result)
节点间通信方式总结
| 通信方式 | 适用场景 | 特点 |
|---|---|---|
| 话题(Topics) | 适用于实时性强的场景,如传感器数据等 | 单向通信 |
| 服务(Services) | 适用于强调服务特性和反馈的场景 | 双向通信 |
| 动作(Actions) | 适用于需要实时反馈长时间执行的任务,如导航、路径规划等 | 原理基于话题和服务 |
| 参数(Parameters) | 适用于需要动态调整参数的场景,如调试、配置等 | 原理基于服务 |