【网络编程】简易的 p2p 模型,实现两台虚拟机之间的简单点对点通信,并以小见大观察 TCP 协议的具体运行

发布于:2025-07-10 ⋅ 阅读:(26) ⋅ 点赞:(0)

推荐一个零声教育学习教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: https://github.com/0voice 链接

基本概念

p2p, 一个让所有投资者脊背发凉的金融概念, 一个去中心化的金融概念,其实它的本质只是一个技术。P2P(Peer-to-Peer)是一种去中心化的网络架构模式,中文通常翻译为"点对点"或"对等网络"。它代表了与传统的客户端-服务器(Client-Server)模型完全不同的网络通信理念。

P2P 的核心概念

  • 去中心化:
    • 没有中央服务器控制整个网络
    • 所有参与者(节点)地位平等
    • 每个节点既是客户端又是服务器(称为"对等体")
  • 直接通信:
    • 节点之间直接连接和交换数据
    • 不需要通过中间服务器中转
    • 通信路径更短,延迟更低
  • 资源共享:
    • 每个节点贡献自己的资源(带宽、存储、计算能力)
    • 资源分布在整个网络中
    • 节点越多,网络整体能力越强

与传统客户端-服务器模型的对比

特性 P2P 网络 客户端-服务器模型
架构 去中心化 中心化
节点角色 既是客户端又是服务器 严格区分客户端和服务器
扩展性 节点越多性能越好 服务器可能成为瓶颈
可靠性 单点故障不影响整个网络 服务器故障导致服务中断
资源分布 资源分散在各个节点 资源集中在服务器

P2P 的典型应用场景

  • 文件共享:
    • BitTorrent:用户直接从其他用户下载文件片段
    • 早期Napster:音乐文件共享(混合式P2P)
  • 加密货币:
    • 比特币/以太坊:交易验证通过P2P网络完成
    • 区块链技术的基础架构
  • 即时通讯:
    • 早期Skype:语音通话直接在对等体间建立
    • 某些隐私通讯应用
  • 内容分发:
    • P2P CDN:利用用户设备分发内容
    • 直播平台的P2P加速

我所能设想到的一个应用就是 “智能家具” 的设计,我们用手机与智能家居进行点对点的 P2P 连接,直接下命令,而非绕一大圈地经过中央服务器。这样的设计才是系统开销小,用户体验好。

业务拆解

我们在前面的基本概念介绍里面已经说到过 P2P 网络的各个节点既是客户端又是服务器,本篇文章之中,我们要抓住这一个点设计一个点对点通信的简易代码。至于像加密验证等 “高级玩意”,本篇文章是绝对不会涉及的。

问题来了,我们该怎么设计呢?我们可以尝试一下问自己,到底想要什么功能效果。我问过自己,可以分成两大类——主动类和被动类。

主动类的功能:

  1. 用户之间随时发起信息。
  2. 用户选择想要连接的对象 IP(可以重置 IP)。
  3. 自己是一个客户端,可主动发起并实现与对应 IP 的远程连接。
  4. 结束程序。

被动类的功能:

  1. 自己本身是服务器,被动监听到来访 IP,并随即分配套接字资源负责对应的 I/O 任务。
  2. 自动地接收信息。(这里回想起《角头》中白毛对 “憨春” 说:“憨春大,我 BOSS 找你那么多次,你为什么都已读不回呀?啊?”)

为了简化问题,本篇文章所展示的代码,只实现 “一个设备仅有一个连接,如果想要新的连接就必须删掉旧的连接” 的设计。

对于主动类的功能,我将采用 “用户界面” 式的循环交互设计,类似的代码可见我之前写过的一篇关于 “通讯录小项目” 的文章(原文链接 在此)。

对于被动类的功能,我将采用多线程编程的设计。有两个被动类的功能,那就有两个子线程分别负责。这两个子线程因 “一个设备仅有一个连接,如果想要新的连接就必须删掉旧的连接” 的简化,而使用了 “SELECT” 定时关注连接所对应的套接字是否对接收、读写等事件就绪。select 是一种多路复用(multiplexing)I/O 机制,用于同时监视多个文件描述符(file descriptors),以确定哪些文件描述符已经准备好进行 I/O 操作(如读取、写入或异常条件)。类似的代码可见我之前写过的一篇关于多路 I/O 复用的文章(原文链接 在此)。

为了确保程序能够被正常关闭,所建立套接字都是非阻塞的,即定时执行,重复循环计时。

代码实现

准备工作

准备头文件

#include <stdio.h>
#include <stdlib.h>     //  EXIT_FAILURE 是一个标准宏,exit 函数
#include <string.h>
#include <unistd.h>     //  close 函数
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h> // 添加select用于超时处理
#include <fcntl.h>				// 用于更改套接字的模式,比如非阻塞模式
#include <errno.h>				// 这是全局变量 errno,用于健壮的读取功能

准备宏定义

#define MAX_MSG_LEN 1024
#define PORT 5000

声明与定义全局变量,该全局变量能综合、集成所有的运行参数。我们将之命名为 NodeState

// 全局状态结构体
typedef struct {
    int server_fd;
    int connection_fd;
    int running;
    pthread_mutex_t lock;
    char peer_ip[16];  // 存储点分十进制IP地址
    int peer_port;
} NodeState;

// 声明并定义全局变量
// 点号(.)在这里是C99标准引入的指定初始化器语法的一部分。它的作用是明确指定结构体成员的初始化值,而不是依赖于成员在结构体中的顺序。
NodeState node_state = {
    .server_fd = -1,
    .connection_fd = -1,
    .running = 1,
    .lock = PTHREAD_MUTEX_INITIALIZER,
    .peer_ip = "",
    .peer_port = PORT
};

需要注意到的是,点号(.)在这里是C99标准引入的指定初始化器语法的一部分。它的作用是明确指定结构体成员的初始化值,而不是依赖于成员在结构体中的顺序。

紧接着是错误处理函数,

void error(const char *msg) {
    perror(msg);
    exit(EXIT_FAILURE);     //  EXIT_FAILURE 是一个标准宏,定义在 <stdlib.h> 中,用于表示程序执行失败。
    // 当 exit 函数被调用时,程序会执行以下操作:
    // 关闭所有打开的文件:关闭所有通过标准 I/O 函数(如 fopen)打开的文件流。
    // 刷新缓冲区:刷新所有标准 I/O 缓冲区,确保所有未写入的数据都被写入目标文件或设备。
    // 调用清理函数:执行所有通过 atexit 注册的清理函数(如果有)。
    // 终止程序:终止程序的执行,并将 status 参数作为退出状态码返回给操作系统。
}

当我们结束程序的时候,需要定义清理资源函数

// 清理资源
void cleanup() {
    pthread_mutex_lock(&node_state.lock);
    if (node_state.connection_fd != -1) {
        close(node_state.connection_fd);
        node_state.connection_fd = -1;  //  重置
    }
    if (node_state.server_fd != -1) {
        close(node_state.server_fd);
        node_state.server_fd = -1;  //  重置
    }
    pthread_mutex_unlock(&node_state.lock);
    printf("[*] Resources cleaned up\n");

    return;
}

为了能让程序正常结束,而非让套接字对应的 acceptrecv 函数在用户选择退出的时候,一直处于阻塞各自的线程之中,故而我们定义了套接字设置函数

// 设置套接字为非阻塞
void set_nonblocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl F_GETFL");
        return;
    }
    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl F_SETFL");
    }
}

实现被动的功能——多线程指针函数

这个函数是回调函数,它即将被线程所调用。他主要是调用了 select 多路复用机制,过一段时间就会检查对应的套接字是否关于读写事件就绪,而且还设置了超时机制,这是因为所有的套接字都被设置成了非阻塞模式。另外,之所以不用 EPOLL 机制,是因为该线程只针对一个连接(一个套接字),因此 select 的效果会更好。我们的智能家居环境不也只是零星几个连接?并不是一个服务器,因而并不需要那么多的连接。

// 服务器线程函数
void *server_thread(void *arg) {
    struct sockaddr_in address;
    int addrlen = sizeof(address);
    
    // 创建服务器套接字
    node_state.server_fd = socket(AF_INET, SOCK_STREAM, 0);
    
    // 设置套接字为非阻塞
    set_nonblocking(node_state.server_fd);

    // 设置套接字选项 (允许地址重用)
    int opt = 1;
    if (setsockopt(node_state.server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
        error("setsockopt failed");
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // 绑定套接字
    if (bind(node_state.server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        error("bind failed");
    }
    
    // 监听连接
    if (listen(node_state.server_fd, 3) < 0) {
        error("listen failed");
    }
    
    printf("[*] Server listening on port %d\n", PORT);
    
    while (node_state.running) {

        fd_set read_fds;
        FD_ZERO(&read_fds);
        FD_SET(node_state.server_fd, &read_fds);
        
        // 设置超时时间为1秒
        struct timeval timeout;
        timeout.tv_sec = 1;
        timeout.tv_usec = 0;
        
        // 使用select等待连接请求或超时
        int activity = select(node_state.server_fd + 1, &read_fds, NULL, NULL, &timeout);
  
        // 检查是否有新连接
        if (activity > 0 && FD_ISSET(node_state.server_fd, &read_fds)) {
            int new_socket;
            if ((new_socket = accept(node_state.server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {
                if (errno != EWOULDBLOCK && errno != EAGAIN) {
                    perror("accept failed");
                }
                continue;
            }

            pthread_mutex_lock(&node_state.lock);
            
            // 检查是否已有连接
            if (node_state.connection_fd != -1) {
                printf("[!] Connection already exists. Closing new connection.\n");
                close(new_socket);
            } else {
                node_state.connection_fd = new_socket;
                printf("[+] Accepted connection from %s\n", inet_ntoa(address.sin_addr));
            }
            
            pthread_mutex_unlock(&node_state.lock);
        }
        
        // 检查是否需要退出
        if (!node_state.running) {
            break;
        }
    }
    
    return NULL;

}

这个函数是回调函数,它即将被线程所调用。设计的道理类似于上面。

// 接收消息线程函数
void *recv_thread(void *arg) {
    char buffer[MAX_MSG_LEN] = {0};
    
    while (node_state.running) {
        pthread_mutex_lock(&node_state.lock);
        int current_fd = node_state.connection_fd;
        pthread_mutex_unlock(&node_state.lock);
        
        if (current_fd == -1) {
            // 没有连接时短暂休眠
            usleep(100000); // 100ms
            continue;
        }
        
        fd_set read_fds;
        FD_ZERO(&read_fds);
        FD_SET(current_fd, &read_fds);
        
        // 设置超时时间为1秒
        struct timeval timeout;
        timeout.tv_sec = 1;
        timeout.tv_usec = 0;
        
        // 使用select等待数据或超时
        int activity = select(current_fd + 1, &read_fds, NULL, NULL, &timeout);
        
        if (activity < 0 && errno != EINTR) {
            perror("select error");
            continue;
        }
        
        // 检查是否有数据到达
        if (activity > 0 && FD_ISSET(current_fd, &read_fds)) {
            // 接收数据
            ssize_t valread = recv(current_fd, buffer, MAX_MSG_LEN - 1, 0);
            if (valread <= 0) {
                if (valread == 0) {
                    printf("[!] Connection closed by peer\n");
                } else if (node_state.running) {
                    perror("recv failed");
                }
                
                pthread_mutex_lock(&node_state.lock);
                if (node_state.connection_fd == current_fd) {
                    close(current_fd);
                    node_state.connection_fd = -1;
                }
                pthread_mutex_unlock(&node_state.lock);
                
                printf("[*] Ready for new connections\n");
            } else {
                buffer[valread] = '\0';
                printf("\n[Peer] %s\nYou: ", buffer);
                fflush(stdout);
            }
        }
        
        // 检查是否需要退出
        if (!node_state.running) {
            break;
        }
    }
    
    return NULL;
}

实现主动的功能——用户选择界面

用户可以主动的发起远程连接,就像客户端一样。

// 连接到对等节点
void connect_to_peer() {
    
    if (strlen(node_state.peer_ip) == 0) {
        printf("[!] Peer IP not set\n");
        return;
    }
    
    struct sockaddr_in serv_addr;
    int sock = 0;
    
    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        error("socket creation failed");
    }
    
    // 设置套接字为非阻塞
    set_nonblocking(sock);
    
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(node_state.peer_port);
    
    // 转换IP地址
    if (inet_pton(AF_INET, node_state.peer_ip, &serv_addr.sin_addr) <= 0) {
        printf("[!] Invalid address/ Address not supported\n");
        close(sock);
        return;
    }
    
    printf("[*] Trying to connect to %s:%d...\n", node_state.peer_ip, node_state.peer_port);
    
    // 尝试连接(非阻塞)
    int connect_result = connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
    if (connect_result < 0 && errno != EINPROGRESS) {
        perror("Connection failed");
        close(sock);
        return;
    }
    
    // 使用 select 检查连接状态
    fd_set write_fds;
    FD_ZERO(&write_fds);
    FD_SET(sock, &write_fds);
    
    struct timeval timeout;
    timeout.tv_sec = 5; // 5秒超时
    timeout.tv_usec = 0;
    
    int sel = select(sock + 1, NULL, &write_fds, NULL, &timeout);
    if (sel <= 0) {
        printf("[-] Connection timed out\n");
        close(sock);
        return;
    }
    
    // 检查套接字错误
    int so_error;
    socklen_t len = sizeof(so_error);
    getsockopt(sock, SOL_SOCKET, SO_ERROR, &so_error, &len);
    if (so_error != 0) {
        printf("[-] Connection failed: %s\n", strerror(so_error));
        close(sock);
        return;
    }
    
    pthread_mutex_lock(&node_state.lock);
    node_state.connection_fd = sock;
    pthread_mutex_unlock(&node_state.lock);
    
    printf("[+] Connected to peer %s:%d\n", node_state.peer_ip, node_state.peer_port);
}

连接建立后,双方可以主动发送信息,可以像网络喷子一样 “对喷”

// 发送消息
void send_message(const char *message) {
    pthread_mutex_lock(&node_state.lock);
    int current_fd = node_state.connection_fd;
    pthread_mutex_unlock(&node_state.lock);
    
    if (current_fd == -1) {
        printf("[!] Not connected to any peer\n");
        return;
    }
    
    if (send(current_fd, message, strlen(message), 0) < 0) {
        perror("send failed");
        
        pthread_mutex_lock(&node_state.lock);
        if (node_state.connection_fd == current_fd) {
            close(current_fd);
            node_state.connection_fd = -1;
        }
        pthread_mutex_unlock(&node_state.lock);
    }
}

综合前面两个函数,我们顺带还给交流的双方提供 “喷不过就绝交挂线” 的功能,这就是用户选择互动界面函数。用户想要发送信息,必须是 3 -> 2 -> 1 的执行顺序,即 “3” 先锁定目标,“2” 对目标进行远程连接,“1” 在实现连接后可以互发消息。如果我们想要换一个主机连接。我们可以继续 “3”,重置远程 IP,并断开原来的连接,清理套接字资源。实在不想玩,可以直接按 “4”,退出。

// 用户交互界面,主线程
//  程序员如想发消息,应先 3 -> 2 -> 1
void user_interface() {
    char input[100];
    char message[MAX_MSG_LEN];
    
    while (node_state.running) {
        printf("\nOptions:\n1. Send message\n2. Connect to peer\n3. Set peer IP\n4. Exit\nChoose: ");
        fflush(stdout);
        
        if (fgets(input, sizeof(input), stdin) == NULL) {   //  程序员从键盘中输入功能键
            break;
        }
        
        switch (input[0]) {
            case '1': // 发送消息
                printf("Enter message: ");
                fflush(stdout);
                if (fgets(message, MAX_MSG_LEN, stdin)) {   //  程序员从键盘中输入聊天信息
                    // 移除换行符
                    message[strcspn(message, "\n")] = 0;    //  把输入的聊天信息最后的 “\n” 精准转变为 “\0”,以便打印和发送
                    send_message(message);
                }
                break;
                
            case '2': // 连接到对等节点
                
                connect_to_peer();
                break;
                
            case '3': // 设置对等节点IP
            
                printf("Enter peer IP: ");
                fflush(stdout);
                if (fgets(node_state.peer_ip, sizeof(node_state.peer_ip), stdin)) {
                    // strcspn 是 C 语言标准库中的一个字符串处理函数,用于计算一个字符串中不包含指定字符集合的第一个子串的长度。

                    // 移除换行符
                    node_state.peer_ip[strcspn(node_state.peer_ip, "\n")] = 0;
                    printf("Peer IP set to: %s\n", node_state.peer_ip);
                }

                if (node_state.connection_fd != -1) {
                    printf("[!] Already connected to a peer. Now change anothor peer to connect.\n");
                    close(node_state.connection_fd);
                }
                break;
                
            case '4': // 退出
                node_state.running = 0;
                printf("[*] Shutting down...\n");
                break;
                
            default:
                printf("Invalid option\n");
        }
    }
    return;
}

主函数

实现被动的功能需要多线程编程,pthread_create 调用 server_threadrecv_thread 两个指针函数(回调函数),子线程的执行是需要线程来调度的(也就是全自动的)。在建立了两个子线程后,我们可以执行用户界面函数,实现发消息等 “主动的功能”。当选择结束程序后,要及时回收线程资源。

int main() {
    printf("=== P2P Node ===\n");
    
    // 创建服务器线程
    pthread_t server_tid, recv_tid;
    if (pthread_create(&server_tid, NULL, server_thread, NULL)) {       //  accept 会挂起
        error("could not create server thread");
    }
    
    // 创建接收消息线程
    if (pthread_create(&recv_tid, NULL, recv_thread, NULL)) {           //  recv 会挂起,如果没链接就会睡眠
        error("could not create receive thread");
    }

    //  以上两个线程都是在没有监听到新的来访 IP、没有人发来消息时,循环执行超时等待而后睡眠,同时还负责检查服务器的连接状态,如果断了就继续连接
    
    // 设置对等节点IP (初始为空)
    strcpy(node_state.peer_ip, "");
    
    // 启动用户界面
    user_interface();   //  至于主动发消息,关闭程序的,建立连接的 “主动项目” 就有用户界面统一管理。
    
    // 清理资源
    cleanup();      //  当程序员选择了 “4”,就会进入清理资源的函数处,线程就没有什么函数任务可执行了,紧接着就是关闭 套接字 资源
    
    // 等待线程结束
    // 虽然线程的任务函数已经跳出了死循环,但线程资源本身还未释放
    pthread_join(server_tid, NULL);     
    pthread_join(recv_tid, NULL);
    
    printf("[*] Program exited\n");
    return 0;
}

代码执行效果

编译代码

qiming@qiming:~/share/CTASK/TCP_test$ gcc -o p2p p2p_test.c -lpthread

在两台虚拟机上执行代码

qiming@qiming:~/share/CTASK/TCP_test$ ./p2p 
=== P2P Node ===

Options:
1. Send message
2. Connect to peer
3. Set peer IP
4. Exit
Choose: [*] Server listening on port 5000

我是使用 Xshell 去远程运行多台虚拟机
在这里插入图片描述
具体界面如下
在这里插入图片描述
先选择 “3” 选项,锁定要远程连接的对象
在这里插入图片描述
再选 “2” 选项,对设定好的 IP 进行连接
在这里插入图片描述
连接好后,选择 “1” 选项,双方可以对线互喷了。
在这里插入图片描述

选择 “4” 则会退出程序。
在这里插入图片描述

意外收获

如果我们在上述的程序执行过程中,利用 WireShark 软件去记录一路上的网络传输发包情况。我们将会触摸到 TCP 协议层——传输层的具体物体——包(处于第四层,对应 TCP 的报文),这些报文的制作都来自底层库所定义的函数 acceptrecvsendclose,我们是可以不用关心报文的制作,都有内核底层为我们代劳了。但我们依旧要学习这些底层,因为这能增强我们的计算机工程感觉。
在这里插入图片描述
我仔细地在网络抓包工具目录条,里面抓到了三个包——它们其实就是 “三次握手”
在这里插入图片描述
相互发消息,发的对称两个包
在这里插入图片描述
而后断开连接产生的四次挥手,对应四个包
在这里插入图片描述
以及
在这里插入图片描述

总结

我们首先写了一个极简的 P2P 网络通信,以同一局域网内的两台虚拟机作为实验对象,在过程之中,我们还以小见大,利用 Wireshark 去捕捉一整个程序执行的过程,从而亲身体会到 TCP 协议的具体传输过程。


网站公告

今日签到

点亮在社区的每一天
去签到