最佳实践-对接SRS流媒体服务器

一、srs支持p2p通信简介
流媒体服务srs是国内开发且开源的一款功能强大,性能强劲的优秀的流媒体服务器,目前正被越来越广泛的使用。srs一般被用户部署在公网上的云主机,这样方便用户进行推拉流等各种操作。但将srs部署在公网的云主机上也会带来一个问题,就是随着同时在线的推拉流用户比较多,会需要较多的带宽,消耗较多的带宽成本。

本文档用来阐述一种p2p解决方案,通过库快科技(官网)p2p技术可以将srs服务部署在您的局域网(比如工作局域网环境或者居家环境),此种方案不用修改srs的任何代码或者配置,就可以让srs支持p2p通信,为您极大的节约带宽成本。

二、系统架构

先上图如下所示:


如上图所示,我们用库快科技的sdk开发一个srs的p2p接入代理进程,srs仅和srs的p2p代理(srs p2p proxy)交互通信,由srs的p2p代理来和外部进行通信,srs和其代理可以部署在同一台主机上,或者同一个局域网机房内。

同时,外部的推拉流客户端或者app,也和p2p接入代理(client p2p proxy)直接进行通信,将推拉流请求发送给代理服务。由于client p2p proxy和srs p2p proxy均由支持p2p通信的sdk开发,所以在网络可以穿透情况下,可以直接进行p2p通信,流媒体数据不经过云端服务器中转,为您节约带宽成本;当代理之间网络无法穿透情况,会自动使用p2p的云端服务进行中转通信。根据统计在国内网络穿透率可以至少达到三分之二,所以这种部署可以为您减少三分之二的带宽成本。

库快科技( https://kkuai.com) 的p2p sdk提供类似于网络编程的sdk接口,极易掌握。client p2p proxy和srs p2p proxy的核心代码仅有100行左右,就可以支持强大的代理接入功能。您也可以将client p2p proxy的代码逻辑内置在您的业务程序代码里面(启动127.0.0.1 的接入代理,您的app直接访问127.0.0.1这个url)

三、  流媒体服务srs p2p proxy源码

srs的p2p代理服务仅100行代码左右,这里给一个demo例子,一个连接启动一个线程,如果同时在线并发量比较高,读者可以自行改成epoll网络模型模式,以支持更多的在线用户。

该例子在Linux以及mac机器上编译验证通过。

#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <sys/syscall.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <string>
using namespace std;

// 到kkuai.com获取最新的头文件和库
#include "kkp2p_sdk.h"

char g_run_flag = 1;
kkp2p_engine_t* g_engine = NULL;

// srs服务的rtmp的ip和端口
string g_srs_ip;
unsigned short g_srs_port;

void set_exit_flag(int sig)
{
    g_run_flag = 0;
}

int send_data(int fd, char* buff, int len)
{
    int sended = 0 ;
    while (sended < len)
    {
        // 1秒超时
        int wl = kkp2p_write(fd, buff + sended, len - sended, 1000);
        if (wl < 0)
        {
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));
            return -1;
        }
        sended += wl;
    }
    return len;
}

void* process_client(void* arg)
{
    kkp2p_channel_t* channel = (kkp2p_channel_t*)arg;

    // 连接srs服务器
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr(g_srs_ip.c_str());
    addr.sin_port = htons(g_srs_port);
    int namelen = sizeof(addr);

    int sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    int ret = connect(sockFd, (struct sockaddr*)&addr, sizeof(sockaddr_in));
    if (ret < 0)
    {
        printf("connect %s:%d error,thread exit.\n",g_srs_ip.c_str(), g_srs_port);
        close(sockFd);
        kkp2p_close_fd(channel->fd);
        kkp2p_close_channel(g_engine, channel->channel_id);
        free(channel);
        return NULL;
    } 

    
    // 设置非阻塞模式
    int val = fcntl(sockFd, F_GETFL,0);
    fcntl(sockFd, F_SETFL, val|O_NONBLOCK);

    // 开始在srs和客户端之间中转数据
    struct pollfd waitFd[2];
    memset(waitFd,0,sizeof(waitFd));
    waitFd[0].fd = sockFd;
    waitFd[0].events = POLLIN;
    waitFd[1].fd = channel->fd;
    waitFd[1].events = POLLIN;

    printf("sockFd %d,channel fd:%d\n",sockFd,channel->fd);

    char szBuff[1024];
    int rl = 0 ;
    int wl = 0;
    int loop = 1;
    while(loop)
    {
        int ret = poll(waitFd, 2, 1000);
        if (ret < 0)
        {
            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
            {
                continue;
            }
            else
            {
                printf("poll error,errno:%d,desc:%s.\n",errno,strerror(errno));
                break;
            }
        }
        else if (ret == 0)
        {
            continue;
        }

        for( int i =0 ; i<2; i++)
        {
            int fd = waitFd[i].fd;
            if (waitFd[i].revents & POLLIN)
            {
                rl = kkp2p_read(fd, szBuff, sizeof(szBuff), 0);      
                if (rl < 0)
                {
                    printf("kkp2p_read fd:%d error,errno:%d,desc:%s,sockFd:%d,channel fd:%d\n",fd,errno,strerror(errno),sockFd, channel->fd);
                    loop = 0;
                    break;
                }
                else if (rl == 0)
                {
                    continue;
                }
                else
                {
                    int writeFd = 0;
                    if (fd == sockFd) {
                        writeFd = channel->fd;
                    }
                    else {
                        writeFd = sockFd;
                    }

                    wl = send_data(writeFd, szBuff, rl);
                    if (wl < 0)
                    {
                        printf("kkp2p_write fd:%d error,errno:%d,desc:%s.\n",writeFd,errno,strerror(errno));
                        loop = 0;
                        break;
                    }
                }
            }else if ((waitFd[i].revents & POLLHUP) || (waitFd[i].revents & POLLERR) || (waitFd[i].revents & POLLNVAL)) {
                printf("fd revents %d error,fd:%d,sockFd:%d,channel fd:%d.\n",waitFd[i].revents,waitFd[i].fd,sockFd,channel->fd);
                loop = 0 ;
                break;
            }
        }
    }

    close(sockFd);
    kkp2p_close_fd(channel->fd);
    printf("close channel,channelId:%u.\n",channel->channel_id);
    kkp2p_close_channel(g_engine, channel->channel_id);
    free(channel);
    return NULL;
}


// 总共4个参数,p2p系统登录账号和密码,以及srs的ip和端口号
int main(int argc, char** argv)
{
    if (argc < 5)
    {
        printf("usage:%s peer_id peer_key srs_ip srs_port\n", argv[0]);
        return -1;
    }

    // 利用usr1信号终止进程服务
    // kill -user1 pid
    struct sigaction actions;
    memset(&actions, 0, sizeof(actions));
    sigemptyset(&actions.sa_mask);
    actions.sa_flags = 0;
    actions.sa_handler = set_exit_flag ;
    sigaction(SIGUSR1,&actions,NULL);

    kkp2p_engine_conf_t kkp2p_conf;
    memset(&kkp2p_conf, 0, sizeof(kkp2p_engine_conf_t));

    // p2p云端服务的登录域名(ip)和端口号等信息
    // 根据实际部署情况填写,从kkuai.com下载云端服务自行部署
    kkp2p_conf.login_domain = (char*)"p2ptest.com";
    kkp2p_conf.login_port = 3080;
    kkp2p_conf.lan_search_port = 3549;
    g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
    
    // 将peer加入到p2p网络
    kkp2p_join_lan(g_engine, argv[1]);
    kkp2p_join_net(g_engine, argv[1], argv[2]);

    g_srs_ip = argv[3];
    g_srs_port = atoi(argv[4]);

    kkp2p_channel_t channel ;
    while(g_run_flag)
    {
        // 循环接收外部连接请求
        int ret = kkp2p_accept(g_engine, 1000, &channel);
        if (ret < 0)
        {
            // error
            printf("kkp2p_accept error,exit\n");
            break;
        }
        else if (ret == 0)
        {
            // timeout
            continue;
        }
        else
        {
            pthread_t ThreadId;
            // 接收到一个远程连接,transmit_mode为1表示p2p通信,为2表示中转通信
            // connect_desc是双方约定的连接描述信息,可以用于表示协议编号
            // demo这里不作判断,统一默认是rtmp推拉流协议
            printf("accept new channel,fd:%d,mode:%d,conn_desc:%d\n",channel.fd, channel.transmit_mode,channel.connect_desc);
            kkp2p_channel_t* newChannel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
            memcpy(newChannel, &channel, sizeof(kkp2p_channel_t));
            
            // 启动线程处理
            pthread_create(&ThreadId, NULL, process_client,(void*)newChannel);
        }
    }

    kkp2p_engine_destroy(g_engine);
    return 0;
}

整个逻辑比较简单,就是循环accept连接,当有连接过来就启动一个线程处理,在srs和远程连接之间透传数据即可。读者可以改成epoll模型。

四、客户端p2p代理client p2p proxy源码

推拉流或者您的业务进程的p2p代理服务源码如下,原理也比较简单,就是启动一个ip和端口代理服务,外部服务推拉流请求先发给代理,由代理和远端的srs的p2p代理服务通信。该代理服务代码仅仅数行,非常方便您在业务程序源码里面直接使用。

该代码在windows平台下调试通过

#include <windows.h>
#include <process.h>
#include <iostream>
#include <stdio.h>
#include <stdint.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/timeb.h>
#include <signal.h>

// 到kkuai.com获取最新的头文件和库
#include "kkp2p_sdk.h"

#pragma comment(lib,"Ws2_32.lib")
#pragma comment(lib, "iphlpapi.lib")

// 利用信号退出
char run_flag = 1;
void SignalHandler(int signal)
{
	printf("exit...\n");
	run_flag = 0;
}

// 输入参数为代理ip和端口(组成推拉流的url),以及需要连接的流媒体srs的p2p账号
// connect_mode为连接模式,0为自动模式,1为p2p连接,2为中转连接
int main(int argc, char** argv) {
	if (argc < 5) {
		printf("usage:%s proxy_ip proxy_port peer_id connect_mode\n", argv[0]);
		return -1;
	}

    // 利用ctrl+z退出进程
	typedef void(*SignalHandlerPointer)(int);
	SignalHandlerPointer previousHandler;
	previousHandler = signal(SIGINT, SignalHandler);

	WSADATA wsadata;//注释2
	WSAStartup(MAKEWORD(2, 2), &wsadata);

	kkp2p_engine_conf_t kkp2p_conf;
    memset(&kkp2p_conf, 0, sizeof(kkp2p_engine_conf_t));
    
    // p2p云端服务的登录域名(ip)和端口
    // 从kkuai.com下载云端服务自行部署
	kkp2p_conf.login_domain = (char*)"p2ptest.com";
	kkp2p_conf.login_port = 3080;
	kkp2p_conf.lan_search_port = 3549;
	kkp2p_engine_t* g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
    // 建连参数
	kkp2p_connect_ctx_t ctx;
	memset(&ctx, 0, sizeof(kkp2p_connect_ctx_t));
	memcpy(ctx.peer_id, argv[3], strlen(argv[3]));

        // 创建tcp类型传输通道
        ctx.channel_type = KKP2P_TCP_CHANNEL;
	
    // 连接超时时间
    ctx.timeout = 5000;

    // 建连模式
	ctx.connect_mode = atoi(argv[4]);

    // 启动代理服务,该代理服务会接收推拉流服务请求,并和远端流媒体服务的p2p代理通信
	uint32_t proxyId = 0;
	int ret = kkp2p_start_proxy(g_engine, argv[1], atoi(argv[2]), &ctx, &proxyId);
	if (ret < 0) {
		printf("create proxy(%s:%d) to peer %s error.\n", argv[1], atoi(argv[2]), argv[3]);
		return -1;
	}
	else {
		printf("create proxy(%s:%d) to peer %s success.\n", argv[1], atoi(argv[2]), argv[3]);
	}

	while (run_flag) {
		Sleep(1000);
	}

	kkp2p_stop_proxy(g_engine, proxyId);
	kkp2p_engine_destroy(g_engine);
	return 0;
}

五、效果演示

为了方便演示,我们用同一个局域网的两台机器进行演示,一台mac机器,一台windows机器。mac机器运行srs和srs p2p proxy服务。

启动srs服务,srs缺省的rtmp服务端口号是1935

启动srs的p2p代理服务srs p2p proxy

如上图所示,srs p2p proxy输入参数为登录p2p服务系统的登录账号和登录密码,以及srs服务的rtmp的侦听ip和端口,因为部署在同一台机器上,所以这里ip地址为127.0.0.1

六、效果演示,推流端

首先启动推流端代理服务,在windows机器上启动。

推流端代理服务启动

启动一个127.0.0.1:32915的p2p代理服务,该代理服务和test-00097进行通信,p2p建连模式为0,优先创建p2p连接,p2p不通则自动转中转连接。

在同一台windows机器上用ffmpeg进行推流,推流地址为127.0.0.1:32915,启动命令如下

ffmpeg -re -i spartacus.mkv -c copy -f flv rtmp://127.0.0.1:32915/live/a

将视频spartacus.mkv推流到127.0.0.1:32915


六、效果演示,拉流端

在同一台windows机器上执行,首先启动拉流代理,端口号为32916

然后再启动vlc播放器输入流媒体地址进行播放,流媒体地址为


最后可以看到流畅的音视频播放画面