应用层自定义协议与序列化

发布于:2025-09-07 ⋅ 阅读:(16) ⋅ 点赞:(0)

目录

应用层

再识协议

序列化和反序列化

重新理解read、write、recv、send和tcp为什 么支持全双工

网络版计算器

服务层代码

定制协议

Request类(请求数据)

Response类(响应数据)

Protocol类(协议)

关于流式数据的处理

NetCal计算机类

Client.cc(客户端)

main.cc

效果演示

附录

Jsoncpp

序列化

反序列化

总结


应用层

我们程序员写的一个个解决我们实际问题,满足我们日常需求的网络程序,都是在应用层

再识协议

协议是一种"约定".socketapi的接口,在读写数据时,都是按"字符串"的方式来发送接 收的.如果我们要传输一些"结构化的数据"怎么办呢?

其实,协议就是双方约定好的结构化的数据

结构化数据的传输

通信双方在进行网络通信时:

  • 如果需要传输的数据是一个字符串,那么直接将这一个字符串发送到网络当中,此时对端也能从网络当中获取到这个字符串。
  • 但如果需要传输的是一些结构化的数据,此时就不能将这些数据一个个发送到网络当中。

这是显而易见的,在传递结构化数据时如果直接发送会面临许多问题诸如字节序问题、内存对齐问题和数据类型大小不一致等的问题。

那么该怎么传递结构化的数据呢?

制定一个协议(序列化和反序列化)来实现

下面以设计一个基于网络tcp的计算器为例

我们需要实现一个服务器版的加法器.我们需要客户端把要计算的两个数发过去, 然后由服务器进行计算,最后再把结果返回给客户端.

约定方案一

  • 客户端发送一个形如"1+1"的字符串;
  • 这个字符串中有两个操作数,都是整形;
  • 两个数字之间会有一个字符是运算符,运算符是+或-等等;
  • 数字和运算符之间没有空格

定制结构体+序列化和反序列化

约定方案二

  • 定义结构体来表示我们需要交互的信息; 
  • 发送数据时将这个结构体按照一个规则转换成字符串,接收到数据的时候再按照相同的规则把字符串转化回结构体;
  • 这个过程叫做"序列化"和"反序列化"

客户端可以定制一个结构体,将需要交互的信息定义到这个结构体当中。客户端发送数据时先对数据进行序列化,服务端接收到数据后再对其进行反序列化,此时服务端就能得到客户端发送过来的结构体,进而从该结构体当中提取出对应的信息。

序列化和反序列化

序列化和反序列化:

  • 序列化是将对象的状态信息转换为可以存储或传输的形式(字节序列)的过程。
  • 反序列化是把字节序列恢复为对象的过程。

OSI七层模型中表示层的作用就是,实现设备固有数据格式和网络标准数据格式的转换。其中设备固有的数据格式指的是数据在应用层上的格式,而网络标准数据格式则指的是序列化之后可以进行网络传输的数据格式。

无论我们采用方案一,还是方案二,还是其他的方案,只要保证,一端发送时构造的数据, 在另一端能够正确的进行解析,就是ok的.这种约定,就是应用层协议

重新理解read、write、recv、send和tcp为什 么支持全双工

在任何一台主机上,TCP连接既有发送缓冲区,又有接受缓冲区,所以在内核 中,可以在发消息的同时,也可以收消息,即全双工

网络版计算器

下面实现一个网络版的计算器,主要目的是感受一下什么是协议

服务层代码

把上篇Tcp Socket博客实现的TcpSocket封装成一个类

namespace SocketModule
{
    using namespace LogModule;
    const static int gbacklog = 16;
    // 模版方法模式
    // 基类socket, 大部分方法,都是纯虚方法
    class Socket
    {
    public:
        virtual ~Socket() {}
        virtual void SocketOrDie() = 0;
        virtual void BindOrDie(uint16_t port) = 0;
        virtual void ListenOrDie(int backlog) = 0;
        virtual std::shared_ptr<Socket> Accept(InetAddr *client) = 0;
        virtual void Close() = 0;
        virtual int Recv(std::string *out) = 0;
        virtual int Send(const std::string &message) = 0;
        virtual int Connect(const std::string &server_ip, uint16_t port) = 0;

    public:
        void BuildTcpSocketMethod(uint16_t port, int backlog = gbacklog)
        {
            SocketOrDie();
            BindOrDie(port);
            ListenOrDie(backlog);
        }
        void BuildTcpClientSocketMethod()
        {
            SocketOrDie();
        }
    };
    const static int defaultfd = -1;

    class TcpSocket : public Socket
    {
    public:
        TcpSocket() : _sockfd(defaultfd)
        {
        }
        TcpSocket(int fd) : _sockfd(fd)
        {
        }
        ~TcpSocket()
        {
        }
        void SocketOrDie() override
        {
            _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
            if (_sockfd < 0)
            {
                LOG(LogLevel::FATAL) << "socket error";
                exit(SOCKET_ERR);
            }
            LOG(LogLevel::INFO) << "socket success";
        }

        void BindOrDie(uint16_t port) override
        {
            InetAddr local(port);
            int n = ::bind(_sockfd, local.NetAddrPtr(), local.NetAddrLen());
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "bind error";
                exit(BIND_ERR);
            }
            LOG(LogLevel::INFO) << "bind success";
        }

        void ListenOrDie(int backlog) override
        {
            int n = ::listen(_sockfd, backlog);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "listen error";
                exit(LISTEN_ERR);
            }
            LOG(LogLevel::INFO) << "listen success";
        }

        std::shared_ptr<Socket> Accept(InetAddr *client) override
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int fd = ::accept(_sockfd, CONV(peer), &len);
            if (fd < 0)
            {
                LOG(LogLevel::WARNING) << "accept waring";
                return nullptr;
            }
            client->SetAddr(peer);
            return std::make_shared<TcpSocket>(fd);
        }

        int Recv(std::string *out) override
        {
            // 流式读取,不关心读到的是什么
            char buffer[1024];
            ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
            if (n > 0)
            {
                buffer[n] = 0;
                *out += buffer; 
            }
            return n;
        }
        int Send(const std::string &message) override
        {
            return send(_sockfd, message.c_str(), message.size(), 0);
        }
        void Close() override 
        {
            if (_sockfd >= 0)
                ::close(_sockfd);
        }
        int Connect(const std::string &server_ip, uint16_t port) override
        {
            InetAddr server(server_ip, port);
            return ::connect(_sockfd, server.NetAddrPtr(), server.NetAddrLen());
        }

    private:
        int _sockfd; // listensockfd, sockfd;
    };
}

定制协议

要实现一个网络版的计算器,就必须保证通信双方能够遵守某种协议约定,因此我们需要设计一套简单的约定。数据可以分为请求数据和响应数据,因此我们分别需要对请求数据和响应数据进行约定。我们没有自己实现一个序列化和反序列化的类,下面代码都是通过Jsoncpp库提供的方法来实现序列化与反序列化详情见附录

Request类(请求数据)

// 约定好各个字段的含义,本质就是约定好协议!
// 如何要做序列化和反序列化:
// 1. 我们自己写(怎么做) ---> 往往不具备很好的扩展性
// 2. 使用现成的方案(这个是我们要写的) ---> json -> jsoncpp
class Request
{
public:
    Request()
    {
    }
    Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
    {
    }

    //序列化
    std::string Serialize()
    {
        // _x = 10 _y = 20, _oper = '+'
        // "10" "20" '+' : 用空格作为分隔符
        Json::Value root;
        root["x"] = _x;
        root["y"] = _y;
        root["oper"] = _oper;

        Json::FastWriter writer;
        std::string s = writer.write(root);
        return s;
    }

     //反序列化
    // {"x": 10, "y" : 20, "oper" : '+'}
    bool Deserialize(std::string &in)
    {
        // "10" "20" '+' -> 以空格作为分隔符 -> 10 20 '+'
        Json::Value root;
        Json::Reader reader;
        bool ok = reader.parse(in, root);
        if (ok)
        {
            _x = root["x"].asInt();
            _y = root["y"].asInt();
            _oper = root["oper"].asInt();
        }
        return ok;
    }
    ~Request() {}
    int X() { return _x; }
    int Y() { return _y; }
    char Oper() { return _oper; }

private:
    int _x;
    int _y;
    char _oper; // + - * / % -> _x _oper _y -> 10 + 20
};

Response类(响应数据)

class Response
{
public:
    Response() {}
    Response(int result, int code) : _result(result), _code(code)
    {
    }

    //序列化
    std::string Serialize()
    {
        Json::Value root;
        root["result"] = _result;
        root["code"] = _code;

        Json::FastWriter writer;
        return writer.write(root);
    }
    
    //反序列化
    bool Deserialize(std::string &in)
    {
        Json::Value root;
        Json::Reader reader;
        bool ok = reader.parse(in, root);
        if (ok)
        {
            _result = root["result"].asInt();
            _code = root["code"].asInt();
        }
        return ok;
    }
    ~Response() {}
    void SetResult(int res)
    {
        _result = res;
    }
    void SetCode(int code)
    {
        _code = code;
    }

private:
    int _result; // 运算结果,无法区分清楚应答是计算结果,还是异常值
    int _code;   // 0:sucess, 1,2,3,4->不同的运算异常的情况
};

Protocol类(协议)

// 协议(基于TCP的)需要解决两个问题:
// 1. request和response必须得有序列化和反序列化功能
// 2. 你必须保证,读取的时候,读到完整的请求(TCP, UDP不用考虑)

const std::string sep = "\r\n";
using func_t = std::function<Response(Request &req)>;

class Protocol
{
public:
    Protocol()
    {
    }
    Protocol(func_t func) : _func(func)
    {
    }
    std::string Encode(const std::string &jsonstr)
    {
        // 50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n
        std::string len = std::to_string(jsonstr.size());
        return len + sep + jsonstr + sep;
    }

    // 1. 判断报文完整性
    // 2. 如果包含至少一个完整请求,提取他, 并从移除它,方便处理下一个
    bool Decode(std::string &buffer, std::string *package)
    {
        // 50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n
        ssize_t pos = buffer.find(sep);
        if (pos == std::string::npos)
            return false;
        std::string package_len_str = buffer.substr(0, pos); // 50
        int package_len_int = std::stoi(package_len_str);    // 报文长度{"x": 10, "y" : 20, "oper" : '+'}
        int target_len = package_len_str.size() + package_len_int + 2 * sep.size();
        if (buffer.size() < target_len)
            return false;

        // buffer一定至少有一个完整的报文!
        *package = buffer.substr(pos + sep.size(), package_len_int);
        buffer.erase(0, target_len);
        return true;
    }

    void GetRequest(std::shared_ptr<Socket> &sock, InetAddr &client)
    {
        // 读取
        std::string buffer_queue;
        while (true)
        {
            int n = sock->Recv(&buffer_queue);
            if (n > 0)
            {
                std::cout << "-----------request_buffer--------------" << std::endl;
                std::cout << buffer_queue << std::endl;
                std::cout << "------------------------------------" << std::endl;

                std::string json_package;
                // 1. 解析报文,提取完整的json请求,如果不完整,就让服务器继续读取
                while (Decode(buffer_queue, &json_package))
                {
                    LOG(LogLevel::DEBUG) << client.StringAddr() << " 请求: " << json_package;
                    Request req;
                    bool ok = req.Deserialize(json_package);
                    if (!ok)
                        continue;
                    // 3. 我一定得到了一个内部属性已经被设置了的req了.
                    // 通过req->resp, 不就是要完成计算功能嘛!!业务
                    Response resp = _func(req);

                    // 4. 序列化
                    std::string json_str = resp.Serialize();

                    // 5. 添加自定义长度
                    std::string send_str = Encode(json_str); // 携带长度的应答报文了"len\r\n{result:XXX, code:XX}\r\n"

                    // 6. 直接发送
                    sock->Send(send_str);
                }
            }
            else if (n == 0)
            {
                LOG(LogLevel::INFO) << "client:" << client.StringAddr() << "Quit!";
                break;
            }
            else
            {
                LOG(LogLevel::WARNING) << "client:" << client.StringAddr() << ", recv error";
                break;
            }
        }
    }
    bool GetResponse(std::shared_ptr<Socket> &client, std::string &resp_buff, Response *resp)
    {
        // 面向字节流,你怎么保证,你的client读到的 一个网络字符串,就一定是一个完整的请求呢??
        while (true)
        {
            int n = client->Recv(&resp_buff);
            if (n > 0)
            {
                std::string json_package;
                // 1. 解析报文,提取完整的json请求,如果不完整,就让服务器继续读取
                // bool ret = Decode(resp_buff, &json_package);
                // if (!ret)
                //     continue;
                while (Decode(resp_buff, &json_package))
                {
                    resp->Deserialize(json_package);
                }
                return true;
            }
            else if (n == 0)
            {
                std::cout << "server quit " << std::endl;
                return false;
            }
            else
            {
                std::cout << "recv error" << std::endl;
                return false;
            }
        }
    }
    std::string BuildRequestString(int x, int y, char oper)
    {
        // 1. 构建一个完整的请求
        Request req(x, y, oper);

        // 2. 序列化
        std::string json_req = req.Serialize();

        // 3. 添加长度报头
        return Encode(json_req);
    }
    ~Protocol()
    {
    }

private:
    // 因为我们用的是多进程
    // Request _req;
    // Response _resp;
    func_t _func;
};

在协议中,我们怎么保证读到的数据是我们想要的呢?

1、客户端写入服务器时是依据协议的(Encode),这样我们才能读到想要的数据

2、收到数据后我们进行解析报文(Decode)来正确读到序列化后(即一串字符串)的数据。

然后通过反序列化读到x、y、oper的值并依此初始化Request

3、我们在向应用层发送数据时也要依据协议,发送的报文要序列化+报头(Encode)

关于流式数据的处理

为什么无法保证“读完所有内容”?​

  • TCP协议会将数据拆分为多个IP包传输,接收方的 recv()可能只读到部分数据。
  • 操作系统内核的Socket缓冲区大小有限,可能分批交付数据。
  • 网络延迟或拥塞可能导致数据分片到达。

以下面代码为例

    // 50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n
    // 5
    // 50
    // 50\r
    // 50\r\n
    // 50\r\n{"x": 10, "
    // 50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n
    // 50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n50\r\n{"x": 10, "y" : 20, "ope
    //.....
    // 1. 判断报文完整性
    // 2. 如果包含至少一个完整请求,提取他, 并从移除它,方便处理下一个
    bool Decode(std::string &buffer, std::string *package)
    {
        // 50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n
        ssize_t pos = buffer.find(sep);
        if (pos = std::string::npos)
            return false;
        std::string package_len_str = buffer.substr(0, pos); // 50
        int package_len_int = std::stoi(package_len_str);    // 报文长度{"x": 10, "y" : 20, "oper" : '+'}
        int target_len = package_len_str.size() + package_len_int + 2 * sep.size();
        if (buffer.size() < target_len)
            return false;

        // buffer一定至少有一个完整的报文!
        *package = buffer.substr(pos + sep.size(), package_len_int);
        buffer.erase(0, target_len);
        return true;
    }

首先读取的数据要有分隔符sep(\r\n),如果没有找到则返回错误

每条报文的报头是报文的长度package_len_str,一条完整的报文包括报头、两个分隔符、报文具体内容(target_len)。如果传过来的buffer小于这个长度显然消息不完整返回错误。

NetCal计算机类

class Cal
{
public:
    Response Execute(Request &req)
    {
        Response resp(0, 0); // code: 0表示成功
        switch (req.Oper())
        {
        case '+':
            resp.SetResult(req.X() + req.Y());
            break;
        case '-':
            resp.SetResult(req.X() - req.Y());

            break;
        case '*':
            resp.SetResult(req.X() * req.Y());
            break;
        case '/':
        {
            if (req.Y() == 0)
            {
                resp.SetCode(1); // 1除零错误
            }
            else
            {
                resp.SetResult(req.X() / req.Y());
            }
        }
        break;
        case '%':
        {
            if (req.Y() == 0)
            {
                resp.SetCode(2); // 2 mod 0 错误
            }
            else
            {
                resp.SetResult(req.X() % req.Y());
            }
        }
        break;
        default:
            resp.SetCode(3); // 非法操作
            break;
        }

        return resp;
    }
};

Client.cc(客户端)

using namespace SocketModule;

void Usage(std::string proc)
{
    std::cerr << "Usage: " << proc << " server_ip server_port" << std::endl;
}

void GetDataFromStdin(int *x, int *y, char *oper)
{
    std::cout << "Please Enter x: ";
    std::cin >> *x;
    std::cout << "Please Enter y: ";
    std::cin >> *y;
    std::cout << "Please Enter oper: ";
    std::cin >> *oper;
}

// ./tcpclient server_ip server_port
int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    std::string server_ip = argv[1];
    uint16_t server_port = std::stoi(argv[2]);
    std::shared_ptr<Socket> client = std::make_shared<TcpSocket>();
    client->BuildTcpClientSocketMethod();

    if (client->Connect(server_ip, server_port) != 0)
    {
        // 失败
        std::cerr << "connect error" << std::endl;
        exit(CONNECT_ERR);
    }

    std::unique_ptr<Protocol> protocol = std::make_unique<Protocol>();
    std::string resp_buffer;
    // 连接服务器成功
    while (true)
    {
        // 1. 从标准输入当中获取数据
        int x, y;
        char oper;
        GetDataFromStdin(&x, &y, &oper);

        // 2. 构建一个请求-> 可以直接发送的字符串
        std::string req_str = protocol->BuildRequestString(x, y, oper);
        
        // 3. 发送请求
        client->Send(req_str);

        // 4. 获取应答
        Response resp;
        bool res = protocol->GetResponse(client, resp_buffer, &resp);
        if(res == false)
            break;

        // 5. 显示结果
        resp.ShowResult();
    }
    client->Close();
    return 0;
}

main.cc

void Usage(std::string proc)
{
    std::cerr << "Usage: " << proc << " port" << std::endl;
}

// ./tcpserver 8080
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    // 1. 顶层
    std::unique_ptr<Cal> cal = std::make_unique<Cal>();

    // 2. 协议层
    std::unique_ptr<Protocol> protocol = std::make_unique<Protocol>([&cal](Request &req)
      { return cal->Execute(req); });

    // 3. 服务器层
    std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(std::stoi(argv[1]),
      [&protocol](std::shared_ptr<Socket> &sock, InetAddr &client)
        {
           protocol->GetRequest(sock, client);
        });
    tsvr->Start();
    return 0;
}

效果演示

先让服务器跑起来

客户端进行连接

附录

Jsoncpp

Jsoncpp 是一个用于处理JSON数据的C++库。它提供了将JSON数据序列化为字 符串以及从字符串反序列化为C++数据结构的功能。Jsoncpp是开源的,广泛用于各种需要处理JSON数据的C++项目中。

特性

1.简单易用:Jsoncpp提供了直观的API,使得处理JSON数据变得简单。

2.高性能:Jsoncpp的性能经过优化,能够高效地处理大量JSON数据。

3.全面支持:支持JSON标准中的所有数据类型,包括对象、数组、字符串、数字、布尔值和null。

4. 错误处理:在解析JSON数据时,Jsoncpp提供了详细的错误信息和位置,方便开发者调试。

当使用Jsoncpp库进行JSON的序列化和反序列化时,确实存在不同的做法和工具类可供选择。以下是对Jsoncpp中序列化和反序列化操作的详细介绍:

序列化

序列化指的是将数据结构或对象转换为一种格式,以便在网络上传输或存储到文件 中。Jsoncpp提供了多种方式进行序列化:

1.使用Json::Value的toStyledString方法:

#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
intmain()
{
    Json::Valueroot;
    root["name"] = "joe";
    root["sex"] = "男";
    std::strings = root.toStyledString();
    std::cout << s << std::endl;
    return0;
}
$./ test.exe{
    "name" : "joe",
    "sex" : "男"
}

2.使用Json::StreamWriter:

#include <iostream>
#include <string>
#include <sstream>
#include <memory>
#include <jsoncpp/json/json.h>
intmain()
{
    Json::Valueroot;
    root["name"] = "joe";
    root["sex"] = "男";
    Json::StreamWriterBuilderwbuilder; // StreamWriter的
    工厂
    std::unique_ptr<Json::StreamWriter>
        writer(wbuilder.newStreamWriter());
    std::stringstreamss;
    writer->write(root, &ss);
    std::cout << ss.str() << std::endl;
    return0;
}
$./ test.exe{
    "name" : "joe",
    "sex" : "男"
}

3.使用Json::FastWriter:

#include <iostream>
#include <string>
#include <sstream>
#include <memory>
#include <jsoncpp/json/json.h>
int main()
{
    Json::Valueroot;
    root["name"] = "joe";
    root["sex"] = "男";
    Json::FastWriterwriter;
    std::strings = writer.write(root);
    std::cout << s << std::endl;
    return0;
}
$./ test.exe{"name" : "joe", "sex" : "男"}

反序列化

反序列化指的是将序列化后的数据重新转换为原来的数据结构或对象。Jsoncpp提供 了以下方法进行反序列化:

使用Json::Reader:


#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
int main()
{
    // JSON 字符串
    std::string json_string = "{\"name\":\"张三\",
 \"age\":30, \"city\":\"北京\"}";
    // 解析 JSON 字符串
    Json::Reader reader;
    Json::Value root;
    // 从字符串中读取 JSON 数据
    bool parsingSuccessful = reader.parse(json_string,
                                          root);
    if (!parsingSuccessful)
    {
        // 解析失败,输出错误信息
        std::cout << "Failed to parse JSON: " << reader.getFormattedErrorMessages() << std::endl;
        return 1;
    }
    // 访问 JSON 数据
    std::string name = root["name"].asString();
    int age = root["age"].asInt();
    std::string city = root["city"].asString();
    // 输出结果
    std::cout << "Name: " << name << std::endl;
    std::cout << "Age: " << age << std::endl;
    std::cout << "City: " << city << std::endl;
    return 0;
}
$./ test.exe
        Name : 张三
        Age : 30 
        City : 北京

总结

  • toStyledString、StreamWriter 和FastWriter 提供了不同的序列化选项, 你可以根据具体需求选择使用。
  •  Json::Reader 和parseFromStream 函数是Jsoncpp中主要的反序列化工具, 它们提供了强大的错误处理机制。 
  • 在进行序列化和反序列化时,请确保处理所有可能的错误情况,并验证输入和输出的有效性。

网站公告

今日签到

点亮在社区的每一天
去签到