← 返回

ROS2 基础 (三) - 节点间通信:参数与动作

ROS2 • 2026年5月31日

1. 参数(Parameters)

在开发中,有很多参数和设置需要后期调整,如果每次都要修改代码并重新编译,效率会非常低。ROS提供了参数这一通信机制,允许我们在运行时动态地设置和获取参数。

参数由键值对组成,每个键值对包含一个名字和一个数值。参数值类型可以是bool、int64、float64、string和它们的数组,以及字节数组byte[]。

打开三个终端A、B,C:

终端A:运行turtlesim节点

# 终端A
ros2 run turtlesim turtlesim_node

终端B:运行键盘控制节点

# 终端B
ros2 run turtlesim turtle_teleop_key

终端C:运行参数常用命令

  • 查看参数列表
ros2 param list --param-type

--param-type选项使参数同时显示其数据类型

param_list
  • 查看参数描述
# ros2 param describe <node_name> <param_name>
ros2 param describe /turtlesim background_b

可看到turtlesim节点的background_b参数的详情:类型为整型,描述为海龟模拟器背景颜色的蓝色分量

param_describe
  • 获取参数值
# ros2 param get <node_name> <param_name>
ros2 param get /turtlesim background_b
ros2 param get /turtlesim background_g
ros2 param get /turtlesim background_r

可得知背景颜色的RGB值为(69,86,255),即蓝色分量为255,绿色分量为86,红色分量为69

param_get
  • 设置参数值
# ros2 param set <node_name> <param_name> <value>
ros2 param set /turtlesim background_r 44
ros2 param set /turtlesim background_g 156
ros2 param set /turtlesim background_b 10

设置背景颜色为RGB(44,156,10),最终背景颜色变为绿色

param_set
  • 保存参数
ros2 param dump /turtlesim > turtlesim_params.yaml
# ros2 param dump /turtlesim 以yaml格式输出turtlesim节点的参数列表,使用">"重定向到turtlesim_params.yaml文件中
  • 加载参数

重启海龟模拟器节点,背景颜色变为默认的蓝色

加载之前保存的参数文件

# ros2 param load <node_name> <parameter_file>
ros2 param load /turtlesim turtlesim_params.yaml

背景颜色变为之前设置的绿色

param_load

也可以在启动节点时直接加载参数文件

# ros2 run <package_name> <executable_name> --ros-args --params-file <parameter_file>
ros2 run turtlesim turtlesim_node --ros-args --params-file turtlesim_params.yaml
param_args

2. 参数编程

  1. 创建功能包
# 创建工作空间
mkdir -p example_param
cd example_param
# 创建ros功能包
ros2 pkg create example_parameters_rclpy \
--build-type ament_python \
--dependencies rclpy \
--destination-directory src \
--node-name parameters_basic \
--maintainer-name "internetsb" \
--maintainer-email "internetsb@foxmail.com" \
--license "MIT"
  1. 编写节点代码

路径:example_param/src/example_parameters_rclpy/example_parameters_rclpy/parameters_basic.py

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node

class ParameterBasicNode(Node):
    """
    创建一个ParametersBasicNode节点,并在初始化时输出一个话
    """
    def __init__(self, name):
        super().__init__(name)
        self.get_logger().info(f"节点已启动:{name}!")
        # 声明参数
        self.declare_parameter('rcl_log_level', 0)
        # 获取参数
        log_level = self.get_parameter("rcl_log_level").value
        # 设置参数
        self.get_logger().set_level(log_level)
        # 定时修改
        self.timer = self.create_timer(0.5, self.timer_callback)

    def timer_callback(self):
        """定时器回调函数"""
        # 轮询获取参数
        log_level = self.get_parameter("rcl_log_level").value
        # 重新设置参数
        self.get_logger().set_level(log_level)
    
        print(f"========================{log_level}=============================")
        self.get_logger().debug("我是DEBUG级别的日志,我被打印出来了!")
        self.get_logger().info("我是INFO级别的日志,我被打印出来了!")
        self.get_logger().warn("我是WARNING级别的日志,我被打印出来了!")
        self.get_logger().error("我是ERROR级别的日志,我被打印出来了!")
        self.get_logger().fatal("我是FATAL级别的日志,我被打印出来了!")

def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = ParameterBasicNode("parameters_basic") # 新建一个节点
    rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 关闭rclpy

节点含有一个定时器,每0.5秒获取参数"rcl_log_level"并设置为当前日志记录器logger的日志级别

其中日志级别由低到高依次为:

  • DEBUG:调试信息,最详细的日志级别,适用于开发和调试阶段,
  • INFO:一般信息,适用于正常运行时的日志记录
  • WARNING:警告信息,表示可能出现问题的情况,但不影响程序的正常运行
  • ERROR:错误信息,表示程序出现了错误,可能会影响程序的正常运行
  • FATAL:致命错误,表示程序出现了严重错误,可能会导致程序崩溃

当日志级别被设置为某个级别时,只有该级别及以上的日志才会被打印出来。例如,当日志级别设置为INFO时,DEBUG级别的日志不会被打印出来。

  1. 编译测试
colcon build --packages-select example_parameters_rclpy
source install/setup.bash
ros2 run example_parameters_rclpy parameters_basic
param_basic
  1. 指定参数

静态指定参数运行:

ros2 run example_parameters_rclpy parameters_basic --ros-args -p rcl_log_level:=10

动态指定参数:

# 先运行节点
ros2 run example_parameters_rclpy parameters_basic
# 在另一个终端动态修改参数值,观察日志输出
ros2 param set /parameters_basic rcl_log_level 10 # DEBUG
ros2 param set /parameters_basic rcl_log_level 20 # INFO
ros2 param set /parameters_basic rcl_log_level 30 # WARNING
ros2 param set /parameters_basic rcl_log_level 40 # ERROR
ros2 param set /parameters_basic rcl_log_level 50 # FATAL
param_dynamic

同时可以在RQt中动态查看和更改参数:

# 新建终端
rqt

在菜单栏选择Plugins->Configuration->Dynamic Reconfigure,在右上角刷新一下,选中parameters_basic节点,可在右侧看到"rcl_log_level"参数,更改该参数后控制台日志输出随之改变

param_rqt

3. 动作(Actions)

动作有三个组成部分:

  • 目标(Goal):客户端发送给服务端的请求,包含要执行的任务信息,服务端针对该目标要有响应
  • 反馈(Feedback):服务端在执行目标过程中发送给客户端的状态更新,包含当前执行进度、结果等信息
  • 结果(Result):服务端完成目标执行后发送给客户端的最终结果,

动作可看做服务+话题的组合:

  • 发送目标服务:客户端向服务端发送目标请求,服务端接收请求并响应,开始执行目标
  • 获取目标状态服务:客户端向服务端查询目标状态,服务端接收请求并响应,返回任务反馈
  • 取消目标服务:客户端向服务端发送取消请求,服务端接收请求并响应,停止执行目标
  • 反馈话题:服务端在执行过程中反馈当前状态
action

开启三个终端A、B、C:
终端A:运行turtlesim节点

ros2 run turtlesim turtlesim_node

终端B:运行键盘控制节点

ros2 run turtlesim turtle_teleop_key

终端提示:

Reading from keyboard
---------------------------
Use arrow keys to move the turtle.
Use g|b|v|c|d|e|r|t keys to rotate to absolute orientations. 'f' to cancel a rotation.
'q' to quit.

箭头方向键可以控制乌龟前进、后退、相对旋转,是通过发布/turtle1/cmd_vel话题来控制乌龟移动的 而F和它周围的九个按键可以调整乌龟的绝对朝向,是通过动作/turtle1/rotate_absolute来控制的

终端C:运行常用action命令

  • 查看动作列表
ros2 action list -t
# -t选项使动作同时显示其接口类型
action_list
  • 查看动作信息
ros2 action info /turtle1/rotate_absolute
action_info
  • 发送目标请求

首先查看接口信息

ros2 interface show turtlesim_msgs/action/RotateAbsolute
action_interface
  • theta为请求字段,表示乌龟要旋转到的绝对朝向,单位为弧度
  • delta为结果字段,记录与起点的相对旋转角度,单位为弧度
  • remaining为反馈字段,表示乌龟与目标朝向之间的剩余角度,单位为弧度
# ros2 action send_goal <action_name> <action_type> <goal_request> [--feedback]
ros2 action send_goal /turtle1/rotate_absolute turtlesim_msgs/action/RotateAbsolute "{theta: 1.5}" --feedback
# 发送一个目标请求,要求乌龟旋转到绝对朝向1.5弧度,并显示反馈信息
action_send_goal

服务端接收请求并响应(Goal accepted)、在执行过程中反馈(Feedback remaining),最终完成目标并返回结果(Result delta)和状态(Goal finished with status SUCCEEDED)

4. 动作编程

(1) 创建动作所需接口

  1. 创建接口功能包
# 创建新工作空间
mkdir -p example_action
cd example_action
# 创建接口功能包
ros2 pkg create robot_control_interfaces \
--build-type ament_cmake \
--destination-directory src \
--maintainer-name "internetsb" \
--maintainer-email "internetsb@foxmail.com" \
--license "MIT"
  1. 创建动作接口文件
mkdir -p src/robot_control_interfaces/action
touch src/robot_control_interfaces/action/MoveRobot.action
  1. 添加依赖
# package.xml
  <depend>rosidl_default_generators</depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

# CMakeLists.txt
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
    "action/MoveRobot.action"
)
  1. 编写动作接口文件
# MoveRobot.action
# Goal: 要移动的距离
float32 distance
---
# Result: 最终的位置
float32 pose
---
# Feedback: 中间反馈的位置和状态
float32 pose
uint32 status
uint32 STATUS_MOVING = 3
uint32 STATUS_STOP = 4
  1. 编译接口功能包
colcon build --packages-select robot_control_interfaces

(2) 创建动作节点

  1. 创建功能包
# 工作空间 example_action 下
ros2 pkg create example_action_rclpy \
--build-type ament_python \
--dependencies rclpy robot_control_interfaces \
--destination-directory src \
--node-name action_robot \
--maintainer-name "internetsb" \
--maintainer-email "internetsb@foxmail.com" \
--license "MIT"
# 再另外创建节点文件action_control.py
touch src/example_action_rclpy/example_action_rclpy/action_control.py
# 创建类文件robot.py
touch src/example_action_rclpy/example_action_rclpy/robot.py

声明入口

# setup.py
entry_points={
    'console_scripts': [
        'action_robot = example_action_rclpy.action_robot:main',
        'action_control = example_action_rclpy.action_control:main'
    ],
},

2. 编写节点代码

  • 类文件robot.py
from robot_control_interfaces.action import MoveRobot
import math
class Robot():
    """机器人类,模拟一个机器人"""
    def __init__(self) -> None:
        self.current_pose_ = 0.0
        self.target_pose_ = 0.0
        self.move_distance_ = 0.0
        self.status_ = MoveRobot.Feedback.STATUS_STOP

    def get_status(self):
        """获取状态"""
        return self.status_
     
    def get_current_pose(self):
        """获取当前位置"""
        return self.current_pose_
    
    def close_goal(self):
        """是否已接近目标"""
        return math.fabs(self.target_pose_ - self.current_pose_) < 0.01 # 距离绝对值是否小于0.01
    
    def stop_move(self):
        """停止移动"""
        self.status_ = MoveRobot.Feedback.STATUS_STOP

    def move_step(self):
        """移动一小步"""
        direct = self.move_distance_ / math.fabs(self.move_distance_) # 移动方向
        step = direct * math.fabs(self.target_pose_ - self.current_pose_) * 0.1 # 单步移动距离
        self.current_pose_ += step # 移动一步
        print(f"移动了:{step}当前位置:{self.current_pose_}")
        return self.current_pose_
    
    def set_goal(self, distance):
        """设置目标"""
        self.move_distance_ = distance # 移动距离
        self.target_pose_ += distance # 更新目标位置
        if self.close_goal(): # 若已接近目标,不更新
            self.stop_move()
            return False
        # 未接近目标,设置移动
        self.status_ = MoveRobot.Feedback.STATUS_MOVING # 更新状态为移动
        return True

Robot类模拟了一个机器人及其移动功能,通过在其它节点中调用Robot来实现对机器人的控制

  • 服务端action_robot.py
#!/usr/bin/env python3
import time
# 导入rclpy相关库
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle
# 导入接口
from robot_control_interfaces.action import MoveRobot
# 导入机器人类
from example_action_rclpy.robot import Robot

class ActionRobot(Node):
    "机器人端Action服务 "
    def __init__(self, name):
        super().__init__(name)
        self.get_logger().info(f"节点已启动:{name}!")

        self.robot_ = Robot() # 创建一个Robot对象

        self.action_server_ = ActionServer(
            node=self,
            action_type=MoveRobot,
            action_name='move_robot',
            execute_callback=self.execute_callback
        ) # 创建一个ActionServer,指定节点、接口类型、动作名称和执行回调函数
    def execute_callback(self, goal_handle: ServerGoalHandle):
        "处理接收到的Goal的回调函数"
        self.get_logger().info("执行移动机器人")
        feedback_msg = MoveRobot.Feedback()
        self.robot_.set_goal(goal_handle.request.distance) # 设置Robot目标为请求目标的distance
        while rclpy.ok() and not self.robot_.close_goal(): # 当系统正常且任务未完成(未接近目的地)
            # 移动
            self.robot_.move_step()
            # 反馈
            feedback_msg.pose = self.robot_.get_current_pose()
            feedback_msg.status = self.robot_.get_status()
            goal_handle.publish_feedback(feedback_msg)
            # 检查是否有取消请求
            if goal_handle.is_cancel_requested:
                result = MoveRobot.Result()
                result.pose = self.robot_.get_current_pose()
                return result
            time.sleep(0.5)
        # 任务完成,设置状态为完成
        goal_handle.succeed()
        # 返回结果
        result = MoveRobot.Result()
        result.pose = self.robot_.get_current_pose()
        return result

def main(args=None):
    "主函数"
    rclpy.init(args=args)
    action_robot = ActionRobot("action_robot")
    rclpy.spin(action_robot)
    rclpy.shutdown()

服务端节点ActionRobot创建了一个ActionServer,监听/move_robot动作。

当接收到目标请求时,调用execute_callback函数处理请求,在该函数中通过调用Robot类的方法来移动机器人,并在移动过程中不断反馈当前状态和位置,直到任务完成或收到取消请求,最终返回结果(当前位置)。

  • 客户端action_control.py
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
# 导入接口
from robot_control_interfaces.action import MoveRobot

class ActionControl(Node):
    "Action客户端"
    def __init__(self, name):
        super().__init__(name)
        self.get_logger().info(f"节点已启动:{name}!")
        self.action_client_ = ActionClient(
            node=self,
            action_type=MoveRobot,
            action_name='move_robot'
        ) # 创建一个ActionClient,指定节点、接口类型和动作名称
        self.send_goal_timer_ = self.create_timer(1, self.send_goal) # 定时器:1秒后执行

    def send_goal(self):
        """发送目标"""
        self.send_goal_timer_.cancel() # 取消定时器:只发送一次目标
        goal_msg = MoveRobot.Goal() # 构建Goal请求
        goal_msg.distance = 5.0 # 移动5米
        self.action_client_.wait_for_server() # 等待服务器上线
        # 发送请求
        self._send_goal_future = self.action_client_.send_goal_async(
            goal=goal_msg,
            feedback_callback=self.feedback_callback # 设置反馈的回调函数
        )
        self._send_goal_future.add_done_callback(self.goal_response_callback) # 设置动作请求的回调函数

    def goal_response_callback(self, future):
        """处理请求响应的回调函数"""
        goal_handle = future.result() # send_goal的结果
        if not goal_handle.accepted: # 若服务端拒绝请求
            self.get_logger().info('Goal rejected')
            return
        # 服务端接受请求
        self.get_logger().info('Goal accepted')
        self._get_result_future = goal_handle.get_result_async() # 从'请求'的结果里获取'执行'的task
        self._get_result_future.add_done_callback(self.get_result_callback) # 任务执行完成后回调get_result_callback

    def get_result_callback(self, future):
        """获取动作执行结果的回调函数"""
        result = future.result().result
        self.get_logger().info(f"Result: {result.pose}")

    def feedback_callback(self, feedback_msg):
        """获取执行过程中的反馈的回调函数"""
        feedback = feedback_msg.feedback
        self.get_logger().info(f'Received feedback: {feedback.pose}')

def main(args=None):
    "主函数"
    rclpy.init(args=args)
    action_robot = ActionControl("action_control")
    rclpy.spin(action_robot)
    rclpy.shutdown()

客户端节点ActionControl创建了一个ActionClient,连接/move_robot动作。在send_goal函数中构建一个目标请求,等待服务器上线后发送请求,并设置反馈和结果的回调函数。

过程中有三个回调函数:

  • goal_response_callback处理服务端对目标请求的响应
  • feedback_callback处理服务端在执行过程中发送的反馈信息
  • get_result_callback处理服务端完成目标执行后的结果

有两次Future回调:一次是发送目标请求后的回调,一次是获取结果后的回调

(3) 编译测试

编译

colcon build

新建两个终端,分别运行服务端和客户端节点

# 终端A:运行服务端节点
source install/setup.bash
ros2 run example_action_rclpy action_robot
# 终端B:运行客户端节点
source install/setup.bash
ros2 run example_action_rclpy action_control

服务端接收到请求后先响应请求(Goal accepted),然后开始执行目标,在执行过程中不断反馈当前状态

action_run1

最终完成目标并返回结果(Result)

action_run2

节点间通信方式总结

通信方式 适用场景 特点
话题(Topics) 适用于实时性强的场景,如传感器数据等 单向通信
服务(Services) 适用于强调服务特性和反馈的场景 双向通信
动作(Actions) 适用于需要实时反馈长时间执行的任务,如导航、路径规划等 原理基于话题和服务
参数(Parameters) 适用于需要动态调整参数的场景,如调试、配置等 原理基于服务