rknn优化教程(一)

发布于:2025-06-05 ⋅ 阅读:(48) ⋅ 点赞:(0)

1. 前述

OK,铺垫了很久的rknn优化,终于开始写了。为什么要优化呢?当然是我们的使用遇到了瓶颈,要么使用的时候达不到实时帧率,要么就是根本没有将硬件利用起来,那么这篇博客就是围绕两个核心的问题来进行说明。那么博客的内容主要就是如下的一些部分:

  • 如何解决rknn_model_zoo给的demo中无法达到实时帧率的问题
  • 如何能将rk3588上的硬件资源全部利用起来
  • 如何使用一些成熟的库来替代和优化rknn_model_zoo的demo中给出的部分函数
  • 如何使用xmake构建这个优化工程

2. 优化思想

针对前面提出的4个部分,博客将逐一给出解决方案。

注意,博客将以detect进行说明,至于其他的比如segmentpose等等,都是一通百通了。

2.1 实时帧率

如果正常的使用yolo11或者yolov8nano模型在rk3588上跑,大致能跑15~20fps就差不多了。但如果是small模型呢?如果标签数量很多呢?可能连15fps都达不到。当然这些都是以rknn_model_zoo中的demo来说的。

这里不得不吐槽一下rknn_model_zoo的代码水平,我是觉得那个c/c++写的不伦不类,用c的思路写c++不说了,而且各种东西穿插使用,看的特别割裂。而且还有一些c/c++入门的新手写代码的一些非常低级的用法,这个东西不好用,写这个库和demo的人至少背80%的锅。

一个很简单的东西,比如我现在有两个float类型的数组A和B,如果要将A中的元素全部拷贝到B中去,这个里面的写法是:

// 这里的count可能是 640*640*3这样的大小
for (int index = 0; index < count; ++index)
{
    B[index] = A[index];
}

我就不说优化的思路,或者说经常写c/c++的人应该怎么写了,大家也可以给出自己的想法。

为了解决实时帧率:既然单线程只能跑15~20fps,那我是不是可以开2个线程?是不是可以开3个?甚至9个?

OK,那我们已经找到了一个很有用的解决方法,就是使用多线程来解决实时帧率的问题,那么我们解决了实时帧率的问题,但是又带来了新的问题:

  • 如何在多线程的情况下保证帧的顺序?

2.2 多线程处理

多线程可以帮助我们达到实时帧率的处理,但是我们需要一个优雅的方法来处理帧的顺序问题。

比如我们有一个连续的帧1 2 3,然后放入到三个线程中进行处理:

在这里插入图片描述

当然,如果按照顺序输出 1 2 3的结果肯定是好的,但是我们根本无法保证这个输出的顺序,而且输出的顺序 3 2 1这个的概率不低,那我们就还得给输出进行排序,这个就是多线程下必须解决的问题。

那我们有几个思路:

  • 对每一个数据帧都绑定一个数值,然后输出的时候进行排序?
  • 每次都是执行线程数量的数据帧,每次输入输出,然后再进行下一批数量的输入输出,按照图示,就是每次送3帧数据,然后都处理完了,再送入三帧。
  • 队列处理?

那我们就对这三种思路分别分析一下:

2.2.1 排序

纯粹的排序听起来是最简单的方法,非常容易,每次调用一下std::sort似乎就可以了,但是没办法解决几个问题:

  • 每次std::sort非常耗费时间和资源
  • 数据连续处理的时候,针对图示的线程数量,可能存在第6帧的结果比第2帧的结果还要先得到,虽然概率极小,但是也是存在的,那怎么排序呢?怎么确认你得到了序号最小的那一个数据?

所以这个排序可以实现,但不稳定、不好用

2.2.2 批量处理

这个很容易解决,只要我们将三个线程都是用std::future就可以了,每次等待三帧的处理,然后三个结果那也很好处理序号的问题,代码写起来那也是非常easy了,但是有一个非常严重的问题:

  • 每次都必须延迟3帧的时间

虽然看起来处理的速度很快,但是要导致3帧的延迟,这对于实时系统来说,可能无法接受,尤其是:

假设你现在传入的是1fps的视频进行处理,就必须延迟3s左右才能获得结果,这就没法玩了……

2.2.3 队列

那就必须要考虑到我们在处理时序问题时候经常使用到的方法:队列

OK,知道要使用队列了,但是我怎么处理队列呢?还是没法避免排序的问题啊?

这里我们首先要想到,我线程的数量肯定不是无止境的,就rk3588这个板子,开10个线程跑detect就不错了,再开多了,搞不好还速度下降了。当然这里就是线程的数量处理问题了,经常进行并发编程的是知道的,线程并不是越多越好。

那我们来看这样一个队列处理:

在这里插入图片描述

这里的1st 2nd 3rd都是表示得到结果的顺序,可以快速在输出的队列上进行对应填写结果,那么:

  1. 当得到3的结果的时候,先在输出的队列索引3上写入结果,然后发现1和2还是没有得到,不进行输出

  2. 当得到2的结果的时候,先在输出的队列索引2上写入结果,然后发现1还是没有得到,不进行输出

  3. 当得到1的结果的时候,先在输出的队列索引1上写入结果,进行输出,然后发现2和3的结果也有了,那就一起输出

在这里插入图片描述

那么我们如何构造这样一个队列呢?

首先,我们注意到uin8_t这个类型,取值是0~255,并且当其为255的时候,自增会回到0,利用自身的溢出功能,能帮我们构建天然的索引闭环!

那我们直接将输出队列使用大小256的数组进行表示,是不是就可以实现了一个自动闭环的队列索引了?然后控制输入输出的索引处理,连排序都省去了!

256的大小完全远远超出我们的使用极限了,线程就是开了20个,那最差的情况差不多就是19个数据帧等待一个数据帧的处理结束,256完全满足了缓冲的条件了。

这只是我们假想的一个极特殊的情况,等待个线程数量的数据帧已经很极限了,正常的时候等两三帧就已经差不多了

那么用代码怎么实现呢:

// 假设的输入数据用这个结构体存储
struct TaskInput
{
	cv::Mat m_img;
    uint8_t m_taskId;
    ... // 其他的额外数据
};

// 假设输出的结果用这个一个结构体进行存储
struct TaskResult
{
    TaskInput m_inputData;
    bool m_isGet;  // 是否得到了结果
    ... // 结果的数据存储
};

// 然后我们有一个线程池的类
class TaskPool
{
public:
    // 添加一个处理的任务
    void add(cv::Mat&& img);
    // 线程处理的函数,每一个线程都是调用这个函数进行识别的处理
    void run(int thread_id);
    // 这是所有线程处理完了之后都调用这个函数来进行结果的处理
    void dealResult(TaskInput&& task_data, ...); // 这里的 ... 只是表示一个结果的类型,暂时不写明具体的形参类型
private:
    bool m_isRun;
    
    std::mutex m_mutexInput;
    std::queue<TaskInput> m_taskInputs;
    uint8_t m_indexInput = 0; // 初始化为0
    
    std::mutex m_mutexOutput;
    uint8_t m_indexOutput = 0; // 初始化为0
    std::array<TaskResult, 256> m_taskResults;
}

那我们在add中就需要处理好输入!

void TaskPool::add(cv::Mat&& img)
{
    // 使用线程锁保护一下
    lock_guard<mutex> lg(m_mutexInput);
        
    // 将输入添加到任务队列中
    TaskInput input;
    input.m_img = std::move(img);
    input.m_taskId = m_indexInput++; // 这里就打上了序号了……
    
    // 其他可能的操作
    ...;
}

run函数中做好数据的提取:

void TaskPool::run(int thread_id)
{
    while (m_isRun)
    {
        TaskInput task_data;

        {
            unique_lock<mutex> lock(m_mutexInput);
            m_cvTask.wait(lock, [&]
                          { return !m_taskInputs.empty() || !m_isRun; });
            if (!m_isRun)
            {
                return;
            }

            // 减少内存拷贝
            task_data = std::move(m_taskInputs.front());
            m_taskDatas.pop();
        }

        // 这里就是根据数据进行识别的处理了,包括前处理、推理和后处理等等
        realWork(thread_id, std::move(task_data));
        // 注意,结果是通过回调处理的!
    }
}

那么获得了结果了,那么这里就需要在dealResult进行处理:

void TaskPool::dealResult(TaskInput&& task_data, ...)
{
    // 这里只是表示通过task_data来构建result
    TaskResult result(std::move(task_data), ...);
    result.m_isGet = true;
    
    lock_guard<mutex> lg(m_mutexOutput);
    m_taskResults[result.m_inputData.m_taskId] = std::move(result);
    while (true)
    {
        if (m_taskResults[m_outputIndex].m_isGet)
        {
            auto &res_data = m_taskResults[m_outputIndex];
            // 这里就可以去处理结果了
            callback(std::move(res_data));
            
            // 处理完成了,这个又要变成没有得到结果的状态了!
            res_data.m_isGet = false;
            ++m_outputIndex;
        }
        else
        {
            break; // 如果最前方的数据还没有得到结果,那就直接退出……
        }
    }
}

这样的处理是不是很优雅了……

2.3 进一步优化

上述的处理其实已经能提高不少了,只需要将rknn_model_zoo中的代码变换成多线程处理就可以了。

当然优化嘛,是需要精益求精的,将rknn_model_zoo中的那些低水平的代码更换为我们高水平的代码,优化他们的前处理,后处理,使用opencveigen等库来进行优化函数处理,并且使用我们更好的程序设计,避免一些代码的冗余等等……

3. 代码

这些优化和设计有一个可用的代码库,先整理一下,后续的博客再进行分享了。


网站公告

今日签到

点亮在社区的每一天
去签到