最佳实践-p2p传文件服务端源码(epoll单线程模型)
本例子用于举例说明,使用kkp2p sdk如何接收对端发送的文件,本例子是传文件服务端,发送文件客户端请看其他章节。
接收文件过程:首先接收4字节的文件大小(网络字节序,暂支持4G左右大小),然后再接收文件内容;接收完成后,发送响应,最后在退出。
服务端可以并行接收多个客户端发送过来的文件,虽然是个单线程,但是通过epoll实现了并发接收,读者需要有epoll背景知识,该代码在linux平台编译运行验证通过。
kkp2p sdk的头文件和库请在个人试用版本页面免费下载,免费使用和体验。
#include <stdlib.h> #include <stdio.h> #include <stdint.h> #include <errno.h> #include <string.h> #include <arpa/inet.h> #include <sys/epoll.h> #include <sys/types.h> #include <fcntl.h> #include <unistd.h> #include <map> // 得包含kkp2p sdk的头文件,以及链接libkkp2p.a #include "kkp2p_sdk.h" using namespace std; // 对应每个接收文件的信息 typedef struct kkp2p_fd_ctx_s { int fd; kkp2p_channel_t* channel; int fileSize; int recved; FILE* pWrite; }kkp2p_fd_ctx_t; // 简单封装一下发送指定长度数据接口 int SendData(int fd, char* buff, int len) { int sended = 0 ; while (sended < len) { int wl = kkp2p_write(fd, buff + sended, len - sended, 1000); if (wl < 0) { printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno)); return -1; } sended += wl; } return len; } // 简单封装一下读取指定长度数据接口 int RecvData(int fd, char* buff, int len) { int recved = 0 ; while (recved < len) { int wl = kkp2p_read(fd, buff + recved, len - recved, 1000); if (wl < 0) { printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno)); return -1; } recved += wl; } return len; } // epoll可读事件的处理函数,接收文件内容 int OnRecvFile(int fd, map<int, kkp2p_fd_ctx_t>& fileFds, int epollFd, kkp2p_engine_t* p2pEngine) { map<int, kkp2p_fd_ctx_t>::iterator iter = fileFds.find(fd); if (iter == fileFds.end()) { return -1; } kkp2p_fd_ctx_t& ctx = fileFds[fd]; if (ctx.fileSize == 0) { // 首先接收4个字节的文件长度 int ret = RecvData(fd, (char*)&ctx.fileSize, sizeof(ctx.fileSize)); if (ret < 0 ) { return -1; } ctx.fileSize = ntohl(ctx.fileSize); } else { // 接收文件内容并写文件 char szBuff[1024]; int recved = kkp2p_read(fd, szBuff, sizeof(szBuff), 0); if (recved < 0) { printf("kkp2p_read error,fd:%d,ret:%d,errno:%d,desc:%s.\n",fd, recved, errno, strerror(errno)); return -1; } ctx.recved += recved; fwrite(szBuff, 1, recved, ctx.pWrite); if (ctx.recved >= ctx.fileSize) { // 如果接收完成则发送响应 char ch = '1'; SendData(fd, &ch, 1); //将fd对应事件从epoll删除 struct epoll_event ev; ev.data.fd = fd; ev.events = 0; epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, &ev); fflush(ctx.pWrite); fclose(ctx.pWrite); // 关闭连接的fd句柄以及连接,并退出 kkp2p_close_fd(fd); kkp2p_close_channel(p2pEngine, ctx.channel->channel_id); free(ctx.channel); fileFds.erase(iter); printf("recv file success,file size:%u,fd:%d,channel id:%u.\n",ctx.fileSize, fd, ctx.channel->channel_id); } } return 0; } // socket异常事件 int OnRecvError(int fd, map<int, kkp2p_fd_ctx_t>& fileFds, int epollFd, kkp2p_engine_t* p2pEngine) { map<int, kkp2p_fd_ctx_t>::iterator iter = fileFds.find(fd); if (iter == fileFds.end()) { return -1; } kkp2p_fd_ctx_t& ctx = fileFds[fd]; if (ctx.fileSize > ctx.recved) { printf("recv file error,file size:%u,receved:%u,fd:%d,channel id:%u.\n",ctx.fileSize, ctx.recved, fd,ctx.channel->channel_id); } //将socket句柄从epoll删除,不再侦听该socket的事件 struct epoll_event ev; ev.data.fd = fd; ev.events = 0; epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, &ev); fclose(ctx.pWrite); kkp2p_close_fd(fd); kkp2p_close_channel(p2pEngine, ctx.channel->channel_id); free(ctx.channel); fileFds.erase(iter); return 0; } int main(int argc, char** argv) { // 输入两个参数,第一个是peerId,即登录账号,第二个是账号对应的密钥,云端禁止非法账号登录 if (argc < 3) { printf("usage:%s peerId peerKey\n",argv[0]); return -1; } char* peerId = argv[1]; char* peerKey = argv[2]; // 用于保存每个文件的上下文信息 map<int, kkp2p_fd_ctx_t> fileFds; // 初始化 kkp2p sdk // 包括云端登录域名端口,局域网端口,日志配置 kkp2p_engine_conf_t kkp2p_conf; memset(&kkp2p_conf, 0, sizeof(kkp2p_engine_conf_t)); kkp2p_conf.login_domain = "125.72.218.199"; kkp2p_conf.login_port = 3080; kkp2p_conf.lan_search_port = 3549; kkp2p_engine_t* p2pEngine = kkp2p_engine_init(&kkp2p_conf, 5000); if (p2pEngine == NULL) { printf("init kkp2p engine error.\n"); return -1; } // 获取listen相关的句柄 int listenFd = kkp2p_listen_fd(p2pEngine); // 将peerId加入到云端和本地局域网 kkp2p_join_lan(p2pEngine, peerId); kkp2p_join_net(p2pEngine, peerId, peerKey); // 初始化epoll int epollFd = epoll_create(1024); struct epoll_event* events = (struct epoll_event*)calloc(1, 1024 * sizeof(struct epoll_event)); //将listen句柄加入到epoll,侦听可读事件 struct epoll_event ev; ev.events = EPOLLIN ; ev.data.fd = listenFd; epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &ev); // loop int loop = 1; while(loop) { int ret = epoll_wait(epollFd, events, 1024, 1000); if (ret < 0) { printf("ePoll error : %s\n",strerror(errno)); break; } if(ret == 0){ continue; } // 处理每个事件 for (int i = 0; i< ret; i++) { int fd = events[i].data.fd; if (events[i].events & EPOLLIN) { //可读事件 if (fd == listenFd) { // 如果是listen句柄可读则accept一个连接 kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t)); int ret = kkp2p_accept(p2pEngine, 0, channel); if (ret > 0) { // accept成功,有新的连接过来,将新连接加入到epoll int newFd = channel->fd; struct epoll_event ev; ev.events = EPOLLIN ; ev.data.fd = newFd; epoll_ctl(epollFd, EPOLL_CTL_ADD, newFd, &ev); // 打开文件准备接收文件 char fileName[1024]; memset(fileName, 0, sizeof(fileName)); sprintf(fileName,"%d_%d.txt", time(NULL), newFd); FILE* pWrite = fopen(fileName, "w+"); if (pWrite == NULL) { printf("fopen %s error",fileName); return -1; } // 将新连接信息保存起来 kkp2p_fd_ctx_t ctx; memset(&ctx, 0, sizeof(kkp2p_fd_ctx_t)); ctx.channel = channel; ctx.fd = newFd; ctx.pWrite = pWrite; fileFds[newFd] = ctx; // 打印连接信息,是p2p模式,还是中转模式,以及连接id printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",newFd, channel->transmit_mode, channel->channel_id); } else if (ret == 0) { // timeout free(channel); } else { printf("kkp2p_accept error.\n"); free(channel); loop = 0; break; } } else { // 如果是新连接的fd可读事件,直接接收文件 int ret = OnRecvFile(fd, fileFds,epollFd, p2pEngine); if (ret < 0 ) { OnRecvError(fd, fileFds, epollFd, p2pEngine); } } } else if ((events[i].events & EPOLLHUP) || (events[i].events & EPOLLERR)){ // 异常事件 if (fd == listenFd) { loop = 0; break; } else { OnRecvError(fd, fileFds, epollFd, p2pEngine); } } } } // 释放资源,退出 free(events); close(epollFd); kkp2p_engine_destroy(p2pEngine); return 0; }