最佳实践-对接SRS流媒体服务器
一、srs支持p2p通信简介
流媒体服务srs是国内开发且开源的一款功能强大,性能强劲的优秀的流媒体服务器,目前正被越来越广泛的使用。srs一般被用户部署在公网上的云主机,这样方便用户进行推拉流等各种操作。但将srs部署在公网的云主机上也会带来一个问题,就是随着同时在线的推拉流用户比较多,会需要较多的带宽,消耗较多的带宽成本。
本文档用来阐述一种p2p解决方案,通过库快科技(官网)p2p技术可以将srs服务部署在您的局域网(比如工作局域网环境或者居家环境),此种方案不用修改srs的任何代码或者配置,就可以让srs支持p2p通信,为您极大的节约带宽成本。
二、系统架构
先上图如下所示:
如上图所示,我们用库快科技的sdk开发一个srs的p2p接入代理进程,srs仅和srs的p2p代理(srs p2p proxy)交互通信,由srs的p2p代理来和外部进行通信,srs和其代理可以部署在同一台主机上,或者同一个局域网机房内。
同时,外部的推拉流客户端或者app,也和p2p接入代理(client p2p proxy)直接进行通信,将推拉流请求发送给代理服务。由于client p2p proxy和srs p2p proxy均由支持p2p通信的sdk开发,所以在网络可以穿透情况下,可以直接进行p2p通信,流媒体数据不经过云端服务器中转,为您节约带宽成本;当代理之间网络无法穿透情况,会自动使用p2p的云端服务进行中转通信。根据统计在国内网络穿透率可以至少达到三分之二,所以这种部署可以为您减少三分之二的带宽成本。
库快科技( https://kkuai.com) 的p2p sdk提供类似于网络编程的sdk接口,极易掌握。client p2p proxy和srs p2p proxy的核心代码仅有100行左右,就可以支持强大的代理接入功能。您也可以将client p2p proxy的代码逻辑内置在您的业务程序代码里面(启动127.0.0.1 的接入代理,您的app直接访问127.0.0.1这个url)
三、 流媒体服务srs p2p proxy源码
srs的p2p代理服务仅100行代码左右,这里给一个demo例子,一个连接启动一个线程,如果同时在线并发量比较高,读者可以自行改成epoll网络模型模式,以支持更多的在线用户。
该例子在Linux以及mac机器上编译验证通过。
#include <errno.h> #include <stdio.h> #include <signal.h> #include <sys/syscall.h> #include <sys/stat.h> #include <sys/syscall.h> #include <sys/time.h> #include <sys/types.h> #include <unistd.h> #include <fcntl.h> #include <string.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <poll.h> #include <string> using namespace std; // 到kkuai.com获取最新的头文件和库 #include "kkp2p_sdk.h" char g_run_flag = 1; kkp2p_engine_t* g_engine = NULL; // srs服务的rtmp的ip和端口 string g_srs_ip; unsigned short g_srs_port; void set_exit_flag(int sig) { g_run_flag = 0; } int send_data(int fd, char* buff, int len) { int sended = 0 ; while (sended < len) { // 1秒超时 int wl = kkp2p_write(fd, buff + sended, len - sended, 1000); if (wl < 0) { printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno)); return -1; } sended += wl; } return len; } void* process_client(void* arg) { kkp2p_channel_t* channel = (kkp2p_channel_t*)arg; // 连接srs服务器 struct sockaddr_in addr; memset(&addr, 0, sizeof(sockaddr_in)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(g_srs_ip.c_str()); addr.sin_port = htons(g_srs_port); int namelen = sizeof(addr); int sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); int ret = connect(sockFd, (struct sockaddr*)&addr, sizeof(sockaddr_in)); if (ret < 0) { printf("connect %s:%d error,thread exit.\n",g_srs_ip.c_str(), g_srs_port); close(sockFd); kkp2p_close_fd(channel->fd); kkp2p_close_channel(g_engine, channel->channel_id); free(channel); return NULL; } // 设置非阻塞模式 int val = fcntl(sockFd, F_GETFL,0); fcntl(sockFd, F_SETFL, val|O_NONBLOCK); // 开始在srs和客户端之间中转数据 struct pollfd waitFd[2]; memset(waitFd,0,sizeof(waitFd)); waitFd[0].fd = sockFd; waitFd[0].events = POLLIN; waitFd[1].fd = channel->fd; waitFd[1].events = POLLIN; printf("sockFd %d,channel fd:%d\n",sockFd,channel->fd); char szBuff[1024]; int rl = 0 ; int wl = 0; int loop = 1; while(loop) { int ret = poll(waitFd, 2, 1000); if (ret < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { continue; } else { printf("poll error,errno:%d,desc:%s.\n",errno,strerror(errno)); break; } } else if (ret == 0) { continue; } for( int i =0 ; i<2; i++) { int fd = waitFd[i].fd; if (waitFd[i].revents & POLLIN) { rl = kkp2p_read(fd, szBuff, sizeof(szBuff), 0); if (rl < 0) { printf("kkp2p_read fd:%d error,errno:%d,desc:%s,sockFd:%d,channel fd:%d\n",fd,errno,strerror(errno),sockFd, channel->fd); loop = 0; break; } else if (rl == 0) { continue; } else { int writeFd = 0; if (fd == sockFd) { writeFd = channel->fd; } else { writeFd = sockFd; } wl = send_data(writeFd, szBuff, rl); if (wl < 0) { printf("kkp2p_write fd:%d error,errno:%d,desc:%s.\n",writeFd,errno,strerror(errno)); loop = 0; break; } } }else if ((waitFd[i].revents & POLLHUP) || (waitFd[i].revents & POLLERR) || (waitFd[i].revents & POLLNVAL)) { printf("fd revents %d error,fd:%d,sockFd:%d,channel fd:%d.\n",waitFd[i].revents,waitFd[i].fd,sockFd,channel->fd); loop = 0 ; break; } } } close(sockFd); kkp2p_close_fd(channel->fd); printf("close channel,channelId:%u.\n",channel->channel_id); kkp2p_close_channel(g_engine, channel->channel_id); free(channel); return NULL; } // 总共4个参数,p2p系统登录账号和密码,以及srs的ip和端口号 int main(int argc, char** argv) { if (argc < 5) { printf("usage:%s peer_id peer_key srs_ip srs_port\n", argv[0]); return -1; } // 利用usr1信号终止进程服务 // kill -user1 pid struct sigaction actions; memset(&actions, 0, sizeof(actions)); sigemptyset(&actions.sa_mask); actions.sa_flags = 0; actions.sa_handler = set_exit_flag ; sigaction(SIGUSR1,&actions,NULL); kkp2p_engine_conf_t kkp2p_conf; memset(&kkp2p_conf, 0, sizeof(kkp2p_engine_conf_t)); // p2p云端服务的登录域名(ip)和端口号等信息 // 根据实际部署情况填写,从kkuai.com下载云端服务自行部署 kkp2p_conf.login_domain = (char*)"p2ptest.com"; kkp2p_conf.login_port = 3080; kkp2p_conf.lan_search_port = 3549; g_engine = kkp2p_engine_init(&kkp2p_conf, 5000); // 将peer加入到p2p网络 kkp2p_join_lan(g_engine, argv[1]); kkp2p_join_net(g_engine, argv[1], argv[2]); g_srs_ip = argv[3]; g_srs_port = atoi(argv[4]); kkp2p_channel_t channel ; while(g_run_flag) { // 循环接收外部连接请求 int ret = kkp2p_accept(g_engine, 1000, &channel); if (ret < 0) { // error printf("kkp2p_accept error,exit\n"); break; } else if (ret == 0) { // timeout continue; } else { pthread_t ThreadId; // 接收到一个远程连接,transmit_mode为1表示p2p通信,为2表示中转通信 // connect_desc是双方约定的连接描述信息,可以用于表示协议编号 // demo这里不作判断,统一默认是rtmp推拉流协议 printf("accept new channel,fd:%d,mode:%d,conn_desc:%d\n",channel.fd, channel.transmit_mode,channel.connect_desc); kkp2p_channel_t* newChannel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t)); memcpy(newChannel, &channel, sizeof(kkp2p_channel_t)); // 启动线程处理 pthread_create(&ThreadId, NULL, process_client,(void*)newChannel); } } kkp2p_engine_destroy(g_engine); return 0; }
整个逻辑比较简单,就是循环accept连接,当有连接过来就启动一个线程处理,在srs和远程连接之间透传数据即可。读者可以改成epoll模型。
四、客户端p2p代理client p2p proxy源码
推拉流或者您的业务进程的p2p代理服务源码如下,原理也比较简单,就是启动一个ip和端口代理服务,外部服务推拉流请求先发给代理,由代理和远端的srs的p2p代理服务通信。该代理服务代码仅仅数行,非常方便您在业务程序源码里面直接使用。
该代码在windows平台下调试通过
#include <windows.h> #include <process.h> #include <iostream> #include <stdio.h> #include <stdint.h> #include <errno.h> #include <string.h> #include <sys/types.h> #include <fcntl.h> #include <sys/timeb.h> #include <signal.h> // 到kkuai.com获取最新的头文件和库 #include "kkp2p_sdk.h" #pragma comment(lib,"Ws2_32.lib") #pragma comment(lib, "iphlpapi.lib") // 利用信号退出 char run_flag = 1; void SignalHandler(int signal) { printf("exit...\n"); run_flag = 0; } // 输入参数为代理ip和端口(组成推拉流的url),以及需要连接的流媒体srs的p2p账号 // connect_mode为连接模式,0为自动模式,1为p2p连接,2为中转连接 int main(int argc, char** argv) { if (argc < 5) { printf("usage:%s proxy_ip proxy_port peer_id connect_mode\n", argv[0]); return -1; } // 利用ctrl+z退出进程 typedef void(*SignalHandlerPointer)(int); SignalHandlerPointer previousHandler; previousHandler = signal(SIGINT, SignalHandler); WSADATA wsadata;//注释2 WSAStartup(MAKEWORD(2, 2), &wsadata); kkp2p_engine_conf_t kkp2p_conf; memset(&kkp2p_conf, 0, sizeof(kkp2p_engine_conf_t)); // p2p云端服务的登录域名(ip)和端口 // 从kkuai.com下载云端服务自行部署 kkp2p_conf.login_domain = (char*)"p2ptest.com"; kkp2p_conf.login_port = 3080; kkp2p_conf.lan_search_port = 3549; kkp2p_engine_t* g_engine = kkp2p_engine_init(&kkp2p_conf, 5000); // 建连参数 kkp2p_connect_ctx_t ctx; memset(&ctx, 0, sizeof(kkp2p_connect_ctx_t)); memcpy(ctx.peer_id, argv[3], strlen(argv[3])); // 创建tcp类型传输通道 ctx.channel_type = KKP2P_TCP_CHANNEL; // 连接超时时间 ctx.timeout = 5000; // 建连模式 ctx.connect_mode = atoi(argv[4]); // 启动代理服务,该代理服务会接收推拉流服务请求,并和远端流媒体服务的p2p代理通信 uint32_t proxyId = 0; int ret = kkp2p_start_proxy(g_engine, argv[1], atoi(argv[2]), &ctx, &proxyId); if (ret < 0) { printf("create proxy(%s:%d) to peer %s error.\n", argv[1], atoi(argv[2]), argv[3]); return -1; } else { printf("create proxy(%s:%d) to peer %s success.\n", argv[1], atoi(argv[2]), argv[3]); } while (run_flag) { Sleep(1000); } kkp2p_stop_proxy(g_engine, proxyId); kkp2p_engine_destroy(g_engine); return 0; }
五、效果演示
为了方便演示,我们用同一个局域网的两台机器进行演示,一台mac机器,一台windows机器。mac机器运行srs和srs p2p proxy服务。
启动srs服务,srs缺省的rtmp服务端口号是1935
启动srs的p2p代理服务srs p2p proxy
如上图所示,srs p2p proxy输入参数为登录p2p服务系统的登录账号和登录密码,以及srs服务的rtmp的侦听ip和端口,因为部署在同一台机器上,所以这里ip地址为127.0.0.1
六、效果演示,推流端
首先启动推流端代理服务,在windows机器上启动。
推流端代理服务启动
启动一个127.0.0.1:32915的p2p代理服务,该代理服务和test-00097进行通信,p2p建连模式为0,优先创建p2p连接,p2p不通则自动转中转连接。
在同一台windows机器上用ffmpeg进行推流,推流地址为127.0.0.1:32915,启动命令如下
ffmpeg -re -i spartacus.mkv -c copy -f flv rtmp://127.0.0.1:32915/live/a
将视频spartacus.mkv推流到127.0.0.1:32915
六、效果演示,拉流端
在同一台windows机器上执行,首先启动拉流代理,端口号为32916
然后再启动vlc播放器输入流媒体地址进行播放,流媒体地址为
最后可以看到流畅的音视频播放画面