2025-04-04 Unity 网络基础5——TCP分包与黏包

发布于:2025-04-05 ⋅ 阅读:(11) ⋅ 点赞:(0)

1 分包与黏包

​ 分包、黏包指在网络通信中由于各种因素(网络环境、API 规则等)造成的消息与消息之间出现的两种状态。

  • 分包:一个消息分成了多个消息进行发送。
  • 黏包:一个消息和另一个消息黏在了一起。
分包黏包示意图

注意:

​ 分包和黏包可能同时发生。

2 解决方案

​ 为消息添加头部,记录消息的长度。

​ 当接收到消息时,通过消息长度来判断是否分包、黏包,从而对消息进行拆分、合并处理。

​ 每次只处理完整的消息。

2.1 数据接口

  1. 消息接口。
public interface INetMessage
{
    int MessageId { get; }

    int BytesLength { get; }

    byte[] ToBytes();

    int FromBytes(byte[] bytes, int index);
}
  1. 接口扩展类,用于封装基本类型的序列化。
public static class ByteLengthExtension
{
    public static int GetBytesLength(this INetMessage message, int value)
    {
        return sizeof(int);
    }

    public static int GetBytesLength(this INetMessage message, string value)
    {
        return sizeof(int) + Encoding.UTF8.GetByteCount(value);
    }

    public static int GetBytesLength(this INetMessage message, bool value)
    {
        return sizeof(bool);
    }

    public static int GetBytesLength(this INetMessage message, float value)
    {
        return sizeof(float);
    }
}

public static class INetMessageExtension
{
    public static int Write(this INetMessage message, byte[] bytes, int index, int value)
    {
        BitConverter.GetBytes(value).CopyTo(bytes, index);
        return index + sizeof(int);
    }

    public static int Read(this INetMessage message, byte[] bytes, int index, ref int value)
    {
        value = BitConverter.ToInt32(bytes, index);
        return index + sizeof(int);
    }

    public static int Write(this INetMessage message, byte[] bytes, int index, string value)
    {
        var strBytes = Encoding.UTF8.GetBytes(value);

        BitConverter.GetBytes(strBytes.Length).CopyTo(bytes, index);
        index += sizeof(int);

        strBytes.CopyTo(bytes, index);
        return index + strBytes.Length;
    }

    public static int Read(this INetMessage message, byte[] bytes, int index, ref string value)
    {
        int length = BitConverter.ToInt32(bytes, index);
        index += sizeof(int);

        value = Encoding.UTF8.GetString(bytes, index, length);
        return index + length;
    }

    public static int Write(this INetMessage message, byte[] bytes, int index, bool value)
    {
        BitConverter.GetBytes(value).CopyTo(bytes, index);
        return index + sizeof(bool);
    }

    public static int Read(this INetMessage message, byte[] bytes, int index, ref bool value)
    {
        value = BitConverter.ToBoolean(bytes, index);
        return index + sizeof(bool);
    }

    public static int Write(this INetMessage message, byte[] bytes, int index, float value)
    {
        BitConverter.GetBytes(value).CopyTo(bytes, index);
        return index + sizeof(float);
    }

    public static int Read(this INetMessage message, byte[] bytes, int index, ref float value)
    {
        value = BitConverter.ToSingle(bytes, index);
        return index + sizeof(float);
    }

    public static int Write(this INetMessage message, byte[] bytes, int index, INetMessage value)
    {
        value.ToBytes().CopyTo(bytes, index);
        return index + value.BytesLength;
    }

    public static int Read(this INetMessage message, byte[] bytes, int index, ref INetMessage value)
    {
        value.FromBytes(bytes, index);
        return index + value.BytesLength;
    }
}

2.2 定义消息

​ 在 ToBytes() 方法中,先写入消息 Id,然后写入该消息的长度,最后写入消息内容。

public class PlayerMessage : INetMessage
{
    public int    PlayerId;
    public string Name;
    public int    Atk;
    public int    Lev;

    public int MessageId { get => 1001; }

    public int BytesLength
    {
        get => this.GetBytesLength(MessageId) + 
               sizeof(int) + // 消息长度
               this.GetBytesLength(PlayerId) +
               this.GetBytesLength(Name) +
               this.GetBytesLength(Atk) +
               this.GetBytesLength(Lev);
    }

    public byte[] ToBytes()
    {
        var length = BytesLength;
        var bytes  = new byte[length];
        var index  = 0;
        index = this.Write(bytes, index, MessageId);

        // 写入消息长度
        index = this.Write(bytes, index, length - sizeof(int) * 2); // 减去消息长度和消息 Id 的长度

        index = this.Write(bytes, index, PlayerId);
        index = this.Write(bytes, index, Name);
        index = this.Write(bytes, index, Atk);
        index = this.Write(bytes, index, Lev);
        return bytes;
    }

    public int FromBytes(byte[] bytes, int index)
    {
        // 反序列化不需要解析 Id,在此之前应解析 Id 从而使用该方法
        index = this.Read(bytes, index, ref PlayerId);
        index = this.Read(bytes, index, ref Name);
        index = this.Read(bytes, index, ref Atk);
        index = this.Read(bytes, index, ref Lev);
        return index;
    }

    public override string ToString()
    {
        return $"PlayerMessage: {PlayerId}, {Name}, {Atk}, {Lev}";
    }
}

2.3 NetManager

​ 使用单例管理器 NetManager 对消息的发送与接受进行管理。

  • _socket:客户端的 Socket。
  • _sendMessages:发送消息的公共队列,主线程塞消息,发送线程拿消息进行发送。
  • _receiveMessages:接收消息的公共队列,主线程拿消息,接收线程获取消息塞进去。
  • _isConnected:是否与服务器连接。
  • _cacheBytes:消息缓冲区。
  • _cacheBytesLength:当前缓冲长度。
public class NetManager : MonoBehaviour
{
    public static NetManager Instance { get; private set; } // 单例模式
    
    private Socket _socket;
    
    private Queue<INetMessage> _sendMessages = new Queue<INetMessage>();

    private Queue<INetMessage> _receiveMessages = new Queue<INetMessage>();

    private bool _isConnected = false;

    private byte[] _cacheBytes = new byte[1024 * 1024]; // 大小为 1MB
    
    private int _cacheBytesLength;
    
    private void Awake() 
    {
        Instance = this;
    }

    private void OnDestroy()
    {
        Close();
        Instance = null;
    }
    
    ...
}
  1. 连接与断开
    • Connect():连接指定 ip 与 port 的服务器。
    • Close():断开连接。
public class NetManager : MonoBehaviour
{
    ...
        
    public void Connect(string ip, int port)
    {
        _socket ??= new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        try
        {
            _socket.Connect(ip, port);
            _isConnected = true;

            // 开启两个线程,一个负责发送消息,一个负责接收消息
            ThreadPool.QueueUserWorkItem(SendMessageThread);
            ThreadPool.QueueUserWorkItem(ReceiveMessageThread);
        }
        catch (SocketException e)
        {
            if (e.ErrorCode == 10061)
            {
                Debug.LogError("服务器拒绝连接");
            }
            else
            {
                Debug.LogError("连接失败");
            }
        }
    }

    public void Close()
    {
        _isConnected = false;
        _socket?.Shutdown(SocketShutdown.Both);
        _socket?.Close();
    }
    
    /// <summary>
    /// 发送线程的任务
    /// </summary>
    private void SendMessageThread(object state)
    {
        while (_isConnected)
        {
            if (_sendMessages.Count > 0) // 有消息就发送
            {
                var message = _sendMessages.Dequeue();
                _socket.Send(message.ToBytes());
            }
        }
    }

    private void ReceiveMessageThread(object state)
    {
        while (_isConnected)
        {
            if (_socket.Available > 0)
            {
                var receiveBytes = new byte[1024];
                var length       = _socket.Receive(receiveBytes);
                HandleReceiveMessage(receiveBytes, length); // 核心方法,依据消息长度进行处理
            }
        }
    }
    
    ...
}
  1. 发送消息
    • Send():发送指定消息。
    • SendTest():发送字节数组(测试分包、黏包使用)。
public class NetManager : MonoBehaviour
{
    ...
        
    public void Send(INetMessage message)
    {
        _sendMessages.Enqueue(message); // 塞入消息队列
    }

    public void SendTest(byte[] bytes)
    {
        _socket.Send(bytes);
    }
    
    ...
}
  1. 接收消息

    Update() 方法中不断判断接收消息队列,有消息则打印出来。

public class NetManager : MonoBehaviour
{
    ...
        
    private void Update()
    {
        if (_receiveMessages.Count > 0)
        {
            var message = _receiveMessages.Dequeue();
            Debug.Log(message); // 主线程处理消息
        }
    }
    
    ...
}

2.4 分包、黏包处理

  1. 收到消息时看之前有没有缓存,如果有,直接拼接到后面。
  2. 循环处理消息体
    • 处理前置信息(Id、解析长度)。
    • if 缓冲区 + 该条消息体 >= 一条完整信息时
      • 判断消息 Id。
      • 塞入消息队列。
      • 更新 index_cacheBytesLength
    • else 还没有接收到一条完整消息
      • 解析了前置信息,但是没有成功解析消息体,则回退到解析 Id 的位置。
      • 缓存剩余数据。
public class NetManager : MonoBehaviour
{
    ...
        
    private void HandleReceiveMessage(byte[] receiveBytes, int receiveNum)
    {
        var messageId = 0;
        var index     = 0; // receiveBytes 的处理进度(下标)

        // 收到消息时看之前有没有缓存
        // 如果有,直接拼接到后面
        receiveBytes.CopyTo(_cacheBytes, _cacheBytesLength);
        _cacheBytesLength += receiveNum;

        while (true)
        {
            var messageLength = -1; // 当前消息长度

            // 处理前置信息
            if (_cacheBytesLength - index >= 8)
            {
                // 解析 Id
                messageId =  BitConverter.ToInt32(_cacheBytes, index);
                index     += sizeof(int);

                // 解析长度
                messageLength =  BitConverter.ToInt32(_cacheBytes, index);
                index         += sizeof(int);
            }

            // 处理消息体
            if (messageLength != -1 && _cacheBytesLength - index >= messageLength)
            {
                // 解析消息体
                INetMessage message = default;
                switch (messageId)
                {
                    case 1001:
                        message = new PlayerMessage();
                        message.FromBytes(_cacheBytes, index);
                        break;
                }

                if (message != default)
                {
                    _receiveMessages.Enqueue(message); // 塞入消息队列
                }
                index += messageLength;

                // 如果消息体长度等于缓存长度,证明缓存已经处理完毕
                if (index == _cacheBytesLength)
                {
                    _cacheBytesLength = 0;
                    break;
                }
            }
            else // 消息体还没有接收完毕
            {
                // 解析了前置信息,但是没有成功解析消息体
                if (messageLength != -1)
                {
                    index -= 8; // 回退到解析 Id 的位置
                }

                // 缓存剩余的数据
                _cacheBytesLength -= index;
                Array.Copy(_cacheBytes, index, _cacheBytes, 0, _cacheBytesLength);

                break;
            }
        }
    }
}

3 测试

​ 服务端、客户端配置见 2025-03-25 Unity 网络基础4——TCP同步通信_unity tcp-CSDN博客

3.1 服务端

​ 封装 ServerSocket 和 ClientSocket 方便通信。

  1. Program.cs
    • 创建 ServerSocket,使用本地 ip 127.0.0.1 作为服务器地址。
    • 循环监听指令。
      • “exit”:退出。
      • “B:1001”:向所有客户端发送消息。
// See https://aka.ms/new-console-template for more information

using NetLearningTcpServerExercise2;

var serverSocket = new ServerSocket();
serverSocket.Start("127.0.0.1", 8080, 1024);
Console.WriteLine("Server Start!");

while (true)
{
    var input = Console.ReadLine();
    if (input == "exit")
    {
        serverSocket.Close();
        break;
    }
    else if (input?[..2] == "B:") // 输入命令向所有客户端广播消息
    {
        if (input[2..] == "1001")
        {
            var playerMsg = new PlayerMessage()
            {
                PlayerId = 1,
                Name     = "张三",
                Atk      = 100,
                Lev      = 10
            };
            serverSocket.Broadcast(playerMsg);
        }
    }
}
  1. ServerSocket.cs
// ------------------------------------------------------------
// @file       ServerSocket.cs
// @brief
// @author     zheliku
// @Modified   2025-03-24 02:03:06
// @Copyright  Copyright (c) 2025, zheliku
// ------------------------------------------------------------

namespace NetLearningTcpServerExercise2;

using System.Net;
using System.Net.Sockets;

public class ServerSocket
{
    private readonly Socket _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    private Dictionary<int, ClientSocket> _clientSockets = new Dictionary<int, ClientSocket>();

    private bool _Running;

    public void Start(string ip, int port, int maxClientCount)
    {
        _socket.Bind(new IPEndPoint(IPAddress.Parse(ip), port));
        _socket.Listen(maxClientCount);
        _Running = true;

        ThreadPool.QueueUserWorkItem(AcceptClient);
        ThreadPool.QueueUserWorkItem(ReceiveMessage);
    }

    public void Close()
    {
        _Running = false;
        foreach (var clientSocket in _clientSockets.Values)
        {
            clientSocket.Close();
        }
        _clientSockets.Clear();

        try
        {
            _socket.Shutdown(SocketShutdown.Both);
        }
        catch { }
        finally
        {
            _socket.Close();
        }
    }

    public void Broadcast(INetMessage message)
    {
        foreach (var clientSocket in _clientSockets.Values)
        {
            clientSocket.SendMessage(message);
        }
    }

    private void AcceptClient(object? state)
    {
        while (_Running)
        {
            try
            {
                // 连入客户端
                var clientSocket = new ClientSocket(_socket.Accept());

                // clientSocket.SendMessage("Welcome to the server!");

                _clientSockets.Add(clientSocket.Id, clientSocket);
            }
            catch (Exception e)
            {
                Console.WriteLine("ClientSocket Accept Wrong: " + e);
            }
        }
    }

    private void ReceiveMessage(object? state)
    {
        while (_Running)
        {
            if (_clientSockets.Count > 0)
            {
                foreach (var clientSocket in _clientSockets.Values)
                {
                    clientSocket.ReceiveMessage();
                }
            }
        }
    }
}
  1. ClientSocket.cs
// ------------------------------------------------------------
// @file       ClientSocket.cs
// @brief
// @author     zheliku
// @Modified   2025-03-24 01:03:42
// @Copyright  Copyright (c) 2025, zheliku
// ------------------------------------------------------------

namespace NetLearningTcpServerExercise2;

using System.Net.Sockets;
using System.Text;

public class ClientSocket
{
    private static int _ClientBeginId = 1;

    private readonly Socket _socket;

    private byte[] _cacheBytes = new byte[1024 * 1024]; // 缓冲区,大小为 1MB
    private int    _cacheBytesLength;

    public int Id;

    public bool Connected
    {
        get => _socket.Connected;
    }

    public ClientSocket(Socket socket)
    {
        Id      = _ClientBeginId++;
        _socket = socket;
    }

    public void Close()
    {
        try
        {
            _socket.Shutdown(SocketShutdown.Both);
        }
        catch { }
        finally
        {
            _socket.Close();
        }
    }

    public void SendMessage(INetMessage message)
    {
        try
        {
            _socket.Send(message.ToBytes());
        }
        catch (Exception e)
        {
            Console.WriteLine("SendMessage Wrong: " + e);
            Close();
        }
    }

    public void ReceiveMessage()
    {
        try
        {
            if (_socket.Available > 0)
            {
                var buffer        = new byte[1024 * 5];
                var receiveLength = _socket.Receive(buffer);
                HandleReceiveMessage(buffer, receiveLength);
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("ReceiveMessage Wrong: " + e);
            Close();
        }
    }

    private void MessageHandle(object? state)
    {
        if (state == null) return;

        Console.WriteLine($"Receive message from client {_socket} (ID {Id}): {state}");
    }

    private void HandleReceiveMessage(byte[] receiveBytes, int receiveNum)
    {
        var messageId = 0;
        var index     = 0;

        // 收到消息时看之前有没有缓存
        // 如果有,直接拼接到后面
        receiveBytes.CopyTo(_cacheBytes, _cacheBytesLength);
        _cacheBytesLength += receiveNum;

        while (true)
        {
            var messageLength = -1;

            // 处理前置信息
            if (_cacheBytesLength - index >= 8)
            {
                // 解析 Id
                messageId =  BitConverter.ToInt32(_cacheBytes, index);
                index     += sizeof(int);

                // 解析长度
                messageLength =  BitConverter.ToInt32(_cacheBytes, index);
                index         += sizeof(int);
            }

            // 处理消息体
            if (messageLength != -1 && _cacheBytesLength - index >= messageLength)
            {
                // 解析消息体
                INetMessage message = default;
                switch (messageId)
                {
                    case 1001:
                        message = new PlayerMessage();
                        message.FromBytes(_cacheBytes, index);
                        break;
                }

                if (message != default)
                {
                    ThreadPool.QueueUserWorkItem(MessageHandle, message);
                }
                index += messageLength;

                // 如果消息体长度等于缓存长度,证明缓存已经处理完毕
                if (index == _cacheBytesLength)
                {
                    _cacheBytesLength = 0;
                    break;
                }
            }
            else // 消息体还没有接收完毕
            {
                // 解析了前置信息,但是没有成功解析消息体
                if (messageLength != -1)
                {
                    index -= 8; // 回退到解析 Id 的位置
                }

                // 缓存剩余的数据
                _cacheBytesLength -= index;
                Array.Copy(_cacheBytes, index, _cacheBytes, 0, _cacheBytesLength);

                break;
            }
        }
    }
}

3.2 客户端

​ 在 Unity 中创建场景,4 个按钮分别发送不同消息。

image-20250404190900044

​ 编写 Lesson7.cs 测试脚本,挂载到 Lesson7 场景物体上。

​ “发送”、“黏包”、“分包”、“分包、黏包”按钮分别挂载到 Lesson7 的 BtnSendBtnSend1BtnSend2BtnSend3 成员上。

// ------------------------------------------------------------
// @file       Lesson7.cs
// @brief
// @author     zheliku
// @Modified   2025-03-24 02:03:40
// @Copyright  Copyright (c) 2025, zheliku
// ------------------------------------------------------------

namespace Lesson
{
    using System;
    using System.Threading.Tasks;
    using UnityEngine;
    using UnityEngine.UI;

    public class Lesson7 : MonoBehaviour
    {
        public Button BtnSend;
        public Button BtnSend1;
        public Button BtnSend2;
        public Button BtnSend3;

        private void Start()
        {
            NetManager.Instance.Connect("127.0.0.1", 8080);

            BtnSend.onClick.AddListener(() =>
            {
                ...
            });

            BtnSend1.onClick.AddListener(() =>
            {
                ...
            });
            
            BtnSend2.onClick.AddListener(async () =>
            {
                ...
            });
            
            BtnSend3.onClick.AddListener(async () =>
            {
                ...
            });
        }
    }
}

3.3 直接发送

​ 启动服务端,再启动 Unity。直接发送的逻辑如下:

BtnSend.onClick.AddListener(() =>
{
    var playerMsg = new PlayerMessage()
    {
        PlayerId = 1001,
        Name     = "发送",
        Atk      = 100,
        Lev      = 1
    };
    NetManager.Instance.Send(playerMsg);
});

​ 点击 Unity 中的“发送”按钮,服务器端接收到消息。

image-20250404191338724

3.4 黏包发送

image-20250404192804164

​ 黏包发送定义了两条消息,顺序存放在一个长字节数组中一并发送。具体逻辑如下:

BtnSend1.onClick.AddListener(() =>
{
    var playerMsg1 = new PlayerMessage()
    {
        PlayerId = 1002,
        Name     = "黏包1",
        Atk      = 100,
        Lev      = 1
    };
    var playerMsg2 = new PlayerMessage()
    {
        PlayerId = 1003,
        Name     = "黏包2",
        Atk      = 63,
        Lev      = 5
    };

    var bytes = new byte[playerMsg1.BytesLength + playerMsg2.BytesLength];
    playerMsg1.ToBytes().CopyTo(bytes, 0);
    playerMsg2.ToBytes().CopyTo(bytes, playerMsg1.BytesLength);
    NetManager.Instance.SendTest(bytes);
});

​ 点击 Unity 中的“黏包”按钮,服务器端一次性接收到 2 条消息。

image-20250404191754725

3.5 分包发送

image-20250404192814792

​ 分包发送定义了 1 条消息,将前 10 个字节拷贝到一个数组中,剩余字节拷贝到另一个数组中。两个数组间隔 0.5s 发送。具体逻辑如下:

BtnSend2.onClick.AddListener(async () =>
{
    var playerMsg = new PlayerMessage()
    {
        PlayerId = 1004,
        Name     = "分包",
        Atk      = 100,
        Lev      = 1
    };

    var bytes  = playerMsg.ToBytes();
    var bytes1 = new byte[10];
    var bytes2 = new byte[bytes.Length - 10];
    Array.Copy(bytes, 0, bytes1, 0, 10);
    Array.Copy(bytes, 10, bytes2, 0, bytes.Length - 10);

    NetManager.Instance.SendTest(bytes1);
    await Task.Delay(500); // 注释改行后会自动黏包,服务端立刻接收到消息
    NetManager.Instance.SendTest(bytes2);
});

​ 点击 Unity 中的“分包”按钮,服务器端等待 0.5s 后,才接收到消息。

image-20250404192025667

3.6 分包、黏包发送

image-20250404192824995

​ 分包、黏包发送定义了 2 条消息,将消息 1 和消息 2 的前 10 个字节拷贝到一个数组中,消息 2 剩余字节拷贝到另一个数组中。两个数组间隔 0.5s 发送。具体逻辑如下:

BtnSend3.onClick.AddListener(async () =>
{
    var playerMsg1 = new PlayerMessage()
    {
        PlayerId = 1005,
        Name     = "分包黏包1",
        Atk      = 9,
        Lev      = 1
    };
    var playerMsg2 = new PlayerMessage()
    {
        PlayerId = 1006,
        Name     = "分包黏包2",
        Atk      = 63,
        Lev      = 55
    };
    var bytes1 = playerMsg1.ToBytes();
    var bytes2 = playerMsg2.ToBytes();

    var bytes2_1 = new byte[10];
    var bytes2_2 = new byte[bytes2.Length - 10];
    Array.Copy(bytes2, 0, bytes2_1, 0, 10);
    Array.Copy(bytes2, 10, bytes2_2, 0, bytes2.Length - 10);

    var firstBytes = new byte[bytes1.Length + bytes2_1.Length];
    Array.Copy(bytes1, 0, firstBytes, 0, bytes1.Length);
    Array.Copy(bytes2_1, 0, firstBytes, bytes1.Length, bytes2_1.Length);

    var secondBytes = bytes2_2;

    NetManager.Instance.SendTest(firstBytes);
    await Task.Delay(500); // 注释改行后会自动黏包,服务端立刻接收到消息
    NetManager.Instance.SendTest(secondBytes);
});

​ 点击 Unity 中的“分包、黏包”按钮,服务器端立刻接收到消息 1。

image-20250404192344679

​ 等待 0.5s 后,接收到消息 2。

image-20250404192411217

3.7 其他

​ 在服务端输入 “B:1001” 命令后,回到 Unity,发现接收到消息。

image-20250404192625942 image-20250404192650849