← 返回

ROS2 基础 (二) - 节点间通信:话题与服务

ROS2 • 2026年5月22日

1. 话题(topic)

ROS中的话题(topic)是一种发布-订阅(publish-subscribe)通信机制。

话题通信机制可类比为聊天群聊:假如有一个讨论游戏(主题)的群聊,任何人都可以加入这个群聊并发布消息(发布者),同时其它加入这个群聊的人可以接收这些消息(订阅者)。

在ROS中,节点可以通过话题进行通信。一个节点可以发布消息到一个话题,另一个节点可以订阅这个话题来接收消息。这种机制使得不同节点之间能够以松耦合的方式进行通信。

打开三个终端,分别开启一个发布者、一个订阅者和一个GUI工具:

# 订阅者示例节点
ros2 run demo_nodes_py listener
# 发布者示例节点
ros2 run demo_nodes_cpp talker
# GUI工具rqt
rqt

在GUI工具中,将Nodes only改为Nodes/Topics(all),刷新一下,可以看到发布者和订阅者之间的通信关系:

发布者talker发布消息到话题/chatter,订阅者listener订阅了话题/chatter来接收消息。

topic

topic常用命令:

  • 查看所有话题:
ros2 topic list -t
#-t参数(type)可以额外显示话题消息类型
topic_list
  • 实时查看话题消息:
ros2 topic echo /chatter 
# 实时查看话题/chatter的消息内容
topic_echo
  • 查看topic信息:
ros2 topic info /chatter
# 显示话题/chatter的详细信息
topic_info
  • 手动发布消息:
ros2 topic pub /chatter std_msgs/msg/String 'data: "manual publish"'
# 向话题/chatter持续发布消息,消息类型为std_msgs/msg/String,内容为"manual publish"
topic_pub

2. topic编程

  1. 创建一个新的功能包:
ros2 pkg create example_topic_rclpy --build-type ament_python --dependencies rclpy
  1. 在工作空间下创建节点文件:
cd example_topic_rclpy/example_topic_rclpy
touch topic_subscriber.py
touch topic_publisher.py
  1. 编写发布者节点:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class NodePublisher(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("大家好,我是%s!" % name)
        self.command_publisher_ = self.create_publisher(msg_type=String,topic="command", qos_profile=10) # 创建发布者
        self.timer = self.create_timer(0.5, self.timer_callback) # 创建定时器
    def timer_callback(self):
        """
        定时器回调函数
        """
        msg = String()
        msg.data = 'backup'
        self.command_publisher_.publish(msg) # 调用publisher的pulish方法发布数据
        self.get_logger().info(f'发布了指令:{msg.data}') #打印一下发布的数据

def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = NodePublisher("topic_publisher") # 新建一个节点
    rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 关闭rclpy

节点初始化时创建了一个publisher发布者,发布的话题名称为command,消息类型为String,qos_profile设置为10

qos_profile参数指定了发布者的质量服务(Quality of Service)配置。只输入一个数值时覆盖默认qos配置的Depth参数,该参数类似于队列长度,若一次性来了100条消息,而qos_profile设置为10,那么只会保留最新的10条消息。

Qos学习:https://mp.weixin.qq.com/s/J63fO4c_QIseLGQd5W2fAw

随后创建了一个定时器,每隔0.5秒调用一次timer_callback函数,在该函数中创建一个String类型的消息,设置消息内容为backup,并调用publisher的publish方法发布该消息。

  1. 编写订阅者节点:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class NodeSubscriber(Node):
    def __init__(self, name):
        super().__init__(name)
        self.get_logger().info("大家好,我是%s!" % name)
        self.command_subscribe_ = self.create_subscription(msg_type=String,topic="command",callback=self.command_callback,qos_profile=10)

    def command_callback(self, msg):
        speed = 0.0
        if msg.data=="backup":
            speed = -0.2
            self.get_logger().info(f'收到[{msg.data}]命令,发送速度{speed}')
def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = NodeSubscriber("topic_subscriber") # 新建一个节点
    rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 关闭rclpy

节点初始化时创建了一个subscription订阅者,订阅的话题名称为command,消息类型为String,收到消息后调用self.command_callback(msg)。

在回调函数command_callback中,根据收到的消息内容进行处理,如果收到的消息内容是backup,则设置速度为-0.2,并打印日志。

  1. 编译运行:
# setup.py
entry_points={
    'console_scripts': [
        "topic_publisher = example_topic_rclpy.topic_publisher:main",
        "topic_subscriber = example_topic_rclpy.topic_subscriber:main"
    ],
},
# 路径example_topic_rclpy下
colcon build

新建两个终端分别运行发布者和订阅者:

# 运行发布者,路径example_topic_rclpy下
source install/setup.bash
ros2 run example_topic_rclpy topic_publisher
# 运行订阅者,路径example_topic_rclpy下
source install/setup.bash
ros2 run example_topic_rclpy topic_subscriber

在发布者终端可以看到每隔0.5秒发布了一条backup命令,在订阅者终端可以看到收到backup命令后发送了速度-0.2的日志输出。

topic_run

3. 服务(service)

服务分为客户端和服务端,服务-客户端模型也是请求-相应模型。

与话题比较:话题是没有返回的,适用于单向或大量的数据传递。而服务是双向的,客户端发送请求,服务端响应请求。话题可以是多对多的,而服务是一对多的(一个服务端对应多个客户端),同一个服务只能由一个节点提供。

新建两个终端,终端A运行示例服务端:

ros2 run examples_rclpy_minimal_service service

终端B测试service常用命令:

  • 查看所有服务:
ros2 service list -t
service_list
  • 手动调用服务:
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 5,b: 10}"
# 调用服务/add_two_ints,服务类型为example_interfaces/srv/AddTwoInts,传递的参数为a=5和b=10

得到结果:sum=15

service_call
  • 查看服务接口类型:
ros2 service type /add_two_ints
# 查看服务/add_two_ints的接口类型
service_type
  • 查找使用某个服务接口的服务:
ros2 service find example_interfaces/srv/AddTwoInts
# 查找使用example_interfaces/srv/AddTwoInts接口的服务
service_find

4. service编程

  1. 创建一个新的功能包:
ros2 pkg create example_service_rclpy --build-type ament_python --dependencies rclpy example_interfaces --node-name service_server

除rclpy外,还依赖了example_interfaces,因为我们要使用其中定义的服务接口。

node-name参数指定了创建的节点名称为service_server,setup.py中会自动添加一个console_scripts入口,指向自动创建的service_server.py的main函数。

由于node-name参数只能指定一个节点名称,所以需要手动创建service_client.py并修改setup.py。

  1. 编写服务端节点service_server.py:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class ServiceServer(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("节点已启动:%s!" % name)
        self.add_ints_server_ = self.create_service(srv_type=AddTwoInts, srv_name="add_two_ints_srv", callback=self.handle_add_two_ints) # 新建服务
    
    def handle_add_two_ints(self, request, response):
        """回调函数,实现两数相加返回结果"""
        self.get_logger().info(f"收到请求,计算{request.a} + {request.b}")
        response.sum = request.a + request.b
        return response
  
def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = ServiceServer("service_server") # 新建一个节点
    rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 关闭rclpy

if __name__ == '__main__':
    # if __name__ == '__main__'让脚本可手动执行:python3 service_server.py
    main()

服务端节点创建了一个服务,服务接口类型为AddTwoInts,服务名称为add_two_ints_srv,收到请求后调用handle_add_two_ints函数处理请求,在该函数中计算两数之和并返回结果。

  1. 编写客户端节点service_client.py:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class ServiceClient(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("节点已启动:%s!" % name)
        self.client_ = self.create_client(srv_type=AddTwoInts,srv_name="add_two_ints_srv") # 创建客户端
  
    def result_callback_(self, result_future):
        """回调函数,先获取请求结果result_future.result(),然后打印response.sum"""
        response = result_future.result()
        self.get_logger().info(f"收到返回结果:{response.sum}")

    def send_request(self, a, b):
        while rclpy.ok() and self.client_.wait_for_service(1)==False:
            self.get_logger().info(f"等待服务端上线....")
    
        request = AddTwoInts.Request()
        request.a = a
        request.b = b
        self.client_.call_async(request).add_done_callback(self.result_callback_) # 异步请求,任务完成时回调self.result_callback_(result)


def main(args=None):
    rclpy.init(args=args) # 初始化rclpy
    node = ServiceClient("service_client") # 新建一个节点
    node.send_request(3,4)
    rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown() # 关闭rclpy


if __name__ == '__main__':
    main()

客户端节点创建了一个客户端,服务接口类型为AddTwoInts,服务名称为add_two_ints_srv。

send_request函数发送请求,先等待服务端上线,然后创建请求对象并设置参数a和b,最后调用call_async方法异步发送请求,并在任务完成时回调result_callback_函数处理结果。

在主函数中调用send_request(3,4)发送一次请求,计算3+4的结果。

  1. 编译运行:
# 路径example_service_rclpy下
colcon build

新建两个终端分别运行服务端和客户端:

# 运行服务端,路径example_service_rclpy下
source install/setup.bash
ros2 run example_service_rclpy service_server
# 运行客户端,路径example_service_rclpy下
source install/setup.bash
ros2 run example_service_rclpy service_client

在服务端终端可以看到收到请求并计算了3+4的日志输出,在客户端终端可以看到收到返回结果7的日志输出。

service_run

5. 接口(interface)

接口是一种规范:接口定义了数据结构和通信协议,使得不同节点之间能够正确地理解和处理消息。之前我们就使用了example_interfaces/srv/AddTwoInts作为服务接口

接口有多种可使用的基础数据类型:

# 基础数据类型,其后加上[]表示数组类型,如int32[]表示int32类型的数组
bool
byte
char
float32,float64
int8,uint8
int16,uint16
int32,uint32
int64,uint64
string

此外可在已有的接口类型上进行包含,成为包装类型,如std_msgs/msg/String接口中包含了一个string类型的成员data。

uint32 id
string image_name
sensor_msgs/Image

也可以进行类型扩展,类似于C语言的struct继承:

如sensor_msgs/msg/Image接口包含了一个Header接口,Header接口又包含了一个Time接口,Time接口又包含了一个int32类型的成员sec(秒)和uint32成员nanosec(纳秒)。

ros2 interface show sensor_msgs/msg/Image
# 查看接口sensor_msgs/msg/Image的定义

接口按通信类型分类可分为三种(参数类型没有接口):

  • 话题接口:格式为 xxx.msg

之前我们使用了std_msgs/msg/String作为话题消息类型,其中std_msgs是ROS预置的消息包,msg表示这是一个消息接口,String是消息类型。

string data
  • 服务接口:格式为 xxx.srv

之前我们使用了example_interfaces/srv/AddTwoInts作为服务接口,其中example_interfaces是ROS预置的接口包,srv表示这是一个服务接口,AddTwoInts是服务类型。

int64 a
int64 b
---
int64 sum
  • 动作接口:格式为 xxx.action

动作接口包含三个部分:Goal(目标)、Result(结果)和Feedback(反馈)。例如,ROS预置的接口包action_msgs中定义了一个接口MoveRobot.action:

# Goal
float64 x
float64 y
---
# Result
bool success
---
# Feedback
float64 distance_to_goal

常用接口命令:

  • 查看接口列表:
ros2 interface list

结果如下:

Messages:
    action_msgs/msg/GoalInfo
    action_msgs/msg/GoalStatus
    action_msgs/msg/GoalStatusArray
    ...
Services:
    action_msgs/srv/CancelGoal
    composition_interfaces/srv/ListNodes
    composition_interfaces/srv/LoadNode
    ...
Actions:
    example_interfaces/action/Fibonacci
    tf2_msgs/action/LookupTransform
    turtlesim_msgs/action/RotateAbsolute
    ...
  • 查看接口定义:
ros2 interface show std_msgs/msg/String
# 查看接口std_msgs/msg/String的定义

结果如下:

interface_show

6. 接口编程

  1. 创建一个新的功能包:
ros2 pkg create example_ros2_interfaces --build-type ament_cmake --dependencies rosidl_default_generators geometry_msgs

编译工具改为ament_cmake,接口目前只能在CMake包中定义。然而,在CMake包中有Python库和节点是可能的(使用ament_cmake_python),所以可以在一个包中定义接口和Python节点。

依赖添加rosidl_default_generators用于接口生成,依赖添加geometry_msgs因为我们要在接口中使用其中定义的消息类型。

  1. 在工作空间下创建一个msg文件夹存放话题接口,一个srv文件夹存放服务接口:

添加接口文件:

  • 服务接口 srv/MoveRobot.srv
# 前进后退的距离
float32 distance
---
# 当前的位置
float32 pose
  • 话题接口,采用基础类型 msg/RobotStatus.msg
uint32 STATUS_MOVEING = 1
uint32 STATUS_STOP = 2
uint32 status
float32 pose
  • 话题接口,混合包装类型 msg/RobotPose.msg
uint32 STATUS_MOVEING = 1
uint32 STATUS_STOP = 2
uint32 status
geometry_msgs/Pose pose

使得最终文件结构如下:

.
├── CMakeLists.txt
├── msg
│ ├── RobotPose.msg
│ └── RobotStatus.msg
├── package.xml
└── srv
  └── MoveRobot.srv
  1. 在CMakeLists.txt中添加接口生成配置:
find_package(rosidl_default_generators REQUIRED)
find_package(geometry_msgs REQUIRED)
# 添加下面的内容
rosidl_generate_interfaces(${PROJECT_NAME}
    "msg/RobotPose.msg"
    "msg/RobotStatus.msg"
    "srv/MoveRobot.srv"
    DEPENDENCIES geometry_msgs
)
  1. 修改 package.xml
<buildtool_depend>ament_cmake</buildtool_depend>

<depend>rosidl_default_generators</depend>
<depend>geometry_msgs</depend>

<member_of_group>rosidl_interface_packages</member_of_group> # 添加这一行

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
  1. 编译接口:
colcon build

ROS会根据接口定义自动生成C++的头文件和Python的模块,

install/example_ros2_interfaces/include 下应可以看到C++的头文件

install/example_ros2_interfaces/lib 下应可以看到Python的模块

  1. 在工作空间创建一个新的功能包来测试接口:
ros2 pkg create example_interfaces_rclpy \
--build-type ament_python \
--dependencies rclpy example_ros2_interfaces \
--node-name example_interfaces_robot \
--maintainer-name "internetsb" \
--maintainer-email "internetsb@foxmail.com" \
--license "MIT"

touch example_interfaces_rclpy/example_interfaces_rclpy/example_interfaces_control.py # 另外创建一个python文件来编写控制节点

上述指令创建了功能包example_interfaces_rclpy,内含两个节点:机器人节点example_interfaces_robot.py和控制节点example_interfaces_control.py

依赖添加了上述步骤创建的example_ros2_interfaces包,因为我们要在节点中使用其中定义的接口。

目的目录指定为src。

maintainer-name、maintainer-email和license参数是可选的元信息。

# setup.py
entry_points={
    'console_scripts': [
        "example_interfaces_robot = example_interfaces_rclpy.example_interfaces_robot:main",
        "example_interfaces_control = example_interfaces_rclpy.example_interfaces_control:main"
    ],
},
  1. 编写节点代码:
# example_interfaces_robot.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import math
from time import sleep

# 导入接口
from example_ros2_interfaces.msg import RobotStatus
from example_ros2_interfaces.srv import MoveRobot

class Robot():
    """机器人类,模拟直线运动"""
    def __init__(self) -> None:
        self.current_pose_ = 0.0 # 当前位置
        self.target_pose_ = 0.0 # 目标位置
        self.status_ = RobotStatus.STATUS_STOP # 当前状态

    def get_status(self):
        """返回当前状态"""
        return self.status_
  
    def get_current_pose(self):
        """返回当前位置"""
        return self.current_pose_
  
    def move_distance(self, distance):
        """向前或向后移动distance距离"""
        self.status_ = RobotStatus.STATUS_MOVEING # 更新状态为移动
        self.target_pose_ += distance # 更新目标位置
        while math.fabs(self.target_pose_ - self.current_pose_) > 0.01: # 当距离的绝对值>0.01
            step = distance / math.fabs(distance) * math.fabs(self.target_pose_ - self.current_pose_) * 0.1 # 计算一步的方向和移动距离
            self.current_pose_ += step # 移动一步
            print(f"移动了:{step}当前位置:{self.current_pose_}")
            sleep(0.5)
        self.status_ = RobotStatus.STATUS_STOP # 更新状态为停止
        return self.current_pose_

  
class ExampleInterfacesRobot(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("节点已启动:%s!" %name)
        self.robot = Robot()
        self.move_robot_server_ = self.create_service(MoveRobot,"move_robot",self.handle_move_robot) # 创建服务
        self.robot_status_publisher_ = self.create_publisher(RobotStatus, "robot_status", 10) # 创建publisher向robot_status主题发布RobotStatus
        self.publisher_timer_ = self.create_timer(0.5, self.publisher_timer_callback) # 每0.5秒计时器回调

    def publisher_timer_callback(self):
        """定时器回调函数,向robot_status主题发布RobotStatus"""
        msg = RobotStatus() # 构造消息
        msg.status = self.robot.get_status()
        msg.pose = self.robot.get_current_pose()

        self.robot_status_publisher_.publish(msg) # 发布消息
        self.get_logger().info(f'发布了当前的状态:{msg.status} 位置:{msg.pose}')

    def handle_move_robot(self, request, response):
        """服务处理函数,将robot移动distance距离,然后返回当前位置"""
        self.robot.move_distance(request.distance)
        response.pose = self.robot.get_current_pose()
        return response


def main(args=None):
    rclpy.init(args=args)# 初始化rclpy
    node=ExampleInterfacesRobot("example_interfaces_robot") #新建一个节点
    rclpy.spin(node)#保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown()# 关闭rclpy

if __name__ == '__main__':
    main()

该节点创建了一个Robot对象来模拟机器人的运动,创建了一个服务move_robot来接收移动请求,并创建了一个定时器每0.5秒发布一次机器人状态到robot_status话题。

# example_interfaces_control.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
# 导入接口
from example_ros2_interfaces.msg import RobotStatus
from example_ros2_interfaces.srv import MoveRobot

class ExampleInterfacesControl(Node):
    def __init__(self,name):
        super().__init__(name)
        self.get_logger().info("节点已启动:%s!" %name)
        self.client_ = self.create_client(MoveRobot, "move_robot") # 创建服务客户端
        self.robot_status_subscribe_ = self.create_subscription(RobotStatus, "robot_status", self.robot_status_callback, 10) # 创建订阅者

    def robot_status_callback(self,msg):
        """订阅回调函数,输出位置和状态日志"""
        self.get_logger().info(f"收到状态数据位置:{msg.pose} 状态:{msg.status}")

    def move_result_callback_(self, result_future):
        """服务完成的回调函数,输出返回值pose"""
        response = result_future.result()
        self.get_logger().info(f"收到返回结果:{response.pose}")

    def move_robot(self,distance):
        """移动机器人,向服务move_robot发起请求"""
        while rclpy.ok() and self.client_.wait_for_service(1) == False:
            self.get_logger().info("等待服务上线.....")
        request = MoveRobot.Request()
        request.distance = distance
        self.get_logger().info(f"请求服务让机器人移动{distance}")
        self.client_.call_async(request).add_done_callback(self.move_result_callback_) # 发送异步请求,完成时调用move_result_callback_回调函数


def main(args=None):
    rclpy.init(args=args)#初始化rclpy
    node=ExampleInterfacesControl("example_interfaces_control") # 新建一个节点
    node.move_robot(5.0) # 移动5
    rclpy.spin(node)#保持节点运行,检测是否收到退出指令(Ctrl+C)
    rclpy.shutdown()# 关闭rclpy

if __name__ == '__main__':
    main()

该节点创建了一个服务客户端来调用move_robot服务,并创建了一个订阅者来订阅机器人状态。

  1. 编译运行:

当前工作空间下应有:

..
├── example_interfaces_rclpy
├── example_ros2_interfaces
├── ...

在工作空间根目录下编译

colcon build --packages-up-to example_interfaces_rclpy

packages-up-to参数指定只编译example_interfaces_rclpy包及其依赖的包,不编译其它无关的包。

新建两个终端分别运行机器人节点和控制节点:

# 工作空间根目录下
source install/setup.bash
ros2 run example_interfaces_rclpy example_interfaces_robot
# 工作空间根目录下
source install/setup.bash
ros2 run example_interfaces_rclpy example_interfaces_control

控制端:

control

机器人端:

robot

可以看到机器人端不能同时移动和发布状态,因为机器人移动时造成了主线程的阻塞,只有完成移动退出后才能继续发布状态。

可以通过多线程或其它通信方式解决这个问题