Linux-IPC-消息队列

发布于:2025-02-27 ⋅ 阅读:(19) ⋅ 点赞:(0)

Linux IPC 之 消息队列(Message Queue)

在 Linux 中,消息队列(Message Queue) 是一种进程间通信(IPC)机制,允许进程通过一个消息队列在内核空间中交换数据。消息队列是基于 队列 数据结构实现的,具有先进先出(FIFO)特性,即最先发送的消息最先被接收。它提供了一种更加灵活的方式,比管道更适合于处理不同类型和不同大小的消息。

消息队列的特点

  1. 先进先出(FIFO):消息队列是按消息到达的顺序进行处理的,保证了顺序性。
  2. 异步通信:进程可以在不阻塞的情况下发送或接收消息。发送消息的进程不会因为目标进程没有及时接收消息而被阻塞(除非队列已满)。接收消息的进程会阻塞,直到队列中有消息可读(也可以通过设置非阻塞模式,避免阻塞行为)。
  3. 可存储多个消息:一个消息队列中可以存储多个消息,进程可以分批读取消息队列中的消息。
  4. 支持消息优先级:消息队列支持消息优先级,允许根据消息的优先级来决定消息的处理顺序。
  5. 可读可写:并不是只能一端读一端写,不过一般设置一端读一端写(看业务需求)。

POSIX 消息队列

POSIX 消息队列的特性

  1. 基于文件系统:POSIX 消息队列通过文件系统接口进行操作。消息队列有一个 名称,类似文件路径,进程通过文件名访问消息队列。这种设计使得 POSIX 消息队列在进程间通信中更加灵活,支持不同进程间通过路径来通信。
  2. 异步操作:POSIX 消息队列是异步的,允许进程以非阻塞模式发送和接收消息。
  3. 优先级:POSIX 消息队列支持设置消息的优先级,允许进程根据优先级来控制消息的处理顺序。
  4. 文件描述符接口:POSIX 消息队列通过文件描述符进行管理,与常规文件操作(如 open()、read()、write())相似

POSIX 消息队列的 API

POSIX 消息队列的 API 主要通过以下几个函数实现:

  1. mq_open():创建或打开消息队列。
  2. mq_send():发送消息到消息队列。
  3. mq_receive():从消息队列接收消息。
  4. mq_close():关闭消息队列。
  5. mq_unlink():删除消息队列。

POSIX 消息队列的操作流程

1. mq_open()

用于创建或打开一个消息队列。它返回一个消息队列的文件描述符,进程通过该文件描述符进行其他操作。

函数原型

mqd_t mq_open(const char *name, int oflag, ...);
  • ...: 可选参数,只有在某些特定的标志(例如 O_CREAT)时才需要。
  • 通常,读取消息的进程会使用 O_RDONLYO_RDONLY | O_NONBLOCK 来打开消息队列,表示它只需要从队列中读取消息。因此,这种进程的 mq_open() 只需要传递 两个参数:
#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
  • namemq_open 使用的共享内存名称,应当是以 / 开头的 POSIX 名称,会在/dev/mqueue/下创建name不要/的设备文件。
  • oflag:打开标志,如 O_CREAT(创建)、O_RDONLY(只读)、O_WRONLY(只写)、 O_NONBLOCK(非阻塞读)(默认为阻塞)等。
  • mode:消息队列的权限,通常设置为 0666。
  • attr:一个结构体,指定消息队列的属性,如最大消息大小、队列的最大消息数等。
    • 如果不指定mq_attr的话,内核将会使用默认值,特别是mq_msgsize会有默认值(一般是8192),所以接收方一定要设置大于等于这个大小的缓存区去接收信号队列的数据。
    • mq_maxmsg的值设置必须大于0,否则会报错(显然最多为0是无意义的)。
struct mq_attr {
    long mq_flags;       // 标志,通常为 0
    long mq_maxmsg;      // 队列中最多消息数
    long mq_msgsize;     // 每个消息的最大字节数
    long mq_curmsgs;     // 当前队列中的消息数
};
2. mq_receive()

从消息队列接收消息。

#include <mqueue.h>

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
  • mqdes:消息队列的文件描述符。
  • msg_ptr:接收消息内容的缓冲区。
  • msg_len:缓冲区的大小。
  • msg_prio:接收的消息优先级。
3. mq_send()

向消息队列发送消息。

#include <mqueue.h>

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
  • mqdes:消息队列的文件描述符。
  • msg_ptr:指向要发送的消息内容的指针。
  • msg_len:消息的长度。
  • msg_prio:消息的优先级,值越大优先级越高,排在消息队列最前
4. mq_close()

关闭消息队列的文件描述符,类似于关闭文件。

#include <mqueue.h>

int mq_close(mqd_t mqdes);
  • mqdes:消息队列的文件描述符。
  • 打开引用计数++
5. mq_unlink()

删除消息队列。这通常是在消息队列不再使用时调用,用于清理资源。

#include <mqueue.h>

int mq_unlink(const char *name);
  • name:要删除的消息队列的名称。

  • mq_unlink 是用于删除 POSIX 消息队列对象的系统调用。它的作用是 从系统中删除消息队列存对象的名称,使得它在 mq_open 中不可见(在/dev/mqueue/目录下将不会有对应的消息队列文件),但不会立即销毁消息队列,直到所有进程关闭该消息队列对象后,系统才会回收它

  • 不会立即删除消息队列的实际数据,如果有进程仍然持有文件描述符或映射该消息队列,则消息队列仍然存在,直到所有引用(文件描述符)都被mq_unlinkmq_close() 释放。

  • namemq_open 使用的共享内存名称,应当是以 / 开头的 POSIX 名称,会在/dev/mqueue/下创建name不要/的设备文件。

  • 可以查看消息队列的信息

    • ls -l /dev/mqueue/<name>
    • stat /dev/mqueue/<name>
  • 强制删除 POSIX 消息队列

    • rm /dev/mqueue/<name>

接口详细说明

oflag
标志 描述
O_RDONLY 只读打开消息队列对象,如果消息队列对象不存在报错
O_WRONLY 只读模式打开消息队列对象,如果消息队列对象不存在报错
O_CREAT 如果消息队列对象不存在,则创建,若对象已存在,仍会成功返回文件描述符
O_EXCL O_CREAT 一起使用,若对象已存在,则返回错误
O_NONBLOCK 打开消息队列时,设置为非阻塞模式。如果消息队列为空,mq_receive()mq_send() 会立即返回,而不会阻塞。
O_RDWR 打开消息队列进行读写操作。,如果消息队列对象不存在报错
mode

mq_open 中的 mode 参数

  • mq_openmode 参数用于指定消息队列对象的访问权限,其语义与 open 系统调用的 mode 参数相同。它仅在 O_CREAT 标志被使用时才生效,否则会被忽略。
  1. mode常见权限
模式(宏) 数值(八进制 描述
S_IRUSR 0400 用户可读
S_IWUSR 0200 用户可写
S_IRGRP 0040 组可读
S_IWGRP 0020 组可写
S_IROTH 0004 其他用户可读
S_IWOTH 0002 其他用户可写
  1. mode 组合方式

可以通过 按位或 (|) 运算符 组合多个权限。例如:

  • 0666(所有用户可读写):
mq_open("/my_queue", O_CREAT | O_RDWR, 0666, NULL);

等价于:

mq_open("/my_queue", O_CREAT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH, NULL);

POSIX 消息队列代码示例

进程 A(写入消息队列)
#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>

#define QUEUE_NAME "/my_queue"

int main() {
    mqd_t mq;
    struct mq_attr attr;
    char *msg = "Hello 1 from Process A by message queue !";
    char *msg2 = "Hello 2 from Process A by message queue !";
    unsigned int prio = 1;
    attr.mq_flags = 0;
    attr.mq_maxmsg = 10;
    attr.mq_curmsgs = 0;
    attr.mq_msgsize = strlen(msg) > strlen(msg2) ? strlen(msg) + 1 : strlen(msg2) + 1;

    // 打开或创建消息队列
    // 如果在attr参数填NULL,则系统会给默认值
    mq = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT, 0666, &attr);
    if (mq == (mqd_t) -1) {
        perror("mq_open");
        exit(1);
    }

    // 发送消息,优先级为2
    if (mq_send(mq, msg, strlen(msg) + 1, 2) == -1) {
        perror("mq_send");
        exit(1);
    }
    printf("Message sent: %s\n", msg);
    
    // 发送消息,优先级为7,优先级越大越高,排在消息队列最前
    if (mq_send(mq, msg, strlen(msg2) + 1, 7) == -1) {
        perror("mq_send");
        exit(1);
    }
    printf("Message sent: %s\n", msg2);
    

    // 关闭消息队列
    mq_close(mq);
    // mq_unlink(QUEUE_NAME);
    return 0;
}
进程 2(读取消息队列)
#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>

#define QUEUE_NAME "/my_queue"

int main() {
    mqd_t mq;
    char buffer[1024];
    unsigned int prio;
    // 打开消息队列
    mq = mq_open(QUEUE_NAME, O_RDONLY, 0666, NULL);
    if (mq == (mqd_t) -1) {
        perror("mq_open");
        exit(1);
    }
    // 接收消息,并获取消息优先级(数字越大优先级越高)
    if (mq_receive(mq, buffer, sizeof(buffer), &prio) == -1) {
        perror("mq_receive");
        exit(1);
    }

    printf("Message received: %s\n", buffer);
    printf("Message priority: %u\n", prio);

    // 关闭消息队列
    mq_close(mq);
    // mq_unlink(QUEUE_NAME);
    return 0;
}