主要参考:https://book.guyuehome.com/ROS/2.核心概念/2.4_话题/
官方:Understanding-ROS2-Topics — ROS 2 Documentation: Humble documentation
一、ROS2的核心概念 + 前置准备
ROS的安装:【ROS2】机器人操作系统安装到Ubuntu22.04简介(手动
1.工程空间:(workspace): 【ROS2 】核心概念1——工作空间(workspace)-CSDN博客
2. 功能包(Package):解耦的功能代码,包含节点、接口、配置等 【ROS2】 核心概念2——功能包package_ros功能包概念-CSDN博客
3. 节点(Node):机器人的“工作细胞”,可位于不同计算机、不同语言、可独立运行进程 【ROS2】核心概念3——节点(node)
4. 话题(Topic):节点间异步传递数据的“桥梁”,基于发布/订阅模型(如传感器数据流)。
服务(Service):同步的“你问我答”机制,适用于需即时响应的短任务(如开关控制)。
动作(Action):复杂行为的“流程管理”,支持长时间任务、反馈与取消(如导航到目标点)。
通信接口(Interface):数据传递的“标准结构”,定义消息(.msg)、服务(.srv)、动作(.action)格式。
参数(Parameter):机器人系统的“全局字典”,动态键值对配置(如超时时间、IP地址)。
分布式通信(Distributed Communication):跨多计算平台的“任务分配”,支持机器人集群协同工作。
DDS(Data Distribution Service):ROS2底层的“神经网络”,实现实时、可靠的通信中间件
二、 案例1 (收发 文本)
2.1 创建工程空间和功能包
(一定在工程空间的的src下)
#mkdir -p ~/code/ws_zxy/src/ # 创建工程空间 cd ~/code/ws_zxy/src/
ros2 pkg create --build-type ament_python learn_topic_py
创建结果
2.2 新建代码——topic发布和接受节点
文件位置(根据自己情况)
/home/zengxy/code/ROS2/ws_zxy/src/learn_topic_py/learn_topic_py/topic_hello_publishers.py
/home/zengxy/code/ROS2/ws_zxy/src/learn_topic_py/learn_topic_py/topic_hello_subscription.py
topic_hello_subscription.py
创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
self.pub = self.create_publisher(String, "topic_string", 10)
# 话题名 topic_string发布者随便取,订阅者要接受哪个话题,就写那个,就像订阅不同的公众号的名字一样
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@作者: 古月居(www.guyuehome.com)
@说明: ROS2话题示例-发布“Hello World”话题
"""
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from std_msgs.msg import String # 字符串消息类型
"""
创建一个发布者节点
"""
class PublisherNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.pub = self.create_publisher(String, "topic_string", 10) # 创建发布者对象(消息类型、话题名、队列长度)
self.timer = self.create_timer(0.5, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数)
def timer_callback(self): # 创建定时器周期执行的回调函数
msg = String() # 创建一个String类型的消息对象
msg.data = 'Hello World' # 填充消息对象中的消息数据
self.pub.publish(msg) # 发布话题消息
self.get_logger().info('Publishing: "%s"' % msg.data) # 输出日志信息,提示已经完成话题发布
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = PublisherNode("topic_helloworld_pub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
topic_hello_subscription.py
topic_string 是表示要订阅的收听话题名字,如果不存在,收不到
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TopicHelloSub(Node):
def __init__(self,name):
super().__init__(name) # 传入参数到父节点初始化
# 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
self.subscription = self.create_subscription(String,"topic_string",self.sub_callback,10)
def sub_callback(self,msg):
self.get_logger().info("I heard: %s" % msg.data) # 输出日志信息,提示订阅收到的话题消息
# topic_hello_sub'
def main(args=None):
rclpy.init(args=args)
node = TopicHelloSub("topic_hello_sub")
rclpy.spin(node)
rclpy.shutdown()
2.3 配置setup.py
tests_require=['pytest'],
entry_points={
'console_scripts': [
'topic_string_pub= learn_topic_py.topic_hello_publishers:main',
'topic_string_sub= learn_topic_py.topic_hello_subscription:main',
],
},
)
2.4 在工程空间编译
在src同级目录下
colcon build
编译结果
(base) zengxy@jame:~/code/ROS2/ws_zxy/src$ colcon build
Starting >>> learn_topic_py
Starting >>> learning_node_py
Finished <<< learn_topic_py [0.55s]
Finished <<< learning_node_py [0.54s]
Summary: 2 packages finished [0.67s]
2.5 加载工程空间到系统环境
每次终端都需要,不然找不到包
source install/setup.bash # 临时生效
# source ~/code/ROS2/ws_zxy/install/setup.bash
2.6 运行节点
ros2 run <功能包> <节点名>
ros2 run learn_topic_py topic_string_pub
结果
(base) zengxy@jame:~/code/ROS2/ws_zxy/src$ ros2 run learn_topic_py topic_string_pub
[INFO] [1746758417.638440414] [topic_hello_publishers]: publishing: hello topic
[INFO] [1746758419.622263074] [topic_hello_publishers]: publishing: hello topic
2.6.1 另一个终端运行接收节点
source install/setup.bash # 临时生效
# source ~/code/ROS2/ws_zxy/install/setup.bash
运行
ros2 run learn_topic_py topic_string_sub
2.6.2 结果
三、思考
定时器的作用
self.create_timer
的主要功能是 周期性触发回调函数(如 timer_callback
),从而实现以下场景:
- 周期性发布消息:例如每秒发布一次传感器数据、状态更新等
- 自动化流程:无需外部事件触发,节点自主运行并定期发送消息
- 简化代码结构:将发布逻辑封装在定时器回调中,避免手动循环或阻塞操
替代方案对比
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
定时器 | 周期性任务(如传感器数据) | 自动化、代码简洁 | 无法灵活响应外部事件 |
事件驱动 | 按需触发(如服务调用、用户输入) | 灵活性高、资源占用低 | 需定义额外触发机制 |
直接调用publish | 单次发布或初始化时发布 | 简单直接 | 无法实现周期性发布 |
关于 sub_callback
中 msg
参数的来源
msg这个形参,可自动读取发布者的信息,名字可以随便取
self.create_subscription(String,"topic_string",self.sub_callback,10)
def sub_callback(self,msg):
self.get_logger().info("I heard: %s" % msg.data) # 输出日志信息,提示
在 ROS2 中,msg
参数由订阅器自动注入,其来源与机制如下:
- 触发机制:当订阅的 话题(如
"strng_sub"
)接收到新消息时,ROS2 底层通信框架(如 DDS)会将消息序列化后传递给订阅者节点。 - 参数注入:
create_subscription
方法通过回调函数(sub_callback
)将接收到的消息 自动解析为消息对象(String
类型),并将其作为参数msg
传入回调函数