Queue消息队列
官方文档:OpenAtom OpenHarmony
基本概念
消息队列又称队列,是一种任务间通信的机制。消息队列接收来自任务或中断的不固定长度消息,并根据不同的接口确定传递的消息是否存放在队列空间中。
任务能够从队列里面读取消息,当队列中的消息为空时,挂起读取任务;当队列中有新消息时,挂起的读取任务被唤醒并处理新消息。任务也能够往队列里写入消息,当队列已经写满消息时,挂起写入任务;当队列中有空闲消息节点时,挂起的写入任务被唤醒并写入消息。
可以通过调整读队列和写队列的超时时间来调整读写接口的阻塞模式,如果将读队列和写队列的超时时间设置为0,就不会挂起任务,接口会直接返回,这就是非阻塞模式。反之,如果将读队列和写队列的超时时间设置为大于0的时间,就会以阻塞模式运行。
消息队列提供了异步处理机制,允许将一个消息放入队列,但不立即处理。同时队列还有缓冲消息的作用,可以使用队列实现任务异步通信,队列具有如下特性:
- 消息以先进先出的方式排队,支持异步读写。
- 读队列和写队列都支持超时机制。
- 每读取一条消息,就会将该消息节点设置为空闲。
- 发送消息类型由通信双方约定,可以允许不同长度(不超过队列的消息节点大小)的消息。
- 一个任务能够从任意一个消息队列接收和发送消息。
- 多个任务能够从同一个消息队列接收和发送消息。
- 创建普通队列时所需的队列空间,由系统自行动态申请内存。
- 创建静态队列时所需的队列空间,由用户传入。这块空间在队列删除之后也由用户去释放。
运行原理
队列控制块
队列会在初始化时给分配一个属于自己的控制块,控制块包含了队列的名称、状态等信息。删除队列时会释放该控制块。
队列控制块数据结构如下:
typedef struct
{
UINT8 *queue; /* 队列消息内存空间的指针 */
UINT8 *queueName /* 队列名称 */
UINT16 queueState; /* 队列状态 */
UINT16 queueLen; /* 队列中消息节点个数,即队列长度 */
UINT16 queueSize; /* 消息节点大小 */
UINT16 queueID; /* 队列ID */
UINT16 queueHead; /* 消息头节点位置(数组下标)*/
UINT16 queueTail; /* 消息尾节点位置(数组下标)*/
UINT16 readWriteableCnt[OS_READWRITE_LEN]; /* 数组下标0的元素表示队列中可读消息数,
数组下标1的元素表示队列中可写消息数 */
LOS_DL_LIST readWriteList[OS_READWRITE_LEN]; /* 读取或写入消息的任务等待链表,
下标0:读取链表,下标1:写入链表 */
LOS_DL_LIST memList; /* 内存块链表 */
} LosQueueCB;
每个队列控制块中都含有队列状态,表示该队列的使用情况:
- OS_QUEUE_UNUSED:队列未被使用。
- OS_QUEUE_INUSED:队列被使用中。
队列运作原理
- 创建队列时,创建队列成功会返回队列ID。
- 在队列控制块中维护着一个消息头节点位置Head和一个消息尾节点位置Tail,用于表示当前队列中消息的存储情况。Head表示队列中被占用的消息节点的起始位置。Tail表示被占用的消息节点的结束位置,也是空闲消息节点的起始位置。队列刚创建时,Head和Tail均指向队列起始位置。
- 写队列时,根据readWriteableCnt[1]判断队列是否可以写入,不能对已满(readWriteableCnt[1]为0)队列进行写操作。写队列支持两种写入方式:向队列尾节点写入,也可以向队列头节点写入。尾节点写入时,根据Tail找到起始空闲消息节点作为数据写入对象,如果Tail已经指向队列尾部则采用回卷方式。头节点写入时,将Head的前一个节点作为数据写入对象,如果Head指向队列起始位置则采用回卷方式。
- 读队列时,根据readWriteableCnt[0]判断队列是否有消息需要读取,对全部空闲(readWriteableCnt[0]为0)队列进行读操作会引起任务挂起。如果队列可以读取消息,则根据Head找到最先写入队列的消息节点进行读取。如果Head已经指向队列尾部则采用回卷方式。
- 删除队列时,根据队列ID找到对应队列,把队列状态置为未使用,把队列控制块置为初始状态,并释放队列所占内存。
图1 队列读写数据操作示意图
上图对读写队列做了示意,图中只画了尾节点写入方式,没有画头节点写入,但是两者是类似的。
BUILD.gn
# Copyright (c) 2022 Hunan OpenValley Digital Industry Development Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//kernel/liteos_m/liteos.gni")
module_name = get_path_info(rebase_path("."), "name")
kernel_module(module_name){
sources = [
"los_message_example.c",
]
include_dirs = [
"//kernel/liteos_m/kernel/include",
]
}
los_message_example.c
/*
* Copyright (c) 2022 Hunan OpenValley Digital Industry Development Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "los_queue.h"
#include "los_task.h"
#include "ohos_run.h"
#define BUFFER_LEN 50
#define USLEEP_1S (1000000)
#define TASK_PRIO_H 9
#define TASK_PRIO_L 10
#define QUEUE_LEN 5
#define QUEUE_MAX_SIZE 50
static UINT32 g_queue;
VOID SendEntry(VOID)
{
UINT32 ret = 0;
CHAR abuf[] = "test message";
UINT32 len = sizeof(abuf);
ret = LOS_QueueWriteCopy(g_queue, abuf, len, 0);
if (ret != LOS_OK) {
printf("send message failure, error: %x\n", ret);
}
}
VOID RecvEntry(VOID)
{
UINT32 ret = 0;
CHAR readBuf[BUFFER_LEN] = {0};
UINT32 readLen = BUFFER_LEN;
// 休眠1s
usleep(USLEEP_1S);
ret = LOS_QueueReadCopy(g_queue, readBuf, &readLen, 0);
if (ret != LOS_OK) {
printf("recv message failure, error: %x\n", ret);
}
printf("recv message: %s\n", readBuf);
ret = LOS_QueueDelete(g_queue);
if (ret != LOS_OK) {
printf("delete the queue failure, error: %x\n", ret);
}
printf("delete the queue success.\n");
}
UINT32 ExampleQueue(VOID)
{
printf("start queue example.\n");
UINT32 ret = 0;
UINT32 task1, task2;
TSK_INIT_PARAM_S initParam = {0};
initParam.pfnTaskEntry = (TSK_ENTRY_FUNC)SendEntry;
initParam.usTaskPrio = TASK_PRIO_L;
initParam.uwStackSize = LOSCFG_BASE_CORE_TSK_DEFAULT_STACK_SIZE;
initParam.pcName = "SendQueue";
LOS_TaskLock();
ret = LOS_TaskCreate(&task1, &initParam);
if (ret != LOS_OK) {
printf("create task1 failed, error: %x\n", ret);
return ret;
}
initParam.pcName = "RecvQueue";
initParam.pfnTaskEntry = (TSK_ENTRY_FUNC)RecvEntry;
initParam.usTaskPrio = TASK_PRIO_H;
ret = LOS_TaskCreate(&task2, &initParam);
if (ret != LOS_OK) {
printf("create task2 failed, error: %x\n", ret);
return ret;
}
ret = LOS_QueueCreate("queue", QUEUE_LEN, &g_queue, 0, QUEUE_MAX_SIZE);
if (ret != LOS_OK) {
printf("create queue failure, error: %x\n", ret);
}
printf("create the queue success.\n");
LOS_TaskUnlock();
return ret;
}
OHOS_APP_RUN(ExampleQueue);
编译并烧录
在源码根目录下使用hb工具对写好的代码进行编译
选择mini级系统
同理 产品选择esp公司下的esp32
选择完毕后在源码根目录下执行hb build -f 进行编译
编译完成后会有如下界面,并且编译后的代码固件位于:out\esp32\esp32
验证结果
按下ESP32开发板上的EN键,即可观察到实验现象:
API参考
LOS_QueueCreate()
UINT32 LOS_QueueCreate(const CHAR *queueName,
UINT16 len,
UINT32 *queueID,
UINT32 flags,
UINT16 maxMsgSize);
描述:
创建消息队列
注意 :不能在中断服务调用该函数
参数:
名字 |
描述 |
queueName |
消息队列名称,暂时未使用 |
len |
消息队列长度,该值的范围:[1,0xffff] |
queueID |
消息队列ID,属输出 |
flags |
队列模式,暂未用,可传0 |
maxMsgSize |
消息的大小,该值范围:[1,0xffff-4] |
LOS_QueueReadCopy()
UINT32 LOS_QueueReadCopy(UINT32 queueID, VOID *bufferAddr, UINT32 *bufferSize, UINT32 timeOut);
描述:
读取消息队列,用于读取指定队列中的数据,并将获取到的数据存储到bufferAddr指定的地址中。 要读取的数据的地址和大小由用户定义。
注意 :不能在中断服务调用该函数
参数:
名字 |
描述 |
queueID |
队列ID |
bufferAddr |
指针,存放读取到的消息 |
bufferSize |
在读取之前维护缓冲区的期望大小,和读取后消息的实际大小 |
timeOut |
超时值,范围:[0,LOS_WAIT_FOREVER],单位tick |
LOS_QueueWriteCopy()
UINT32 LOS_QueueWriteCopy(UINT32 queueID, VOID *bufferAddr, UINT32 bufferSize, UINT32 timeOut);
描述:
发送消息,用于将bufferSize指定的大小和存储在bufferAddr指定地址的数据写入队列中。
注意 :如果参数timeout设置为0,可以从中断服务例程调用
参数:
名字 |
描述 |
queueID |
消息队列ID |
bufferAddr |
要发送的消息 |
bufferSize |
消息大小 |
timeout |
超时值 |
LOS_QueueRead()
UINT32 LOS_QueueRead(UINT32 queueID, VOID *bufferAddr, UINT32 bufferSize, UINT32 timeOut);
描述:
获取消息,用于读取指定队列中的数据地址,并将其存储到由bufferAddr指定的地址中。
注意 :不能在中断服务调用该函数
参数:
名字 |
描述 |
queueID |
消息队列ID |
bufferAddr |
存储获取数据的起始地址。起始地址不能为空。 |
bufferSize |
传入的缓冲区大小,不能为0,范围:[1,0xffffffff] |
timeout |
超时值,范围:[0,LOS_WAIT_FOREVER],单位tick |
返回 0 - 成功,非0 - 失败 |
LOS_QueueWrite()
UINT32 LOS_QueueWrite(UINT32 queueID, VOID *bufferAddr, UINT32 bufferSize, UINT32 timeOut);
描述:
发送消息
参数:
名字 |
描述 |
queueID |
消息队列ID |
bufferAddr |
要发送的消息 |
bufferSize |
消息大小 |
timeout |
超时值 |