[实现Rpc] 通信-Muduo库的实现 | && 完美转发 | reserve | unique_lock

发布于:2025-02-21 ⋅ 阅读:(18) ⋅ 点赞:(0)

目录

MudouBuffer

⭕右值引用 | 完美转发

右值引用

完美转发

实现原理

结合右值引用和完美转发的例子

LVProtocol

⭕vector 的 reserve 函数

1. 背景

2. reserve 函数原型

3. 示例代码

4. 输出结果

5. 结果解析

6. 关键点说明

MuduoConnection

⭕mudou 库

🎢MudouServer

笔记:unique_lock

概述

构造与析构

生命周期中的操作

接口总结

MudouClient


通信-Muduo库的封装实现

(1)需要实现的功能:

  • MuduoBuffer:管理网络通信的缓冲区。
  • MuduoProtocol:给通信信息规定特定字段,防止粘包。
  • MuduoConnection:管理网络连接和关闭。
  • MuduoServer:管理服务端信息。
  • MuduoClient:管理客户端信息。

(2)具体实现:

MudouBuffer

连接 mudou 库,实现功能

class MuduoBuffer : public BaseBuffer
{
public:
    using ptr = std::shared_ptr<MuduoBuffer>;

    MuduoBuffer(muduo::net::Buffer *buf)
        :_buf(buf)
    {}

//连接 mudou库
//实现 功能

    virtual size_t readableSize() override { return _buf->readableBytes(); }

    //muduo库是一个网络库,从缓冲区取出一个4字节整形,会进行网络字节序的转换
    virtual int32_t peekInt32() override { return _buf->peekInt32(); }

    virtual void retrieveInt32() override { _buf->retrieveInt32(); }

    virtual int32_t readInt32() override { return _buf->readInt32(); }

    virtual std::string retrieveAsString(size_t len) override { return _buf->retrieveAsString(len); }

private:
    muduo::net::Buffer *_buf;

};

/工厂
class BufferFactory
{
public:
    template <typename... Args>
    
    //!!!!!!!!右值引用 完美转发
    static BaseBuffer::ptr create(Args &&...args)
    {
        return std::make_shared<MuduoBuffer>(std::forward<Args>(args)...);
    }
};

note:

⭕右值引用 | 完美转发

前文回顾:[C++11#45](二) 右值引用 | 移动语义 | 万能引用 | 完美转发forward | 初识lambda

右值引用和完美转发是C++11引入的两个重要特性

右值引用

右值与左值

  • 左值:可以取地址并赋值的数据表达式。例如变量i、指针p等都是左值。
  • 右值:不能取地址的数据表达式,如字面量(如5)、临时对象或返回非引用类型的函数调用结果等。

右值引用语法

  • 左值引用使用单个&,如int& ri = i;
  • 右值引用使用双&&,如int&& rri = 5;

右值引用允许我们高效地“窃取”资源(如内存),这对于避免不必要的拷贝操作特别有用。比如在移动构造函数中,我们可以直接转移资源的所有权而不是复制资源。

完美转发

完美转发是指在模板编程中,能够保持传入参数的左值或右值属性,并正确地将其传递给另一个函数的能力。这使得我们可以根据传入参数的不同类型(左值或右值)来决定是否应用移动语义或其他特定于类型的操作。

实现原理

完美转发通常通过以下方式实现:

template<typename T>
void forwardFunc(T&& arg) {
    someOtherFunc(std::forward<T>(arg));
}

这里的关键在于std::forward,它保留了参数的原始类型信息(左值或右值)。当forwardFunc被一个左值调用时,T会被推导为Type&,而当它被一个右值调用时,T则会被推导为Typestd::forward会根据这个信息决定如何转发参数。

结合右值引用和完美转发的例子

考虑一个场景,我们需要编写一个函数模板,它能够接受任意类型的参数并将其转发给另一个函数,同时保持该参数的左值或右值属性不变。这就是完美转发的应用场景之一:

template<typename T, typename U>
void wrapper(T&& val, U&& func) {
    func(std::forward<T>(val));
}

在这个例子中,无论val是一个左值还是右值,wrapper函数都能够正确地将其转发给func,并且func也能够按照val的实际类型(左值或右值)进行相应的处理。

总结来说,右值引用使得我们能够更有效地管理和转移资源,而完美转发则确保了在泛型编程中参数的原始类型信息不会丢失,从而允许基于参数类型的精确控制。


LVProtocol

  • 缓冲区中取出 反序列化 消息
  • 序列化
  • 缓冲区中 能否读取消息

///
class LVProtocol : public BaseProtocol
{
public:
    using ptr = std::shared_ptr<LVProtocol>;

    // |--Len--|--VALUE--|
    // |--Len--|--mtype--|--idlen--|--id--|--body--|
    //判断缓冲区中的数据量是否足够一条消息的处理

//缓冲区中 读取消息
    virtual bool canProcessed(const BaseBuffer::ptr &buf) override
    {
        if(buf->readableSize()<lenFieldsLength){
            //不够 一条消息的处理
            return false;
        }

        int32_t total_len=buf->peekInt32();
        if(buf->readableSize()<(total_len+lenFieldsLength))
        {
            return false;
        }
        //确保 有一条 完整的lv 协议
        return true;
    }

    //

//缓冲区中取出 反序列化 消息
    virtual bool onMessage(const BaseBuffer::ptr &buf, BaseMessage::ptr &msg)
    {
        //!! 当 使用onMessage的时候,默认认为 缓冲区的数据 为一条足够的完整消息
        int32_t total_len = buf->readInt32();  // 读取总⻓度
        MType mtype = (MType)buf->readInt32(); // 读取数据类型
        int32_t id_len = buf->readInt32();      // 读取id⻓度
        int32_t body_len = total_len - id_len - idlenFieldsLength - mtypeFieldsLength;

        std::string id=buf->retrieveAsString(id_len);
        std::string body=buf->retrieveAsString(body_len);
        
/*
enum class MType {
      
        //request
        //response
        
        REQ_RPC = 0,
        RSP_RPC,
        REQ_TOPIC,
        RSP_TOPIC,
        REQ_SERVICE,
        RSP_SERVICE
    }
*/
        msg=MessageFactory::create(mtype);
        if(msg.get()==nullptr)
        {
            ELOG("消息类型错误,构造 消息对象失败");
            return false;
        }

        bool ret=msg->unserialize(body);
        //!!!!!!!!!!! see abstract 中设计

        if(ret == false)
        {
            ELOG("消息正文反序列化失败!");
            return false;
        }

        msg->SetId(id);
        msg->SetMType(mtype);
        return true;
    }


//!!!!!!!!序列化
    virtual std::string serialize(const BaseMessage::ptr &msg) override
    {
        // |--Len--|--mtype--|--idlen--|--id--|--body--|
        std::string body = msg->serialize();
        std::string id = msg->GetId();
        auto mtype = htonl((int32_t)msg->GetMType());
        int32_t id_len = htonl(id.size());
        int32_t h_total_len = mtypeFieldsLength + idlenFieldsLength + id.size() + body.size();
        int32_t n_total_len = htonl(h_total_len);

        std::string result;
        result.reserve(h_total_len);
        result.append((char*)&n_total_len, lenFieldsLength);
        result.append((char*)&mtype, mtypeFieldsLength);
        result.append((char*)&id_len, idlenFieldsLength);
        result.append(id);
        result.append(body);

        return result;
    }

//!!!!!!!!避免魔幻字符的出现 
//不要直接使用常数
private:
    const size_t lenFieldsLength = 4;
    const size_t mtypeFieldsLength = 4;
    const size_t idlenFieldsLength = 4;

};

note:

前文回顾:[实现Rpc] 项目设计 | 服务端模块划分 | rpc | topic | server

0.

lv 协议 典型拼接思想的领悟

reserve:

《STL 源码剖析》中 我们有模拟实现过:

🎢flag: 这本书的笔记 在语雀中简单整理过一遍,之后有时间二刷后,详细再整理一遍再发叭,敬请期待......)

vectorreserve 函数
1. 背景

由于 vector 的内存管理可能涉及动态分配和释放内存,这个过程可能会很耗时。为了优化性能,我们可以使用 vectorreserve 函数来预分配内存空间,以避免频繁的内存分配和释放操作。

2. reserve 函数原型
  • 函数原型如下:
void reserve(size_type n);
  • 参数 n 指定了预分配的内存空间大小,以元素个数为单位。这意味着 reserve 函数将为 vector 预分配至少 n 个元素所需的内存空间。
3. 示例代码
#include <iostream>
#include <vector>

int main() {
    std::vector<int> vec;

    // 输出初始状态下的 size 和 capacity
    std::cout << "Before reserve: size = " << vec.size() << ", capacity = " << vec.capacity() << std::endl;

    // 预分配至少100个元素的内存空间
    vec.reserve(100);

    // 输出调用 reserve 后的 size 和 capacity
    std::cout << "After reserve: size = " << vec.size() << ", capacity = " << vec.capacity() << std::endl;

    // 向 vector 中添加50个元素
    for (int i = 0; i < 50; i++) {
        vec.push_back(i);
    }

    // 输出添加元素后的 size 和 capacity
    std::cout << "After push_back: size = " << vec.size() << ", capacity = " << vec.capacity() << std::endl;

    return 0;
}
4. 输出结果
Before reserve: size = 0, capacity = 0
After reserve: size = 0, capacity = 100
After push_back: size = 50, capacity = 100
5. 结果解析
  • 调用 reservesizecapacity 都是 0。
  • 调用 reserve:虽然 size 仍然是 0,但 capacity 变为 100,意味着已为 vector 预分配了存储 100 个元素的空间。
  • 添加元素后size 变为 50(表示已添加了 50 个元素),而 capacity 保持为 100。
6. 关键点说明
  • vector 中添加元素时size 发生变化,而 capacity 则保持不变。可以看到,在向 vector 中添加了 50 个元素之后,size 变为 50,而 capacity 仍然是 100。
  • 通过调用 reserve 函数预分配内存空间:可以有效避免在插入新元素时频繁地进行内存重新分配,从而提高程序性能。
  • 注意reserve 函数只会增加 capacity,不会改变 size 的值。

1.


MuduoConnection

class MuduoConnection : public BaseConnection
{
public:
    using ptr = std::shared_ptr<MuduoConnection>;

    MuduoConnection(BaseProtocol::ptr& protocol, const muduo::net::TcpConnectionPtr &conn)
        :_protocol(protocol)
        ,_conn(conn)
    {}

    virtual void send(const BaseMessage::ptr &msg) override
    {
        std::string body = _protocol->serialize(msg);
        _conn->send(body);
    }

    virtual void shutdown() override
    {
        _conn->shutdown();
    }

    virtual bool connected() override
    {
        return _conn->connected();
    }

private:
    BaseProtocol::ptr _protocol;
    muduo::net::TcpConnectionPtr _conn;

};

class ConnectionFactory
{
public:
    template <typename... Args>
    static BaseConnection::ptr create(Args &&...args)
    {
        return std::make_shared<MuduoConnection>(std::forward<Args>(args)...);
    }
};

note:

⭕mudou 库

前文:

可以根据前文 demo 中的简单示例感受:[实现Rpc] 环境搭建 | JsonCpp | Mudou库 | callBack()

🎢flag: 关于 mudou 库的更详细的知识,后面有机会 会整理发出,如果还有机会 也会尝试手写 mudou 库~

Muduo是由陈硕老师开发的 基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。它采用 主从Reactor模型one loop per thread 线程模型。

从而避免了,例如一万个请求就创建一万个线程的等待浪费,主从的结构也将 对于网络连接的接收和对于信息的处理 进行了拆分。


🎢MudouServer

class MudouServer:public BaseServer{
    public:
        using ptr=std::shared_ptr<MudouServer>;
        MudouServer(int port):
        _server(&_baseloop,muduo::net::InetAddress("0.0.0.0",port),
            "MudouServer",muduo::net::TcpServer::kReusePort),
             _protocol(ProtocolFactory::create()){}
        
        //
        virtual void start() {
                _server.setConnectionCallback(std::bind(&MuduoServer::onConnection, this, std::placeholders::_1));
                _server.setMessageCallback(std::bind(&MuduoServer::onMessage, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _server.start();//先开始监听
                _baseloop.loop();//开始死循环事件监控
            }
    private:
        void onConnection(const muduo::net::TcpConnectionPtr &conn){

            if(conn->connected()){
                std::cout<<"连接 建立\n";
                auto muduo_conn=ConnectionFactory::create(conn,_protocol);
                {
                    //加锁 给工厂 进行连接
                    //!!!unique_lock
                    std::unique_lock<std::mutex> lock(_mutex);
                    _conns.insert(std::make_pair(conn,muduo_conn));//建立 连接
                }
                if(_cb_connection) _cb_connection(muduo_conn);
            }
            else{
                std::cout<<"连接 断开\n";
                BaseConnection::ptr muduo_conn;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it=_conns.find(conn);
                    if(it==_conns.end()){
                        return;
                    }
                    muduo_conn=it->second;
                    _conns.erase(conn);
                }
                if(_cb_close)
                    _cb_close(muduo_conn);
            }
        }

        void onMessage(const muduo::net::TcpConnectionPtr &conn,muduo::net::Buffer *buf,muduo::Timestamp)
        {
            DLOG("连接有 数据到来,开始处理!");
            auto base_buf=BufferFactory::create(buf);
            while(1)
            {
                if(_protocol->canProcessed(base_buf)==false)
                {
                    //数据不足
                    if(base_buf->readableSize()>maxDataSize)
                    {
                        conn->shutdown();
                        ELOG("缓冲区 数据过大!");
                        return;
                    }
                    break;
                }

                BaseMessage::ptr msg;
                bool ret=_protocol->onMessage(base_buf,msg);
                if(ret==false)
                {
                    conn->shutdown();
                    ELOG("缓冲区 数据错误");
                    return;
                }

                BaseConnection::ptr base_conn;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto iter=_conns.find(conn);
                    if(iter==_conns.end())
                    {
                        conn->shutdown();
                        return;
                    }

                    base_conn=iter->second;
                }
                if(_cb_message)
                {
                    _cb_message(base_conn,msg);
                }
            }
        }

    private:
        const size_t maxDataSize = (1 << 16);
        BaseProtocol::ptr _protocol;//协议

        muduo::net::EventLoop _baseloop;//
        muduo::net::TcpServer _server;//调用mudou库接口

        std::mutex _mutex;//加锁
        std::unordered_map<muduo::net::TcpConnectionPtr, BaseConnection::ptr> _conns;
        //建立映射
};

issue:

1.

2.

对于 线程库 _mutex unique_lock 回顾:之前有简单的写到过 [Linux#42][线程] 锁的接口 | 原理 | 封装与运用 | 线程安全

[Linux#46][线程->网络] 单例模式 | 饿汉与懒汉 | 自旋锁 |读写锁 | 网络基础 | 书单

flag:补充 深入学习 C++线程库的笔记

(写 这个项目 我真的立下了好多个 flag... ...🤣)

笔记:unique_lock

概述

  • lock_guard 的可操作性很低,只有构造和析构两个函数,也就是只有自动释放锁的能力。而 unique_lock 功能更加丰富,而且可以自由操作锁。
构造与析构
  • unique_lock 在构造时,可以传入一把锁,在构造的同时会对该锁进行加锁。
  • unique_lock 析构时,判断当前的锁有没有加锁,如果加锁了就先释放锁,后销毁对象。
生命周期中的操作

  • 而在构造与析构之间,也就是整个 unique_lock 的生命周期,可以自由地加锁解锁:
    • lock:加锁
    • unlock:解锁
    • try_lock:如果没上锁就加锁,上锁了就返回
    • try_lock_until:如果到指定时间还没申请到锁就返回 false,申请到锁返回 true
    • try_lock_for:如果一段时间内没申请到锁就返回 false,申请到锁返回 true
接口总结
  • 提供了以上五个接口,也就是说可以作用于前面的任何一款锁。
  • 另外的 unique_lock 还允许赋值 operator=,调用赋值时,如果当前锁没有持有锁,那么直接拷贝。如果当前锁持有锁,那么把锁的所有权转移给新的 unique_lock,自己不再持有锁。

示例代码

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void lock_example() {
    std::unique_lock<std::mutex> ul(mtx);
    // 锁已加
    std::cout << "Locked" << std::endl;
}

void try_lock_example() {
    std::unique_lock<std::mutex> ul(mtx, std::defer_lock); // 不立即加锁
    if (mtx.try_lock()) { // 尝试加锁
        std::cout << "Locked" << std::endl;
        mtx.unlock(); // 解锁
    } else {
        std::cout << "Not Locked" << std::endl;
    }
}

int main() {
    std::thread t1(lock_example);
    std::thread t2(try_lock_example);

    t1.join();
    t2.join();

    return 0;
}

输出结果

总结

  • unique_lock 提供了更丰富的锁操作接口,包括 加锁、解锁、尝试加锁 等。
  • 支持赋值操作,可以在不持有锁的情况下直接拷贝,持有锁的情况下转移所有权。
  • 通过这些接口,可以更灵活地控制锁的行为,提高程序的并发性能。

MudouClient

基于 muduoServer 进行微调,很快就能写出来~

class MuduoClient : public BaseClient
{
public:
    using ptr = std::shared_ptr<MuduoClient>;

    MuduoClient(const std::string &ip, int port)
        :_protocol(ProtocolFactory::create())
        ,_baseloop(_loopthread.startLoop())
        ,_downlatch(1)
        ,_client(_baseloop, muduo::net::InetAddress(ip, port), "DictClient")
    {}

    virtual void connect() override
    {
        DLOG("设置回调函数,连接服务器");
        _client.setConnectionCallback(std::bind(&MuduoClient::onConnection, this, std::placeholders::_1));
        _client.setMessageCallback(std::bind(&MuduoClient::onMessage, this, std::placeholders::_1, 
                                   std::placeholders::_2, std::placeholders::_3));
        
        //连接服务器
        _client.connect();
        _downlatch.wait();

        DLOG("连接服务器成功!");
    }

    virtual void shutdown() override { return _client.disconnect(); }

    virtual bool send(const BaseMessage::ptr &msg) override
    {
        if(_conn->connected() == false)
        {
            ELOG("连接已断开!");
            return false;
        }
        else
        {
            _conn->send(msg);
        }

        return true;
    }

    virtual BaseConnection::ptr connection() override { return _conn; }

    virtual bool connected() override { return (_conn && _conn->connected()); }

private:
    void onConnection(const muduo::net::TcpConnectionPtr &conn)
    {
        if(conn->connected())
        {
            std::cout << "连接建立!\n";
            _downlatch.countDown();//计数--,为0时唤醒阻塞
            _conn = ConnectionFactory::create(_protocol, conn);
        }
        else
        {
            std::cout << "连接断开!\n";
            _conn.reset();
        }
    }

    void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp)
    {
        DLOG("连接有数据到来,开始处理!");
        auto base_buf = BufferFactory::create(buf);
        //?
        while(1)
        {
            if(_protocol->canProcessed(base_buf) == false)
            {
                //数据不足
                if(base_buf->readableSize() > maxDataSize)
                {
                    conn->shutdown();
                    ELOG("缓冲区中数据过大!");
                    return;
                }

                DLOG("数据量不足!");
                break;
            }

            BaseMessage::ptr msg;
            bool ret = _protocol->onMessage(base_buf, msg);
            if(ret == false)
            {
                conn->shutdown();
                ELOG("缓冲区中数据错误!");
                return;
            }

            if(_cb_message)
            {
                _cb_message(_conn, msg);
            }
        }
    }

private:
    const size_t maxDataSize = (1 << 16);
    BaseProtocol::ptr _protocol;
    BaseConnection::ptr _conn;
    muduo::CountDownLatch _downlatch;
    muduo::net::EventLoopThread _loopthread;
    muduo::net::EventLoop *_baseloop;
    muduo::net::TcpClient _client;

};

class ClientFactory
{
public:
    template <typename... Args>
    static BaseClient::ptr create(Args &&...args)
    {
        return std::make_shared<MuduoClient>(std::forward<Args>(args)...);
    }
};


网站公告

今日签到

点亮在社区的每一天
去签到