【ROS2】核心概念4——话题(node)

发布于:2025-05-10 ⋅ 阅读:(12) ⋅ 点赞:(0)

 主要参考https://book.guyuehome.com/ROS/2.核心概念/2.4_话题/

官方:Understanding-ROS2-Topics — ROS 2 Documentation: Humble documentation

一、ROS2的核心概念 + 前置准备

ROS的安装:【ROS2】机器人操作系统安装到Ubuntu22.04简介(手动

1.工程空间:(workspace): 【ROS2 】核心概念1——工作空间(workspace)-CSDN博客

2. 功能包(Package):解耦的功能代码,包含节点、接口、配置等 【ROS2】 核心概念2——功能包package_ros功能包概念-CSDN博客

3. 节点(Node):机器人的“工作细胞”,可位于不同计算机、不同语言、可独立运行进程 【ROS2】核心概念3——节点(node)

4. 话题(Topic):节点间异步传递数据的“桥梁”,基于发布/订阅模型(如传感器数据流)。

服务(Service):同步的“你问我答”机制,适用于需即时响应的短任务(如开关控制)。

动作(Action):复杂行为的“流程管理”,支持长时间任务、反馈与取消(如导航到目标点)。

通信接口(Interface):数据传递的“标准结构”,定义消息(.msg)、服务(.srv)、动作(.action)格式。

参数(Parameter):机器人系统的“全局字典”,动态键值对配置(如超时时间、IP地址)。

分布式通信(Distributed Communication):跨多计算平台的“任务分配”,支持机器人集群协同工作。

DDS(Data Distribution Service):ROS2底层的“神经网络”,实现实时、可靠的通信中间件

二、 案例1  (收发 文本)

2.1 创建工程空间和功能包

(一定在工程空间的的src下)

#mkdir -p ~/code/ws_zxy/src/ # 创建工程空间 cd ~/code/ws_zxy/src/ 
ros2 pkg create --build-type ament_python learn_topic_py 

创建结果

2.2  新建代码——topic发布和接受节点

文件位置(根据自己情况)

/home/zengxy/code/ROS2/ws_zxy/src/learn_topic_py/learn_topic_py/topic_hello_publishers.py

/home/zengxy/code/ROS2/ws_zxy/src/learn_topic_py/learn_topic_py/topic_hello_subscription.py

topic_hello_subscription.py

创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)

 self.pub = self.create_publisher(String, "topic_string", 10)  

# 话题名 topic_string发布者随便取,订阅者要接受哪个话题,就写那个,就像订阅不同的公众号的名字一样

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@作者: 古月居(www.guyuehome.com)
@说明: ROS2话题示例-发布“Hello World”话题
"""

import rclpy                                     # ROS2 Python接口库
from rclpy.node import Node                      # ROS2 节点类
from std_msgs.msg import String                  # 字符串消息类型

"""
创建一个发布者节点
"""
class PublisherNode(Node):

    def __init__(self, name):
        super().__init__(name)                                    # ROS2节点父类初始化
        self.pub = self.create_publisher(String, "topic_string", 10)   # 创建发布者对象(消息类型、话题名、队列长度)
        self.timer = self.create_timer(0.5, self.timer_callback)  # 创建一个定时器(单位为秒的周期,定时执行的回调函数)

    def timer_callback(self):                                     # 创建定时器周期执行的回调函数
        msg = String()                                            # 创建一个String类型的消息对象
        msg.data = 'Hello World'                                  # 填充消息对象中的消息数据
        self.pub.publish(msg)                                     # 发布话题消息
        self.get_logger().info('Publishing: "%s"' % msg.data)     # 输出日志信息,提示已经完成话题发布

def main(args=None):                                 # ROS2节点主入口main函数
    rclpy.init(args=args)                            # ROS2 Python接口初始化
    node = PublisherNode("topic_helloworld_pub")     # 创建ROS2节点对象并进行初始化
    rclpy.spin(node)                                 # 循环等待ROS2退出
    node.destroy_node()                              # 销毁节点对象
    rclpy.shutdown()                                 # 关闭ROS2 Python接口

topic_hello_subscription.py

topic_string 是表示要订阅的收听话题名字,如果不存在,收不到

import rclpy 
from rclpy.node import Node
from std_msgs.msg import String

class TopicHelloSub(Node):
    def __init__(self,name):
        super().__init__(name)  # 传入参数到父节点初始化
        # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
        self.subscription = self.create_subscription(String,"topic_string",self.sub_callback,10)

    def sub_callback(self,msg):
        self.get_logger().info("I heard: %s" % msg.data) # 输出日志信息,提示订阅收到的话题消息

# topic_hello_sub'
def main(args=None):
    rclpy.init(args=args)
    node = TopicHelloSub("topic_hello_sub")
    rclpy.spin(node)
    rclpy.shutdown() 

2.3 配置setup.py

    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'topic_string_pub= learn_topic_py.topic_hello_publishers:main',
            'topic_string_sub= learn_topic_py.topic_hello_subscription:main',
        ],
    },
)

2.4 在工程空间编译

在src同级目录下

colcon build

编译结果

(base) zengxy@jame:~/code/ROS2/ws_zxy/src$ colcon build
Starting >>> learn_topic_py
Starting >>> learning_node_py
Finished <<< learn_topic_py [0.55s]                          
Finished <<< learning_node_py [0.54s]

Summary: 2 packages finished [0.67s]

2.5 加载工程空间到系统环境

每次终端都需要,不然找不到包

source install/setup.bash  # 临时生效
# source ~/code/ROS2/ws_zxy/install/setup.bash

2.6 运行节点

ros2   run <功能包>  <节点名>

ros2 run learn_topic_py  topic_string_pub

结果

(base) zengxy@jame:~/code/ROS2/ws_zxy/src$ ros2 run learn_topic_py topic_string_pub

[INFO] [1746758417.638440414] [topic_hello_publishers]: publishing: hello topic

[INFO] [1746758419.622263074] [topic_hello_publishers]: publishing: hello topic

2.6.1 另一个终端运行接收节点

source install/setup.bash  # 临时生效
# source ~/code/ROS2/ws_zxy/install/setup.bash

运行

ros2 run learn_topic_py topic_string_sub

2.6.2 结果

三、思考

定时器的作用

self.create_timer 的主要功能是 ​​周期性触发回调函数​​(如 timer_callback),从而实现以下场景:

  1. ​周期性发布消息​​:例如每秒发布一次传感器数据、状态更新等​
  2. 自动化流程​​:无需外部事件触发,节点自主运行并定期发送消息
  3. 简化代码结构​​:将发布逻辑封装在定时器回调中,避免手动循环或阻塞操

替代方案对比

方案 适用场景 优点 缺点
​定时器​ 周期性任务(如传感器数据) 自动化、代码简洁 无法灵活响应外部事件
​事件驱动​ 按需触发(如服务调用、用户输入) 灵活性高、资源占用低 需定义额外触发机制
​直接调用publish​ 单次发布或初始化时发布 简单直接 无法实现周期性发布

关于 sub_callback 中 msg 参数的来源

msg这个形参,可自动读取发布者的信息,名字可以随便取

self.create_subscription(String,"topic_string",self.sub_callback,10)

    def sub_callback(self,msg):
        self.get_logger().info("I heard: %s" % msg.data) # 输出日志信息,提示

在 ROS2 中,​msg 参数由订阅器自动注入​,其来源与机制如下:

  • ​触发机制​​:当订阅的 ​​话题​​(如 "strng_sub")接收到新消息时,ROS2 底层通信框架(如 DDS)会将消息序列化后传递给订阅者节点。
  • ​参数注入​​:create_subscription 方法通过回调函数(sub_callback)将接收到的消息 ​​自动解析为消息对象​​(String 类型),并将其作为参数 msg 传入回调函数


网站公告

今日签到

点亮在社区的每一天
去签到