在 ROS 系统中,通信接口(Interface) 是节点之间传递信息的标准“语言协议”,确保了不同功能节点之间可以正确理解和使用彼此传送的数据内容。我们可以将其理解为“数据结构+格式定义”,贯穿于话题(Topic)、服务(Service)、动作(Action)等通信机制中。
在软件开发中,“接口”是一种连接关系,它规定了数据如何进出,模块如何对接,只有两边的格式和要求一致,系统才能正常“搭伙做事”。
为了让每个 ROS 节点可以用不同的语言编写,比如一个节点用 C++ 控制硬件,另一个节点用 Python 实现上层逻辑,ROS 把通信接口设计成与编程语言无关的格式。
int32
表示 32 位整型
int64
表示 64 位长整型
bool
表示布尔值
还支持数组(如 int32[]
)、嵌套结构体等复合数据类型
这些接口定义写在 .msg
(消息)或 .srv
(服务)文件中。
在 编译时,ROS 会自动生成对应语言(如 C++ 或 Python)里的数据结构文件,供节点直接使用,开发者不需要自己去写解析逻辑。
无论你用什么语言编写节点,只要遵循统一接口,数据就能准确传输、协同工作。
ROS 2 三种通信机制(话题、服务、动作)在接口定义上的格式差异。
在 ROS 2 系统中,通信接口的定义格式因通信机制而异,但都遵循统一的、语言无关的描述方式。
话题(Topic)通信
使用 .msg
文件定义,属于单向异步通信。
只需定义每一帧数据的内容格式,如:
int32 x
int32 y
表示消息中包含两个 32 位整型数据,可用于发送二维坐标等。
服务(Service)通信
使用 .srv
文件定义,属于请求-应答式的同步通信。
定义由请求部分和应答部分组成,中间用 ---
分隔,例如:
int64 a
int64 b
---
int64 sum
客户端发起请求,包含 a
和 b
,服务器处理后返回 sum
。
动作(Action)通信
使用 .action
文件定义,适合描述持续一段时间的过程性任务,如移动、旋转、导航等。
定义包含三部分:
# 目标
bool enable
---
# 结果
bool finish
---
# 反馈
int32 state
目标(Goal):开始任务,如开始转动
结果(Result):任务最终是否完成
反馈(Feedback):周期性返回进度,如当前已转动角度
通信机制 | 接口文件 | 特点 | 数据方向 |
---|---|---|---|
话题 | .msg |
异步、广播 | 单向 |
服务 | .srv |
同步请求 | 客户端 ⇄ 服务端 |
动作 | .action |
可反馈过程 | 客户端 ⇄ 服务端 + 反馈 |
服务接口的定义与使用
在本案例中,我们以一个获取目标位置的服务为例,全面了解 ROS 2 中服务接口的定义方法及实际应用方式。
learning_interface/srv/GetObjectPosition.srv
bool get # 请求:是否获取目标位置
---
int32 x # 响应:目标X坐标
int32 y # 响应:目标Y坐标
bool get
:客户端请求参数,true 表示请求当前目标位置。
---
:分隔请求与响应。
int32 x, y
:服务端反馈的目标坐标。
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from learning_interface.srv import GetObjectPosition # 自定义的服务接口
class objectClient(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.client = self.create_client(GetObjectPosition, 'get_target_position')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.request = GetObjectPosition.Request()
def send_request(self):
self.request.get = True
self.future = self.client.call_async(self.request)
def main(args=None):
rclpy.init(args=args) # ROS2 Python接口初始化
node = objectClient("service_object_client") # 创建ROS2节点对象并进行初始化
node.send_request()
while rclpy.ok():
rclpy.spin_once(node)
if node.future.done():
try:
response = node.future.result()
except Exception as e:
node.get_logger().info(
'Service call failed %r' % (e,))
else:
node.get_logger().info(
'Result of object position:\n x: %d y: %d' %
(response.x, response.y))
break
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from sensor_msgs.msg import Image # 图像消息类型
import numpy as np # Python数值计算库
from cv_bridge import CvBridge # ROS与OpenCV图像转换类
import cv2 # Opencv图像处理库
from learning_interface.srv import GetObjectPosition # 自定义的服务接口
lower_red = np.array([0, 90, 128]) # 红色的HSV阈值下限
upper_red = np.array([180, 255, 255]) # 红色的HSV阈值上限
class ImageSubscriber(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.sub = self.create_subscription(
Image, 'image_raw', self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
self.cv_bridge = CvBridge() # 创建一个图像转换对象,用于OpenCV图像与ROS的图像消息的互相转换
self.srv = self.create_service(GetObjectPosition, # 创建服务器对象(接口类型、服务名、服务器回调函数)
'get_target_position',
self.object_position_callback)
self.objectX = 0
self.objectY = 0
def object_detect(self, image):
hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 图像从BGR颜色模型转换为HSV模型
mask_red = cv2.inRange(hsv_img, lower_red, upper_red) # 图像二值化
contours, hierarchy = cv2.findContours(
mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) # 图像中轮廓检测
for cnt in contours: # 去除一些轮廓面积太小的噪声
if cnt.shape[0] < 150:
continue
(x, y, w, h) = cv2.boundingRect(cnt) # 得到苹果所在轮廓的左上角xy像素坐标及轮廓范围的宽和高
cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2)# 将苹果的轮廓勾勒出来
cv2.circle(image, (int(x+w/2), int(y+h/2)), 5,
(0, 255, 0), -1) # 将苹果的图像中心点画出来
self.objectX = int(x+w/2)
self.objectY = int(y+h/2)
cv2.imshow("object", image) # 使用OpenCV显示处理后的图像效果
cv2.waitKey(50)
def listener_callback(self, data):
self.get_logger().info('Receiving video frame') # 输出日志信息,提示已进入回调函数
image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8') # 将ROS的图像消息转化成OpenCV图像
self.object_detect(image) # 苹果检测
def object_position_callback(self, request, response): # 创建回调函数,执行收到请求后对数据的处理
if request.get == True:
response.x = self.objectX # 目标物体的XY坐标
response.y = self.objectY
self.get_logger().info('Object position\nx: %d y: %d' %
(response.x, response.y)) # 输出日志信息,提示已经反馈
else:
response.x = 0
response.y = 0
self.get_logger().info('Invalid command') # 输出日志信息,提示已经反馈
return response
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = ImageSubscriber("service_object_server") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
这是一个通过图像识别红色目标物体并通过ROS服务机制获取其当前坐标的系统。
服务器端(服务提供方):接收图像并处理,检测红色物体位置,一旦客户端请求,就返回该位置。
客户端(服务调用方):主动发起“请告诉我目标位置”的请求,获取最新坐标。
在 ROS2 中,通信接口(Interface) 是一种标准格式的“数据结构定义”,用于描述节点间如何传递数据。
对于 服务通信(Service) 来说,接口包括两部分:
请求部分(Request):客户端发送给服务端的数据。
响应部分(Response):服务端返回给客户端的结果。
接口使用 .srv
文件定义,例如你定义的:
定义通信接口文件 .srv
bool get
---
int32 x
int32 y
在服务端创建服务对象,绑定回调
self.srv = self.create_service(
GetObjectPosition, # 使用定义好的接口
'get_target_position', # 服务名
self.object_position_callback # 处理函数
)
在客户端创建服务请求并发送
self.client = self.create_client(GetObjectPosition, 'get_target_position')
self.request = GetObjectPosition.Request()
self.request.get = True
self.future = self.client.call_async(self.request)
服务端处理请求并返回响应
def object_position_callback(self, request, response):
if request.get:
response.x = self.objectX
response.y = self.objectY
return response
再次说明,# 文件:learning_interface/srv/GetObjectPosition.srv
bool get
---
int32 x
int32 y
系统会自动生成:
一个 GetObjectPosition.Request
类(含 get
字段)
一个 GetObjectPosition.Response
类(含 x
和 y
字段)
所以,你在代码中哪里写了 .get
、.x
、.y
,就说明用了通信接口定义的字段!
通信接口 .srv
文件是怎么关联上的?
在代码中,首先导入:# 文件:learning_interface/srv/GetObjectPosition.srv
from learning_interface.srv import GetObjectPosition
意思是:你定义的服务接口 GetObjectPosition.srv
,位于 ROS功能包 learning_interface
的 srv/
文件夹里。
你就可以在你项目的工作空间中查找该文件:cd ~/ros2_ws/src/learning_interface/srv
ls
# 应该能看到 GetObjectPosition.srv