p2p sdk接收文件服务端epoll模型源码

发布于:2022-12-22 ⋅ 阅读:(166) ⋅ 点赞:(0)

#  KKP2P SDK介绍
kkp2p sdk是库快科技(kkuai.com)研发的支持p2p通信的中间件,是一套与业务无关的通用的p2p sdk库, 可以免费下载试用版本使用。
一句话概括其特点:支持面向登录账号进行网络编程。
即只要传入对端的账号id,应用层就能得到一个socket句柄fd,应用层通过读写该socket fd句柄和对端进行通信。

优秀特性 说明
跨平台 kkp2p的sdk库是由c语言开发,在linux、windows、android、ios等平台编译出了静态库,以及其他一些嵌入式平台也编译出了静态库,大家可以直接下载进行使用。云端服务是由golang语言开发,也支持在各种平台下编译出直接可以运行的程序,配置也比较简单,大家下载之后就可以按照官网文档说明自行进行部署。
体积小 kkp2p不依赖于任何第三方库,编译出来的库只有500KB左右大小。
性能强 在服务器上测试,P2P方式通信的速度可以超过10MB每秒;中转方式通信的速度取决于您云端服务器的带宽
易使用 提供了类似于socket编程接口的kkp2p_connect、kkp2p_listen、kkp2p_accept、kkp2p_read、kkp2p_write几个核心函数,使用起来非常简单方便。您只要指定对端的登录账号通过kkp2p_connect函数就能和对端创建一个虚拟的传输管道,然后通过kkp2p_read和kkp2p_write函数来读写数据和对端进行通信。您还可以通过参数指定是使用P2P方式通信还是使用中转(relay)方式通信,完全不用关心底层传输通道的创建和管理细节,一切由kkp2p的sdk库帮您解决。
高安全 支持加密通道传输,您只需要在kkp2p_connect的参数中指定需要加密数据即可。sdk会自动创建一个加密的虚拟通信管道出来;您写入明文,sdk会自动加密成密文传输;sdk收到密文,会自动解密成明文返回给您。通信双方的共同密钥是双方的sdk通过DH算法自动协商而成,外界无法获取;并且每次sdk的启动都会自动协商生成一个新的动态密钥,严格保障您的通信数据的安全。如果您为了提升数据传输的性能,不想对数据进行加解密,只需要在创建连接的函数kkp2p_connect参数中指定不需要加密数据即可。sdk的数据加解密功能只有在商业版本中才有,在个人试用版本中没有该功能。
通用性 kkp2p是一套适用于各种场景的通用的通信中间件,会完全透传用户的数据,您可以灵活的自定义通信双方的协议,kkp2p不会解析您的业务数据。kkp2p的P2P通信是基于udp实现的,kkp2p会自动帮您解决丢包、乱序、重传问题,也会根据您的实际带宽做自适应的带宽流控,您使用起来具有tcp传输的效果,相当于是用udp模拟实现了tcp。中转(relay)通信是基于tcp原生实现的。

#  接收文件服务端源码

接收文件过程:首先接收4字节的文件大小(网络字节序,暂支持4G左右大小,如果要超过4G,则协议需要使用8字节大小),然后再接收文件内容;接收完成后,发送响应,最后在退出。

 服务端可以并行接收多个客户端发送过来的文件,虽然是个单线程,但是通过epoll实现了并发接收,读者需要有epoll背景知识,该代码在linux平台编译运行验证通过。

我们可以看到,p2p sdk的用法非常类似原生的网络编程用法,非常简单易用。

#  源码讲解

#include <stdio.h>
#include <stdint.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>

#include <map>

// 去kkuai.com获取
// 得包含kkp2p sdk的头文件,以及链接libkkp2p.a
#include "kkp2p_sdk.h"

using namespace std;

// 对应每个接收文件的信息
typedef struct kkp2p_fd_ctx_s {
    int fd;
    kkp2p_channel_t* channel;
    int fileSize;
    int recved;
    FILE* pWrite;
}kkp2p_fd_ctx_t;

// 简单封装一下发送指定长度数据接口
int SendData(int fd, char* buff, int len) {
    int sended = 0 ;
    while (sended < len) {
        int wl = kkp2p_write(fd, buff + sended, len - sended, 1000);
        if (wl < 0) {
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));
            return -1;
        }
        sended += wl;
    }
    return len;
}

// 简单封装一下读取指定长度数据接口
int RecvData(int fd, char* buff, int len) {
    int recved = 0 ;
    while (recved < len) {
        int wl = kkp2p_read(fd, buff + recved, len - recved, 1000);
        if (wl < 0) {
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno));
            return -1;
        }
        recved += wl;
    }
    return len;
}

// epoll可读事件的处理函数,接收文件内容
int OnRecvFile(int fd, map<int, kkp2p_fd_ctx_t>& fileFds, int epollFd, kkp2p_engine_t* p2pEngine) {
    map<int, kkp2p_fd_ctx_t>::iterator iter = fileFds.find(fd);
    if (iter == fileFds.end()) {
        return -1;
    }

    kkp2p_fd_ctx_t& ctx = fileFds[fd];
    if (ctx.fileSize == 0) {
        // 首先接收4个字节的文件长度
        int ret = RecvData(fd, (char*)&ctx.fileSize, sizeof(ctx.fileSize));
        if (ret < 0 ) {
            return -1;
        }
        ctx.fileSize = ntohl(ctx.fileSize);
    } else {
        // 接收文件内容并写文件
        char szBuff[1024];
        int recved = kkp2p_read(fd, szBuff, sizeof(szBuff), 0);
        if (recved < 0) {
            printf("kkp2p_read error,fd:%d,ret:%d,errno:%d,desc:%s.\n",fd, recved, errno, strerror(errno));
            return -1;
        }
        ctx.recved += recved;
        fwrite(szBuff, 1, recved, ctx.pWrite);
        if (ctx.recved >= ctx.fileSize) {
            // 如果接收完成则发送响应
            char ch = '1';
            SendData(fd, &ch, 1);

            //将fd对应事件从epoll删除
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = 0;
            epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, &ev);

            fflush(ctx.pWrite);
            fclose(ctx.pWrite);
            
            // 关闭连接的fd句柄以及连接,并退出
            kkp2p_close_fd(fd);
            kkp2p_close_channel(p2pEngine, ctx.channel->channel_id);
            free(ctx.channel);
            fileFds.erase(iter);
            printf("recv file success,file size:%u,fd:%d,channel id:%u.\n",ctx.fileSize, fd, ctx.channel->channel_id);
        }
    }
    return 0;
}
// socket异常事件
int OnRecvError(int fd, map<int, kkp2p_fd_ctx_t>& fileFds, int epollFd, kkp2p_engine_t* p2pEngine) {
    map<int, kkp2p_fd_ctx_t>::iterator iter = fileFds.find(fd);
    if (iter == fileFds.end()) {
        return -1;
    }

    kkp2p_fd_ctx_t& ctx = fileFds[fd];
    if (ctx.fileSize > ctx.recved) {
        printf("recv file error,file size:%u,receved:%u,fd:%d,channel id:%u.\n",ctx.fileSize, ctx.recved, fd,ctx.channel->channel_id);
    }
    //将socket句柄从epoll删除,不再侦听该socket的事件
    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = 0;
    epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, &ev);
    fclose(ctx.pWrite);
    kkp2p_close_fd(fd);
    kkp2p_close_channel(p2pEngine, ctx.channel->channel_id);
    free(ctx.channel);
    fileFds.erase(iter);
    return 0;
}


int main(int argc, char** argv)
{
    // 输入两个参数,第一个是peerId,即登录账号,第二个是账号对应的密钥,云端禁止非法账号登录
    if (argc < 3) {
        printf("usage:%s peerId peerKey\n",argv[0]);
        return -1;
    }

    char* peerId = argv[1];
    char* peerKey = argv[2];
    // 用于保存每个文件的上下文信息
    map<int, kkp2p_fd_ctx_t> fileFds;

    // 初始化 kkp2p sdk 
    // 包括云端登录域名端口,局域网端口,日志配置
    kkp2p_engine_conf_t kkp2p_conf;
    kkp2p_conf.login_domain = "125.72.218.199";
    kkp2p_conf.login_port = 3080;
    kkp2p_conf.lan_search_port = 3549;
    kkp2p_conf.max_log_size = 1024*1024*10;
    kkp2p_conf.log_path = NULL;
    kkp2p_engine_t* p2pEngine = kkp2p_engine_init(&kkp2p_conf, 5000);
    if (p2pEngine == NULL) {
        printf("init kkp2p engine error.\n");
        return -1;
    }
    // 切换日志级别到debug模式
    kkp2p_switch_log_level(p2pEngine, 4);
    // 获取listen相关的句柄
    int listenFd = kkp2p_listen_fd(p2pEngine);
  
    // 将peerId加入到云端和本地局域网
    kkp2p_join_lan(p2pEngine, peerId);
    kkp2p_join_net(p2pEngine, peerId, peerKey);

    // 初始化epoll
    int epollFd = epoll_create(1024);
    struct epoll_event* events = (struct epoll_event*)calloc(1,  1024 * sizeof(struct epoll_event));

    //将listen句柄加入到epoll,侦听可读事件
    struct epoll_event ev;
    ev.events = EPOLLIN ;
    ev.data.fd = listenFd;
    epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &ev);

    // loop 
    int loop = 1;
    while(loop) {
        int ret = epoll_wait(epollFd, events, 1024, 1000);
        if (ret < 0) {
            printf("ePoll error : %s\n",strerror(errno));
            break;
        }  
  
        if(ret == 0){
            continue;
        }
    
        // 处理每个事件
        for (int i = 0; i< ret; i++) {
            int fd = events[i].data.fd;
            if (events[i].events & EPOLLIN) { //可读事件
                if (fd == listenFd) {
                    // 如果是p2p sdk的listen句柄可读则accept一个连接
                    kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
                    int ret = kkp2p_accept(p2pEngine, 0, channel);
                    if (ret > 0) {
                        // accept成功,有新的连接过来,将新连接加入到epoll
                        int newFd = channel->fd;
                        struct epoll_event ev;
                        ev.events = EPOLLIN ;
                        ev.data.fd = newFd;
                        epoll_ctl(epollFd, EPOLL_CTL_ADD, newFd, &ev);

                        // 打开文件准备接收文件
                        char fileName[1024];
                        memset(fileName, 0, sizeof(fileName));
                        sprintf(fileName,"%d_%d.txt", time(NULL), newFd);
                        FILE* pWrite = fopen(fileName, "w+");
                        if (pWrite == NULL) {
                            printf("fopen %s error",fileName);
                            return -1;
                        }
                        // 将新连接信息保存起来
                        kkp2p_fd_ctx_t ctx;
                        memset(&ctx, 0, sizeof(kkp2p_fd_ctx_t));
                        ctx.channel = channel;
                        ctx.fd = newFd;
                        ctx.pWrite = pWrite;
                        fileFds[newFd] = ctx;
                    
                        // 打印连接信息,是p2p传输模式,还是中转传输模式,以及连接id
                        printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",newFd, channel->transmit_mode, channel->channel_id);
                     } else if (ret == 0) {
                         // timeout
                         free(channel);
                     } else {
                         printf("kkp2p_accept error.\n");
                         free(channel);
                         loop = 0;
                         break;
                     }
                 } else {
                     // 如果是新连接的fd可读事件,直接接收文件
                     int ret = OnRecvFile(fd, fileFds,epollFd, p2pEngine);
                     if (ret < 0 ) {
                         OnRecvError(fd, fileFds, epollFd, p2pEngine);
                     }
                 }
             } else if ((events[i].events & EPOLLHUP) || (events[i].events & EPOLLERR)){
                 // 异常事件
                 if (fd == listenFd) {
                     loop = 0;
                     break;
                 } else {
                     OnRecvError(fd, fileFds, epollFd, p2pEngine);
                 }
             }
        }
    }
    // 释放资源,退出
    free(events);
    close(epollFd);
    kkp2p_engine_destroy(p2pEngine);
    return 0;
}

本文含有隐藏内容,请 开通VIP 后查看