最佳实践-p2p传文件服务端源码(多线程模型)
本例子用于举例说明,使用kkp2p sdk如何接收对端发送的文件,本例子是传文件服务端,发送文件客户端请看其他章节。
接收文件过程:首先接收4字节的文件大小(网络字节序,暂支持4G左右大小),然后再接收文件内容;接收完成后,发送响应,最后在退出。
服务端可以并行接收多个客户端发送过来的文件,每个文件启动一个线程,接收完后线程退出。该代码在linux平台编译运行验证通过。
kkp2p sdk的头文件和库请在个人试用版本页面免费下载,免费使用和体验。
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <map>
// 得包含kkp2p sdk的头文件,以及链接libkkp2p.a
#include "kkp2p_sdk.h"
// 全局变量
kkp2p_engine_t* p2pEngine = NULL;
// 简单封装一下发送指定长度数据接口
int SendData(int fd, char* buff, int len) {
int sended = 0 ;
while (sended < len) {
// 1秒超时
int wl = kkp2p_write(fd, buff + sended, len - sended, 1000);
if (wl < 0) {
printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));
return -1;
}
sended += wl;
}
return len;
}
// 简单封装一下读取指定长度数据接口
int RecvData(int fd, char* buff, int len) {
int recved = 0 ;
while (recved < len) {
// 1秒超时
int wl = kkp2p_read(fd, buff + recved, len - recved, 1000);
if (wl < 0) {
printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno));
return -1;
}
recved += wl;
}
return len;
}
// 接收线程,每个文件对应一个
void* ReadThread(void* arg)
{
kkp2p_channel_t* channel = (kkp2p_channel_t*)arg;
// 首先接收4个字节文件长度
uint32_t fileSize = 0;
int recved = RecvData(channel->fd, (char*)&fileSize, sizeof(fileSize));
if (recved < 0) {
printf("kkp2p_read file size error,channel id:%u,exit thread.\n", channel->channel_id);
kkp2p_close_fd(channel->fd);
kkp2p_close_channel(p2pEngine, channel->channel_id);
free(channel);
return NULL;
}
fileSize = ntohl(fileSize);
// 创建一个新文件,准备接收文件内容
char fileName[1024];
memset(fileName, 0, sizeof(fileName));
sprintf(fileName,"%d_%d.txt", time(NULL), channel->fd);
FILE* pWrite = fopen(fileName, "w+");
if (pWrite == NULL) {
printf("fopen %s error,channel id:%u,exit thread.\n", fileName, channel->channel_id);
kkp2p_close_fd(channel->fd);
kkp2p_close_channel(p2pEngine, channel->channel_id);
free(channel);
return NULL;
}
// 循转接收文件内容
uint32_t totalRecved = 0 ;
char szBuff[1024];
while (totalRecved < fileSize) {
int recved = kkp2p_read(channel->fd, szBuff, sizeof(szBuff),1000);
if (recved < 0) {
// 接收出错
printf("kkp2p_read error,channel id:%u,exit thread.\n", channel->channel_id);
break;
} else if (recved == 0) {
// 超时
// timeout
} else {
// 写文件,接收完成发送响应
totalRecved += recved;
fwrite(szBuff, 1, recved, pWrite);
if (totalRecved >= fileSize) {
char ch = '1';
SendData(channel->fd, &ch, 1);
printf("recv file success,file size:%u,fd:%d,channel id:%u.\n",fileSize, channel->fd, channel->channel_id);
}
}
}
// 接收失败发送响应
if (totalRecved < fileSize) {
char ch = '0';
SendData(channel->fd, &ch, 1);
printf("recv file error,file size:%u,recved:%u,fd:%d,channel id:%u.\n",fileSize, totalRecved,channel->fd, channel->channel_id);
}
fflush(pWrite);
fclose(pWrite);
// 关闭连接的代理fd句柄,以及连接
kkp2p_close_fd(channel->fd);
kkp2p_close_channel(p2pEngine, channel->channel_id);
free(channel);
}
// 输入两个参数,第一个是peerId,即登录账号,第二个是账号对应的密钥,云端禁止非法账号登录
int main(int argc, char** argv)
{
if (argc < 3) {
printf("usage:%s peerId peerKey\n",argv[0]);
return -1;
}
// windows平台加上网络库
//WSADATA wsadata;//注释2
//WSAStartup(MAKEWORD(2, 2), &wsadata);
char* peerId = argv[1];
char* peerKey = argv[2];
// 初始化 kkp2p sdk
// 包括云端登录域名端口,局域网端口,日志配置
kkp2p_engine_conf_t kkp2p_conf;
memset(&kkp2p_conf, 0, sizeof(kkp2p_engine_conf_t));
kkp2p_conf.login_domain = "125.72.218.199";
kkp2p_conf.login_port = 3080;
kkp2p_conf.lan_search_port = 3549;
p2pEngine = kkp2p_engine_init(&kkp2p_conf, 5000);
if (p2pEngine == NULL) {
printf("init kkp2p engine error.\n");
return -1;
}
// 切换日志级别到debug模式
kkp2p_switch_log_level(p2pEngine, 4);
// 将peerId加入到云端和本地局域网
kkp2p_join_lan(p2pEngine, peerId);
kkp2p_join_net(p2pEngine, peerId, peerKey);
printf("init success,begin loop.\n");
// 主线程循环接收远端连接
kkp2p_channel_t channel;
while(1) {
int ret = kkp2p_accept(p2pEngine, 1000, &channel);
if (ret < 0) {
// 出错
printf("kkp2p_accept error,exit\n");
break;
} else if (ret == 0) {
// 超时
continue;
} else {
// 有新连接过来,启动新线程处理
pthread_t ThreadId;
printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel.fd, channel.transmit_mode, channel.channel_id);
kkp2p_channel_t* newChannel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
memcpy(newChannel, &channel, sizeof(kkp2p_channel_t));
pthread_create(&ThreadId, NULL, ReadThread,(void*)newChannel);
}
}
// 退出
kkp2p_engine_destroy(p2pEngine);
return 0;
}
