peer server(rtmp server)端模式流程简介如下:主线程不断调用kkp2p_accept接入新的连接,如果有新的连接过来,会得到一个句柄fd,然后在该fd句柄上处理rtmp协议即可。服务端代码来自于互联网,在原有代码上做了p2p的使用适配,我们放在最后讲解,该服务端代码已在Linux平台上测试通过。由于仅是演示作用,有些细节考虑不完善,仅供参考使用。
最后我们看peer server端(rtmp服务端)源码,该源码大部分都是rtmp协议相关,主要来自于互联网,仅做了p2p适配处理,我们可以看到库快科技的p2p库是极易使用的。
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <list>
#include <set>
#include <map>
#include <librtmp/rtmp.h>
#include <librtmp/log.h>
#include "kkp2p_sdk.h"
// Process-wide kkp2p engine handle; assigned in main() and used by the client
// threads when closing their channel.
kkp2p_engine_t* g_engine = NULL;
/// Thin wrapper around a default-attribute pthread mutex.
///
/// The pthread_mutex_t is initialised in the constructor and destroyed in the
/// destructor. Copying is disabled: a byte-wise copy of a pthread_mutex_t is
/// not a usable mutex and would be destroyed twice.
class CMutex
{
    pthread_mutex_t m_mutex;
public:
    CMutex()
    {
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~CMutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }
    CMutex(const CMutex&) = delete;
    CMutex& operator=(const CMutex&) = delete;
    /// Blocks until the mutex is acquired.
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    /// Releases the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }
};
/// Scoped lock guard: calls pLock->lock() on construction and
/// pLock->unlock() on destruction.
///
/// T only needs lock() and unlock() members. The guard is non-copyable so the
/// same lock can never be released twice by two guard objects.
template<typename T>
class CAutoLock
{
    T* m_pLock;
public:
    /// @param pLock lock object to hold for the lifetime of the guard; must be non-null.
    explicit CAutoLock(T* pLock) : m_pLock(pLock)
    {
        m_pLock->lock();
    }
    ~CAutoLock()
    {
        m_pLock->unlock();
    }
    CAutoLock(const CAutoLock&) = delete;
    CAutoLock& operator=(const CAutoLock&) = delete;
};
class CConnection
{
uint32_t m_uConnID;
uint32_t m_uNextStreamID;
RTMP* m_pRtmp;
std::string m_strApp;
int m_nStreamType;
std::set<uint32_t> m_setUsingStreamID;
std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;
std::list<RTMPPacket*> m_listpPacket;
CMutex m_mutex;
sem_t m_sem;
public:
// 流类型
enum EStreamType
{
Unkown = 0,
Publish,
Play
};
CConnection(uint32_t uConnID, int nSocket)
: m_uConnID(uConnID)
, m_uNextStreamID(1)
, m_pRtmp(NULL)
, m_strApp("")
, m_nStreamType(Unkown)
{
m_pRtmp = RTMP_Alloc();
RTMP_Init(m_pRtmp);
m_pRtmp->m_sb.sb_socket = nSocket;
sem_init(&m_sem, 0, 0);
}
virtual ~CConnection()
{
RTMP_Close(m_pRtmp);
RTMP_Free(m_pRtmp);
sem_destroy(&m_sem);
}
uint32_t ConnID()
{
return m_uConnID;
}
RTMP* Rtmp()
{
return m_pRtmp;
}
int Socket()
{
return m_pRtmp->m_sb.sb_socket;
}
void setAppName(const std::string& strApp)
{
m_strApp = strApp;
}
const std::string& getAppName() const
{
return m_strApp;
}
void setStreamType(EStreamType emType)
{
m_nStreamType = emType;
}
EStreamType getStreamType()
{
return (EStreamType)m_nStreamType;
}
uint32_t genStreamID()
{
uint32_t uStreamID = m_uNextStreamID++;
m_setUsingStreamID.insert(uStreamID);
return uStreamID;
}
// 检查流ID是否合法
bool isValidStreamID(uint32_t uStreamID)
{
return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
}
// 登记 推流ID与playpath 映射关系
void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
m_setUsingStreamID.erase(uStreamID);
m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
}
// 取消登记 推流ID与playpath 映射关系
void unbindPublishPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPublishStreamIDPlayPath.erase(uStreamID);
}
// 获取 推流ID映射关系
std::string getPublishPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
if (iter == m_mapPublishStreamIDPlayPath.end())
return "";
return iter->second;
}
// 断连时获取 推流的playpath列表
const void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
{
vecPlayPath.push_back(iter->second);
}
}
// 断连时清除 推流ID与playpath 映射关系
void cleanPublishPlayPath()
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPublishStreamIDPlayPath.clear();
}
// 登记 拉流ID与playpath 映射关系
void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
m_setUsingStreamID.erase(uStreamID);
m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
}
// 取消登记 拉流ID与playpath 映射关系
void unbindPlayPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPlayStreamIDPlayPath.erase(uStreamID);
}
// 获取 拉流ID映射关系
std::string getPlayPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
if (iter == m_mapPlayStreamIDPlayPath.end())
return "";
return iter->second;
}
// 断连时获取 拉流的playpath列表
const void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
{
vecPlayPath.push_back(iter->second);
}
}
// 断连时清除 拉流ID与playpath 映射关系
void cleanPlayPlayPath()
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPlayStreamIDPlayPath.clear();
}
// 通知指定的playpath即将重置
void tellResetPlayPath(const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
uint32_t uStreamID = 0;
for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
{
if (iter->second == strPlayPath)
{
uStreamID = iter->first;
m_mapPublishStreamIDPlayPath.erase(iter);
break;
}
}
if (uStreamID == 0)
{
for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
{
if (iter->second == strPlayPath)
{
uStreamID = iter->first;
m_mapPlayStreamIDPlayPath.erase(iter);
break;
}
}
}
}
// 提取待发送的报文
RTMPPacket* popPacket()
{
struct timeval tv;
gettimeofday(&tv, NULL);
double ftime = tv.tv_sec + (tv.tv_usec + 500000) / (double)1000000;
struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
sem_timedwait(&m_sem, &ts);
CAutoLock<CMutex> lock(&m_mutex);
if (m_listpPacket.empty())
return NULL;
RTMPPacket* pPacket = m_listpPacket.front();
m_listpPacket.pop_front();
return pPacket;
}
// 向连接拷贝多个报文
void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
{
m_listpPacket.push_back(*iter);
sem_post(&m_sem);
}
}
};
// 客户端连接管理 =>
class CConnections
{
uint32_t m_uNextConnID;
std::map<uint32_t, CConnection*> m_mapConnection;
CMutex m_mutex;
public:
CConnections()
: m_uNextConnID(1)
{
}
virtual ~CConnections() {}
CConnection* createConnection(int nSocket)
{
CAutoLock<CMutex> lock(&m_mutex);
CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
m_mapConnection[pConnection->ConnID()] = pConnection;
return pConnection;
}
void releaseConnection(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
CConnection* pConn = __getConnection(uConnID);
if (pConn)
{
m_mapConnection.erase(uConnID);
delete pConn;
}
}
CConnection* getConnection(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
return __getConnection(uConnID);
}
private:
CConnection* __getConnection(uint32_t uConnID)
{
auto iter = m_mapConnection.find(uConnID);
if (iter == m_mapConnection.end())
return NULL;
return iter->second;
}
};
// Global registry of all client connections.
CConnections g_Conns;
// Programme (playpath) container =>
/// One stream name inside an application: remembers which connection is
/// publishing it, which connections are playing it, and whether publishing
/// has ended. All mutable state is guarded by m_mutex.
class CPlayPath
{
    std::string m_strPlayPath;
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;
    CMutex m_mutex;
public:
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath)
        , m_bEOF(true)
        , m_uPublishConnID(0)
    {
    }
    virtual ~CPlayPath() {}
    const std::string& getName() const
    {
        return m_strPlayPath;
    }
    // Set / query the end-of-stream flag
    void setEOF()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_bEOF = true;
    }
    bool isEOF()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return m_bEOF;
    }
    /// Resets the programme: clears the end-of-stream flag, forgets the
    /// publishing connection and, when bCleanPlayer is true, all playing
    /// connections. Every forgotten connection that is still registered is
    /// told via tellResetPlayPath().
    void reset(bool bCleanPlayer = false)
    {
        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;
        // Detach publisher/players under the lock; notify them after it is
        // released so no other object is called while m_mutex is held.
        {
            CAutoLock<CMutex> lock(&m_mutex);
            m_bEOF = false;
            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;
            if (bCleanPlayer)
            {
                m_setPlayConnID.swap(setPlayConnID);
            }
        }
        // Notify the publishing connection
        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
        // Notify the playing connections
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
    }
    // Register the publishing connection
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_uPublishConnID = uConnID;
    }
    /// Unregisters the publishing connection.
    /// @return true when a publisher was registered before the call.
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        const bool bHadPublisher = (m_uPublishConnID != 0);
        m_uPublishConnID = 0;
        return bHadPublisher;
    }
    /// Registers a playing connection.
    /// @return false when the connection was already registered.
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return m_setPlayConnID.insert(uConnID).second;
    }
    /// Unregisters a playing connection.
    /// @return false when the connection was not registered.
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return m_setPlayConnID.erase(uConnID) > 0;
    }
    /// Fans a media packet out to every playing connection.
    ///
    /// Each player receives its own heap copy (header fields plus body) with
    /// the header type forced to RTMP_PACKET_SIZE_MEDIUM; pPacket itself is
    /// left untouched and remains owned by the caller.
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;
        {
            CAutoLock<CMutex> lock(&m_mutex);
            setPlayConnID = m_setPlayConnID;
        }
        // For simplicity, copy straight into each playing connection
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn == NULL)
                continue;
            RTMPPacket* pPacketCP = new RTMPPacket;
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            if (!RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize))
            {
                // After the memcpy m_body still aliases the source body, so
                // only the copy's struct may be released here.
                delete pPacketCP;
                continue;
            }
            if (pPacket->m_body != NULL && pPacket->m_nBodySize > 0)
            {
                memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            }
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;
            std::vector<RTMPPacket*> vecpPacket;
            vecpPacket.push_back(pPacketCP);
            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};
// Application container =>
/// One RTMP application: maps playpath names to their CPlayPath objects.
/// The map is guarded by m_mutex.
class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;
    CMutex m_mutex;
public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}
    const std::string& getName() const
    {
        return m_strApp;
    }
    /// Returns the playpath registered under strPlayPath. When it does not
    /// exist yet it is created and registered if bCreate is true; otherwise
    /// NULL is returned.
    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> guard(&m_mutex);
        auto found = m_mappPlayPath.find(strPlayPath);
        if (found == m_mappPlayPath.end())
        {
            if (!bCreate)
                return NULL;
            CPlayPath* pCreated = new CPlayPath(strPlayPath);
            m_mappPlayPath[strPlayPath] = pCreated;
            return pCreated;
        }
        return found->second;
    }
};
// Application collection management =>
/// Registry of CApp objects keyed by application name, guarded by m_mutex.
class CApps
{
    std::map<std::string, CApp*> m_mapApp;
    CMutex m_mutex;
public:
    CApps() {}
    virtual ~CApps() {}
    /// Returns the application registered under strApp. When it does not
    /// exist yet it is created and registered if bCreate is true; otherwise
    /// NULL is returned.
    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> guard(&m_mutex);
        auto found = m_mapApp.find(strApp);
        if (found == m_mapApp.end())
        {
            if (!bCreate)
                return NULL;
            CApp* pCreated = new CApp(strApp);
            m_mapApp[strApp] = pCreated;
            return pCreated;
        }
        return found->second;
    }
};
// Global registry of all RTMP applications.
CApps g_Apps;
// Program logic =>
// Forward declarations of the functions defined later in this file.
// Per-client worker thread entry point.
void* ClientThread(void* _lp);
// RTMP handshake on a raw socket.
bool MyHandshake(int nSocket);
// Routes one received packet by its message type.
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
// Handlers for invoke (0x14) and media (0x08 / 0x09) packets.
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
// Senders for the individual server-to-client messages.
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);
int main(int argc, char* argv[])
{
if (argc < 3) {
printf("usage:%s peer_id peer_key\n", argv[0]);
return -1;
}
RTMP_LogSetLevel(RTMP_LOGDEBUG);
kkp2p_engine_conf_t kkp2p_conf;
kkp2p_conf.login_domain = "124.71.217.198";
kkp2p_conf.login_port = 3080;
kkp2p_conf.lan_search_port = 3549;
kkp2p_conf.max_log_size = 1024*1024*10;
kkp2p_conf.log_path = NULL;
g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
kkp2p_switch_log_level(g_engine, 4);
kkp2p_join_lan(g_engine, argv[1]);
kkp2p_join_net(g_engine, argv[1], argv[2]);
kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
while(1) {
int ret = kkp2p_accept(g_engine, 1000, channel);
if (ret < 0) {
// error
printf("kkp2p_accept error,exit\n");
free(channel);
break;
} else if (ret == 0) {
// timeout
continue;
} else {
// success
pthread_t ThreadId;
printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel->fd, channel->transmit_mode, channel->channel_id);
pthread_create(&ThreadId, NULL, ClientThread,(void*)channel);
channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
}
}
return 0;
}
void* ClientThread(void* param)
{
pthread_detach(pthread_self());
kkp2p_channel_t* channel = (kkp2p_channel_t*)param;
// use default block
int val = fcntl(channel->fd, F_GETFL, 0);
fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));
g_Conns.createConnection(channel->fd);
CConnection* pConn = g_Conns.createConnection(channel->fd);
printf("connection:[%d] coming... \n", pConn->ConnID());
// 握手
bool b = MyHandshake(pConn->Socket());
if (!b)
{
printf("connection:[%d] handshake failed! \n", pConn->ConnID());
g_Conns.releaseConnection(pConn->ConnID());
return NULL;
}
while (true)
{
RTMPPacket packet;
packet.m_body = NULL;
packet.m_chunk = NULL;
RTMPPacket_Reset(&packet);
// 读取报文
if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
{
printf("connection:[%d] read error! \n", pConn->ConnID());
break;
}
if (!RTMPPacket_IsReady(&packet))
continue;
//printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
// pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);
// 报文分派交互
bool b = Dispatch(pConn, &packet);
RTMPPacket_Free(&packet);
if (!b)
{
printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
break;
}
if (pConn->getStreamType() == CConnection::Play)
{
printf("connection:[%d] now play... \n", pConn->ConnID());
break;
}
}
// 进入拉流状态
struct timeval tv;
gettimeofday(&tv, NULL);
double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
while (pConn->getStreamType() == CConnection::Play)
{
RTMPPacket* pPacket = pConn->popPacket();
struct timeval tvNow;
gettimeofday(&tvNow, NULL);
double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;
// 超时检查
if (pPacket == NULL)
{
if (fNowReadTime - fLastReadTime < 30)
continue;
printf("connection:[%d] too time no packet \n", pConn->ConnID());
break;
}
fLastReadTime = fNowReadTime;
// 下发媒体报文
bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);
RTMPPacket_Free(pPacket);
delete pPacket;
if (!b)
{
printf("connection:[%d] send failed! \n", pConn->ConnID());
break;
}
}
// 连接退出前关系解除
switch (pConn->getStreamType())
{
case CConnection::Publish:
{
std::vector<std::string> vecPlayPath;
pConn->getPublishPlayPaths(vecPlayPath);
for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
{
CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
if (pPlayPath)
{
pPlayPath->setEOF();
pPlayPath->unsetPublishConn();
}
}
pConn->cleanPublishPlayPath();
}
break;
case CConnection::Play:
{
std::vector<std::string> vecPlayPath;
pConn->getPlayPlayPaths(vecPlayPath);
for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
{
CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
if (pPlayPath)
{
pPlayPath->delPlayConn(pConn->ConnID());
}
}
pConn->cleanPlayPlayPath();
}
break;
}
printf("connection:[%d] exit! \n", pConn->ConnID());
g_Conns.releaseConnection(pConn->ConnID());
kkp2p_close_fd(channel->fd);
kkp2p_close_channel(g_engine, channel->channel_id);
free(channel);
return NULL;
}
/// Writes exactly len bytes from buff to fd, looping over short writes.
///
/// A send() interrupted by a signal (EINTR) is retried instead of being
/// reported as a failure.
///
/// @return len on success, -1 (after logging) when send() fails.
int send_data(int fd, char* buff, int len) {
    int sended = 0 ;
    while (sended < len) {
        int wl = send(fd, buff + sended, len - sended, 0);
        if (wl < 0) {
            if (errno == EINTR) {
                continue;
            }
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));
            return -1;
        }
        sended += wl;
    }
    return len;
}
/// Reads exactly len bytes from fd into buff, looping over short reads.
///
/// recv() returning 0 means the peer closed the connection; that is reported
/// as a failure because otherwise the loop would never make progress. A recv()
/// interrupted by a signal (EINTR) is retried.
///
/// @return len on success, -1 (after logging) on error or premature close.
int recv_data(int fd, char* buff, int len) {
    int recved = 0 ;
    while (recved < len) {
        int wl = recv(fd, buff + recved, len - recved, 0);
        if (wl < 0) {
            if (errno == EINTR) {
                continue;
            }
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno));
            return -1;
        }
        if (wl == 0) {
            printf("RecvData peer closed,fd:%d,recved:%d,len:%d.\n", fd, recved, len);
            return -1;
        }
        recved += wl;
    }
    return len;
}
// Handshake
#define RTMP_SIG_SIZE 1536
/// Performs the server side of the plain RTMP handshake on nSocket.
///
/// Sequence: read C0 (must be version 3) and C1, send S0 + S1 (version byte 3
/// followed by an all-zero 1536-byte block), send S2 (an echo of C1), then
/// read and discard C2. S0/S1 are sent before S2, which is the order the RTMP
/// specification prescribes.
///
/// @return true when every read and write completed, false otherwise.
bool MyHandshake(int nSocket)
{
    // C0: protocol version
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1) {
        return false;
    }
    if (type != 3) {
        return false;
    }
    // C1
    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    // S0 + S1
    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE) {
        return false;
    }
    // S2: echo of C1
    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    // C2: read into the S1 area; the content is not verified
    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    return true;
}
// Packet dispatch
/// Routes one fully received RTMP packet by its message type.
///
/// Protocol-control messages (0x01 chunk size, 0x05 window ack size, 0x06 peer
/// bandwidth) are logged and, for the chunk size, applied to the session.
/// Media packets (0x08 / 0x09) go to HandleMediaPacket, invokes (0x14) to
/// HandleInvoke. Types 0x04 and 0x12 and unknown types are ignored.
///
/// @return false when the connection should be dropped (invoke handling failed
///         or the peer announced a non-positive chunk size), true otherwise.
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    switch (pPacket->m_packetType)
    {
    case 0x01:
        if (pPacket->m_nBodySize >= 4)
        {
            const int nChunkSize = (int)AMF_DecodeInt32(pPacket->m_body);
            // The value comes from the peer; a zero or negative chunk size
            // must never reach the packet reader.
            if (nChunkSize <= 0)
            {
                printf("connection:[%d] received: invalid chunk size %d \n", pConn->ConnID(), nChunkSize);
                return false;
            }
            pConn->Rtmp()->m_inChunkSize = nChunkSize;
            printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
        }
        break;
    case 0x05:
        if (pPacket->m_nBodySize >= 4)
        {
            int nWindowAckSize = AMF_DecodeInt32(pPacket->m_body);
            printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nWindowAckSize);
        }
        break;
    case 0x06:
        if (pPacket->m_nBodySize >= 4)
        {
            int nOutputBW = AMF_DecodeInt32(pPacket->m_body);
            printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nOutputBW);
        }
        if (pPacket->m_nBodySize >= 5)
        {
            int nOutputBW2 = pPacket->m_body[4];
            printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nOutputBW2);
        }
        break;
    case 0x08:
    case 0x09:
        HandleMediaPacket(pConn, pPacket);
        break;
    case 0x14:
        if (HandleInvoke(pConn, pPacket) < 0)
            return false;
        break;
    case 0x04:
    case 0x12:
    default:
        // Nothing to do for these message types.
        break;
    }
    return true;
}
// SAVC(x) defines a file-local constant AVal named av_x whose text is the
// identifier x itself; the constants below are the RTMP command names used by
// the invoke handler and the response builders.
#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);
/// Wraps a NUL-terminated C string in an AVal without copying it: the AVal
/// points at pStr and records its strlen. pStr must outlive the result.
AVal makeAVal(const char* pStr)
{
    AVal wrapped;
    wrapped.av_val = (char*)pStr;
    wrapped.av_len = (int)strlen(pStr);
    return wrapped;
}
// 处理远程调用
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket)
{
if (pPacket->m_body[0] != 0x02)
{
printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
return -1;
}
uint32_t nInputStreamID = pPacket->m_nInfoField2;
AMFObject obj;
int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
if (nSize < 0)
{
printf("connection:[%d] invalid packet! \n", pConn->ConnID());
return -1;
}
AVal method;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);
if (AVMATCH(&method, &av_connect))
{
AMFObject obj1;
AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);
AVal appName = makeAVal("app");
AVal app;
AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);
std::string strApp(app.av_val, app.av_len);
printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());
pConn->setAppName(strApp);
if (!sendWindowAckSize(pConn))
return -1;
if (!sendPeerOutputBandWide(pConn))
return -1;
if (!sendOutputChunkSize(pConn))
return -1;
if (!sendConnectResult(pConn, nOperateID))
return -1;
}
else if (AVMATCH(&method, &av_releaseStream))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
// 检查该节目是否推流结束
CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
if (!pPlayPath->isEOF())
{
if (!sendPublishError(pConn, nInputStreamID))
return -1;
return 0;
}
// 重置节目
pPlayPath->reset(false);
}
else if (AVMATCH(&method, &av_FCPublish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
// 安全起见,初使化节目
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
}
else if (AVMATCH(&method, &av_createStream))
{
// 生成流ID
uint32_t uStreamID = pConn->genStreamID();
printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);
if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
return -1;
}
else if (AVMATCH(&method, &av_publish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());
// 检查streamID的有效性
if (!pConn->isValidStreamID(nInputStreamID))
{
printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
return -1;
}
// 连接与节目 建立双向关联
pConn->setStreamType(CConnection::Publish);
pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());
if (!sendPublishStatus(pConn, nInputStreamID))
return -1;
}
else if (AVMATCH(&method, &av_play))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);
// 检查streamID的有效性
if (!pConn->isValidStreamID(nInputStreamID))
{
printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
return -1;
}
// 连接与节目 建立双向关联
pConn->setStreamType(CConnection::Play);
pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());
if (!sendPlayStreamBegin(pConn, nInputStreamID))
return -1;
if (!sendPlayStatus(pConn, nInputStreamID))
return -1;
}
else if (AVMATCH(&method, &av_FCUnpublish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
}
else if (AVMATCH(&method, &av_deleteStream))
{
int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);
// 连接与节目 解除双向关联
std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
if (strPlayPath != "")
{
pConn->unbindPublishPlayPath(nStreamID);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
}
strPlayPath = pConn->getPlayPlayPath(nStreamID);
if (strPlayPath != "")
{
pConn->unbindPlayPlayPath(nStreamID);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
}
}
AMF_Reset(&obj);
return 0;
}
// Media packet handling
/// Forwards an audio/video packet to the players of the playpath that this
/// connection publishes on the packet's stream ID.
///
/// Packets arriving on a stream ID with no publish binding are dropped rather
/// than creating and feeding a playpath with an empty name. Because an unbound
/// stream and a stream bound to the name "" look identical through
/// getPublishPlayPath(), media for an empty-named stream is dropped too.
///
/// @return always 0.
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    const uint32_t nInputStreamID = pPacket->m_nInfoField2;
    const std::string strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    if (strPlayPath.empty())
        return 0;
    g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->cacheMediaPacket(pPacket);
    return 0;
}
// Send the window acknowledgement size message
/// Sends message type 0x05 on chunk stream 2 carrying the 32-bit value 5000000.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendWindowAckSize(CConnection* pConn)
{
    char scratch[256] = {0};
    RTMPPacket pkt;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_packetType = 0x05;
    pkt.m_nChannel = 0x02;
    pkt.m_hasAbsTimestamp = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_nInfoField2 = 0;
    // The body starts after the room reserved for the chunk header.
    pkt.m_body = scratch + RTMP_MAX_HEADER_SIZE;
    pkt.m_nBodySize = 4;
    AMF_EncodeInt32(pkt.m_body, scratch + sizeof(scratch), 5000000);
    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;
    printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
    return false;
}
// Send the set-peer-bandwidth message
/// Sends message type 0x06 on chunk stream 2: the 32-bit value 5000000
/// followed by one byte with value 2.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendPeerOutputBandWide(CConnection* pConn)
{
    char scratch[256] = {0};
    RTMPPacket pkt;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_packetType = 0x06;
    pkt.m_nChannel = 0x02;
    pkt.m_hasAbsTimestamp = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_nInfoField2 = 0;
    // The body starts after the room reserved for the chunk header.
    pkt.m_body = scratch + RTMP_MAX_HEADER_SIZE;
    pkt.m_nBodySize = 5;
    AMF_EncodeInt32(pkt.m_body, scratch + sizeof(scratch), 5000000);
    pkt.m_body[4] = 2;
    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;
    printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
    return false;
}
// Send the set-chunk-size message
/// Switches this session's outgoing chunk size to 4096 and announces it to the
/// peer with message type 0x01 on chunk stream 2.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendOutputChunkSize(CConnection* pConn)
{
    const int kChunkSize = 4096;
    pConn->Rtmp()->m_outChunkSize = kChunkSize;
    char scratch[256] = {0};
    RTMPPacket pkt;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_packetType = 0x01;
    pkt.m_nChannel = 0x02;
    pkt.m_hasAbsTimestamp = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_nInfoField2 = 0;
    // The body starts after the room reserved for the chunk header.
    pkt.m_body = scratch + RTMP_MAX_HEADER_SIZE;
    pkt.m_nBodySize = 4;
    AMF_EncodeInt32(pkt.m_body, scratch + sizeof(scratch), kChunkSize);
    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;
    printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
    return false;
}
// Send the connect response
/// Sends the "_result" reply to a connect invoke on chunk stream 3: the
/// transaction ID, a properties object (fmsVer, capabilities) and an
/// information object (level "status", code "NetConnection.Connect.Success").
///
/// @param nOperateID transaction ID of the connect request being answered.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);
    AMFObject obj1 = {0, NULL};
    AMFObjectProperty fmsVer;
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);
    AMFObjectProperty capabilities;
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);
    pEnc = AMF_Encode(&obj1, pEnc, pEnd);
    // AMF_AddProp heap-allocates the property array; release it once encoded.
    AMF_Reset(&obj1);
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    AMF_Reset(&obj2);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the createStream response
/// Sends the "_result" reply to a createStream invoke on chunk stream 3: the
/// transaction ID, an AMF null, and the newly assigned stream ID.
///
/// @param nOperateID transaction ID of the createStream request.
/// @param nStreamID  stream ID being handed to the client.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    char scratch[256] = {0};
    char* const pLimit = scratch + sizeof(scratch);
    RTMPPacket pkt;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_packetType = 0x14;
    pkt.m_nChannel = 0x03;
    pkt.m_hasAbsTimestamp = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_nInfoField2 = 0;
    // The body starts after the room reserved for the chunk header.
    pkt.m_body = scratch + RTMP_MAX_HEADER_SIZE;
    char* pCursor = pkt.m_body;
    pCursor = AMF_EncodeString(pCursor, pLimit, &av__result);
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nOperateID);
    *pCursor = AMF_NULL;
    ++pCursor;
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nStreamID);
    pkt.m_nBodySize = pCursor - pkt.m_body;
    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;
    printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
    return false;
}
// Send the publish status response
/// Sends an "onStatus" invoke on chunk stream 5 for the given stream with
/// level "status" and code "NetStream.Publish.Start".
///
/// @param nInputStreamID message stream ID the status refers to.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp heap-allocates the property array; release it once encoded.
    AMF_Reset(&obj2);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the publish error response
/// Sends an "onStatus" invoke on chunk stream 5 for the given stream with
/// level "error", code "NetStream.Publish.BadName" and description
/// "Already publishing".
///
/// @param nInputStreamID message stream ID the status refers to.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);
    AMFObjectProperty description;
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp heap-allocates the property array; release it once encoded.
    AMF_Reset(&obj2);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the play event message
/// Sends message type 0x04 on chunk stream 2: a 16-bit event code 0 followed
/// by the 32-bit stream ID.
///
/// @param nInputStreamID stream ID carried in the event.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    char scratch[256] = {0};
    char* const pLimit = scratch + sizeof(scratch);
    RTMPPacket pkt;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_packetType = 0x04;
    pkt.m_nChannel = 0x02;
    pkt.m_hasAbsTimestamp = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_nInfoField2 = 0;
    // The body starts after the room reserved for the chunk header.
    pkt.m_body = scratch + RTMP_MAX_HEADER_SIZE;
    char* pCursor = AMF_EncodeInt16(pkt.m_body, pLimit, 0);
    pCursor = AMF_EncodeInt32(pCursor, pLimit, nInputStreamID);
    pkt.m_nBodySize = pCursor - pkt.m_body;
    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;
    printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
    return false;
}
// Send the play status response
/// Sends an "onStatus" invoke on chunk stream 5 for the given stream with
/// level "status" and code "NetStream.Play.Start".
///
/// @param nInputStreamID message stream ID the status refers to.
/// @return true when RTMP_SendPacket succeeds; false (after logging) otherwise.
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp heap-allocates the property array; release it once encoded.
    AMF_Reset(&obj2);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <list>
#include <set>
#include <map>
#include <librtmp/rtmp.h>
#include <librtmp/log.h>
#include "kkp2p_sdk.h"
// Process-wide kkp2p engine handle. NULL until main() stores the result of
// kkp2p_engine_init(); also used by client threads when closing a channel.
kkp2p_engine_t* g_engine = NULL;
// Minimal wrapper around a pthread mutex created with default attributes.
// The mutex is initialised in the constructor and destroyed in the
// destructor; lock()/unlock() forward to pthread_mutex_lock/unlock.
class CMutex
{
    pthread_mutex_t m_mutex;
public:
    CMutex()
    {
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~CMutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }
    // Copying would duplicate the raw pthread_mutex_t and destroy it twice,
    // so the copy operations are disabled.
    CMutex(const CMutex&) = delete;
    CMutex& operator=(const CMutex&) = delete;
    // Acquire the mutex (blocks until available).
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    // Release the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }
};
// Scoped lock guard: calls pLock->lock() on construction and
// pLock->unlock() on destruction. T only needs lock() and unlock().
// The guard stores the pointer it was given; it does not own the lock.
template<typename T>
class CAutoLock
{
    T* m_pLock;
public:
    CAutoLock(T* pLock) : m_pLock(pLock)
    {
        m_pLock->lock();
    }
    ~CAutoLock()
    {
        m_pLock->unlock();
    }
    // A copied guard would unlock the same lock a second time, so copying
    // is disabled.
    CAutoLock(const CAutoLock&) = delete;
    CAutoLock& operator=(const CAutoLock&) = delete;
};
class CConnection
{
uint32_t m_uConnID;
uint32_t m_uNextStreamID;
RTMP* m_pRtmp;
std::string m_strApp;
int m_nStreamType;
std::set<uint32_t> m_setUsingStreamID;
std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;
std::list<RTMPPacket*> m_listpPacket;
CMutex m_mutex;
sem_t m_sem;
public:
// 流类型
enum EStreamType
{
Unkown = 0,
Publish,
Play
};
CConnection(uint32_t uConnID, int nSocket)
: m_uConnID(uConnID)
, m_uNextStreamID(1)
, m_pRtmp(NULL)
, m_strApp("")
, m_nStreamType(Unkown)
{
m_pRtmp = RTMP_Alloc();
RTMP_Init(m_pRtmp);
m_pRtmp->m_sb.sb_socket = nSocket;
sem_init(&m_sem, 0, 0);
}
virtual ~CConnection()
{
RTMP_Close(m_pRtmp);
RTMP_Free(m_pRtmp);
sem_destroy(&m_sem);
}
uint32_t ConnID()
{
return m_uConnID;
}
RTMP* Rtmp()
{
return m_pRtmp;
}
int Socket()
{
return m_pRtmp->m_sb.sb_socket;
}
void setAppName(const std::string& strApp)
{
m_strApp = strApp;
}
const std::string& getAppName() const
{
return m_strApp;
}
void setStreamType(EStreamType emType)
{
m_nStreamType = emType;
}
EStreamType getStreamType()
{
return (EStreamType)m_nStreamType;
}
uint32_t genStreamID()
{
uint32_t uStreamID = m_uNextStreamID++;
m_setUsingStreamID.insert(uStreamID);
return uStreamID;
}
// 检查流ID是否合法
bool isValidStreamID(uint32_t uStreamID)
{
return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
}
// 登记 推流ID与playpath 映射关系
void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
m_setUsingStreamID.erase(uStreamID);
m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
}
// 取消登记 推流ID与playpath 映射关系
void unbindPublishPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPublishStreamIDPlayPath.erase(uStreamID);
}
// 获取 推流ID映射关系
std::string getPublishPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
if (iter == m_mapPublishStreamIDPlayPath.end())
return "";
return iter->second;
}
// 断连时获取 推流的playpath列表
const void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
{
vecPlayPath.push_back(iter->second);
}
}
// 断连时清除 推流ID与playpath 映射关系
void cleanPublishPlayPath()
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPublishStreamIDPlayPath.clear();
}
// 登记 拉流ID与playpath 映射关系
void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
m_setUsingStreamID.erase(uStreamID);
m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
}
// 取消登记 拉流ID与playpath 映射关系
void unbindPlayPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPlayStreamIDPlayPath.erase(uStreamID);
}
// 获取 拉流ID映射关系
std::string getPlayPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
if (iter == m_mapPlayStreamIDPlayPath.end())
return "";
return iter->second;
}
// 断连时获取 拉流的playpath列表
const void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
{
vecPlayPath.push_back(iter->second);
}
}
// 断连时清除 拉流ID与playpath 映射关系
void cleanPlayPlayPath()
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPlayStreamIDPlayPath.clear();
}
// 通知指定的playpath即将重置
void tellResetPlayPath(const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
uint32_t uStreamID = 0;
for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
{
if (iter->second == strPlayPath)
{
uStreamID = iter->first;
m_mapPublishStreamIDPlayPath.erase(iter);
break;
}
}
if (uStreamID == 0)
{
for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
{
if (iter->second == strPlayPath)
{
uStreamID = iter->first;
m_mapPlayStreamIDPlayPath.erase(iter);
break;
}
}
}
}
// 提取待发送的报文
RTMPPacket* popPacket()
{
struct timeval tv;
gettimeofday(&tv, NULL);
double ftime = tv.tv_sec + (tv.tv_usec + 500000) / (double)1000000;
struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
sem_timedwait(&m_sem, &ts);
CAutoLock<CMutex> lock(&m_mutex);
if (m_listpPacket.empty())
return NULL;
RTMPPacket* pPacket = m_listpPacket.front();
m_listpPacket.pop_front();
return pPacket;
}
// 向连接拷贝多个报文
void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
{
m_listpPacket.push_back(*iter);
sem_post(&m_sem);
}
}
};
// 客户端连接管理 =>
class CConnections
{
uint32_t m_uNextConnID;
std::map<uint32_t, CConnection*> m_mapConnection;
CMutex m_mutex;
public:
CConnections()
: m_uNextConnID(1)
{
}
virtual ~CConnections() {}
CConnection* createConnection(int nSocket)
{
CAutoLock<CMutex> lock(&m_mutex);
CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
m_mapConnection[pConnection->ConnID()] = pConnection;
return pConnection;
}
void releaseConnection(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
CConnection* pConn = __getConnection(uConnID);
if (pConn)
{
m_mapConnection.erase(uConnID);
delete pConn;
}
}
CConnection* getConnection(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
return __getConnection(uConnID);
}
private:
CConnection* __getConnection(uint32_t uConnID)
{
auto iter = m_mapConnection.find(uConnID);
if (iter == m_mapConnection.end())
return NULL;
return iter->second;
}
};
CConnections g_Conns;
// Program (playpath) container =>
// Tracks one published stream name: the connection ID currently publishing
// it, the connection IDs playing it, and an end-of-stream flag. All mutable
// state is guarded by m_mutex.
class CPlayPath
{
    std::string m_strPlayPath;
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;
    CMutex m_mutex;
public:
    // A new playpath starts with the end flag set and no publisher.
    // Initialisers are listed in declaration order.
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath)
        , m_bEOF(true)
        , m_uPublishConnID(0)
    {
    }
    virtual ~CPlayPath() {}
    const std::string& getName() const
    {
        return m_strPlayPath;
    }
    // Set / query the end-of-stream flag. The flag is touched by several
    // connection threads, so it is accessed under m_mutex.
    void setEOF()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_bEOF = true;
    }
    bool isEOF()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return m_bEOF;
    }
    // Resets the program: clears the end flag and the publisher, and, when
    // bCleanPlayer is true, the player set as well. Every connection that was
    // detached is then told to drop its binding to this playpath.
    void reset(bool bCleanPlayer = false)
    {
        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;
        // Detach publisher (and optionally players) under the lock; the
        // notifications below run without it.
        {
            CAutoLock<CMutex> lock(&m_mutex);
            m_bEOF = false;
            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;
            if (bCleanPlayer)
            {
                m_setPlayConnID.swap(setPlayConnID);
            }
        }
        // Tell the former publisher to drop its binding.
        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
        // Tell the former players to drop their bindings.
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
    }
    // Registers the publishing connection.
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_uPublishConnID = uConnID;
    }
    // Unregisters the publishing connection.
    // Returns true if a publisher was registered. (The function previously
    // fell off the end without returning a value, which is undefined
    // behaviour for a bool function.)
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        const bool bHadPublisher = (m_uPublishConnID != 0);
        m_uPublishConnID = 0;
        return bHadPublisher;
    }
    // Registers a playing connection; returns false if it was already
    // registered.
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return m_setPlayConnID.insert(uConnID).second;
    }
    // Unregisters a playing connection; returns false if it was not
    // registered.
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return m_setPlayConnID.erase(uConnID) > 0;
    }
    // Fans a media packet out to the players: each registered player
    // connection receives its own heap copy (header struct plus body),
    // re-tagged with a medium-size header, via copyPackets().
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;
        {
            CAutoLock<CMutex> lock(&m_mutex);
            setPlayConnID = m_setPlayConnID;
        }
        // For simplicity the packet is copied straight to each player.
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn == NULL)
                continue;
            RTMPPacket* pPacketCP = new RTMPPacket;
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            // Give the copy its own body. If the allocation fails, m_body
            // still aliases the source packet, so only the struct is
            // released and this player misses the packet.
            if (!RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize))
            {
                delete pPacketCP;
                continue;
            }
            memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;
            std::vector<RTMPPacket*> vecpPacket(1, pPacketCP);
            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};
// Application container =>
// Groups the playpaths that belong to one RTMP application name. Playpath
// objects are created on demand and kept in the map; this class never
// deletes them.
class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;
    CMutex m_mutex;
public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}
    const std::string& getName() const
    {
        return m_strApp;
    }
    // Looks up a playpath by name. When it is missing, a new one is created
    // and stored if bCreate is true; otherwise NULL is returned.
    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> guard(&m_mutex);
        auto found = m_mappPlayPath.find(strPlayPath);
        if (found != m_mappPlayPath.end())
            return found->second;
        if (!bCreate)
            return NULL;
        CPlayPath* pCreated = new CPlayPath(strPlayPath);
        m_mappPlayPath[strPlayPath] = pCreated;
        return pCreated;
    }
};
// Application registry =>
// Maps application names to CApp objects. Apps are created on demand and
// kept in the map; this class never deletes them.
class CApps
{
    std::map<std::string, CApp*> m_mapApp;
    CMutex m_mutex;
public:
    CApps() {}
    virtual ~CApps() {}
    // Looks up an application by name. When it is missing, a new one is
    // created and stored if bCreate is true; otherwise NULL is returned.
    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> guard(&m_mutex);
        auto found = m_mapApp.find(strApp);
        if (found != m_mapApp.end())
            return found->second;
        if (!bCreate)
            return NULL;
        CApp* pCreated = new CApp(strApp);
        m_mapApp[strApp] = pCreated;
        return pCreated;
    }
};
CApps g_Apps;
// Program logic =>
// Forward declarations for the functions defined below.

// Per-connection worker; started by main() with the accepted channel.
void* ClientThread(void* _lp);
// Server side of the RTMP handshake on an accepted socket.
bool MyHandshake(int nSocket);
// Routes one received packet by its packet type; false ends the connection.
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
// Handles an invoke (packet type 0x14); negative return means failure.
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
// Handles packet types 0x08 / 0x09 received from a publisher.
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
// Senders for the individual control messages and command replies; each
// returns true when the packet was sent.
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);
int main(int argc, char* argv[])
{
if (argc < 3) {
printf("usage:%s peer_id peer_key\n", argv[0]);
return -1;
}
RTMP_LogSetLevel(RTMP_LOGDEBUG);
kkp2p_engine_conf_t kkp2p_conf;
kkp2p_conf.login_domain = "124.71.217.198";
kkp2p_conf.login_port = 3080;
kkp2p_conf.lan_search_port = 3549;
kkp2p_conf.max_log_size = 1024*1024*10;
kkp2p_conf.log_path = NULL;
g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
kkp2p_switch_log_level(g_engine, 4);
kkp2p_join_lan(g_engine, argv[1]);
kkp2p_join_net(g_engine, argv[1], argv[2]);
kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
while(1) {
int ret = kkp2p_accept(g_engine, 1000, channel);
if (ret < 0) {
// error
printf("kkp2p_accept error,exit\n");
free(channel);
break;
} else if (ret == 0) {
// timeout
continue;
} else {
// success
pthread_t ThreadId;
printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel->fd, channel->transmit_mode, channel->channel_id);
pthread_create(&ThreadId, NULL, ClientThread,(void*)channel);
channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
}
}
return 0;
}
// Worker thread for one accepted channel.
//
// param is a heap-allocated kkp2p_channel_t owned by this thread. The thread
// switches the fd to blocking mode, registers a connection, performs the
// handshake, and then dispatches incoming packets until an error occurs or
// the client starts playing. A playing client is then fed queued media
// packets until a send fails or no packet arrives for 30 seconds. On every
// exit path the connection is released, the channel is closed and the
// channel struct is freed. Always returns NULL.
void* ClientThread(void* param)
{
    pthread_detach(pthread_self());
    kkp2p_channel_t* channel = (kkp2p_channel_t*)param;
    // use default block
    int val = fcntl(channel->fd, F_GETFL, 0);
    if (val >= 0)
    {
        fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));
    }
    // Exactly one connection object per channel. (A second createConnection
    // call here used to leak an extra object bound to the same socket.)
    CConnection* pConn = g_Conns.createConnection(channel->fd);
    printf("connection:[%d] coming... \n", pConn->ConnID());
    // Handshake
    bool b = MyHandshake(pConn->Socket());
    if (!b)
    {
        printf("connection:[%d] handshake failed! \n", pConn->ConnID());
        // Same teardown as the normal exit path, so the channel and its
        // struct are not leaked.
        g_Conns.releaseConnection(pConn->ConnID());
        kkp2p_close_fd(channel->fd);
        kkp2p_close_channel(g_engine, channel->channel_id);
        free(channel);
        return NULL;
    }
    while (true)
    {
        RTMPPacket packet;
        packet.m_body = NULL;
        packet.m_chunk = NULL;
        RTMPPacket_Reset(&packet);
        // Read a packet
        if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
        {
            printf("connection:[%d] read error! \n", pConn->ConnID());
            break;
        }
        if (!RTMPPacket_IsReady(&packet))
            continue;
        //printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
        // pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);
        // Dispatch the packet
        bool b = Dispatch(pConn, &packet);
        RTMPPacket_Free(&packet);
        if (!b)
        {
            printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
            break;
        }
        if (pConn->getStreamType() == CConnection::Play)
        {
            printf("connection:[%d] now play... \n", pConn->ConnID());
            break;
        }
    }
    // Play state: forward queued media packets to the client.
    struct timeval tv;
    gettimeofday(&tv, NULL);
    double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
    while (pConn->getStreamType() == CConnection::Play)
    {
        RTMPPacket* pPacket = pConn->popPacket();
        struct timeval tvNow;
        gettimeofday(&tvNow, NULL);
        double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;
        // Idle timeout check
        if (pPacket == NULL)
        {
            if (fNowReadTime - fLastReadTime < 30)
                continue;
            printf("connection:[%d] too time no packet \n", pConn->ConnID());
            break;
        }
        fLastReadTime = fNowReadTime;
        // Send the media packet; this thread owns it after popPacket().
        bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);
        RTMPPacket_Free(pPacket);
        delete pPacket;
        if (!b)
        {
            printf("connection:[%d] send failed! \n", pConn->ConnID());
            break;
        }
    }
    // Detach the connection from its playpaths before it goes away.
    switch (pConn->getStreamType())
    {
    case CConnection::Publish:
        {
            std::vector<std::string> vecPlayPath;
            pConn->getPublishPlayPaths(vecPlayPath);
            for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
            {
                CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                if (pPlayPath)
                {
                    pPlayPath->setEOF();
                    pPlayPath->unsetPublishConn();
                }
            }
            pConn->cleanPublishPlayPath();
        }
        break;
    case CConnection::Play:
        {
            std::vector<std::string> vecPlayPath;
            pConn->getPlayPlayPaths(vecPlayPath);
            for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
            {
                CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                if (pPlayPath)
                {
                    pPlayPath->delPlayConn(pConn->ConnID());
                }
            }
            pConn->cleanPlayPlayPath();
        }
        break;
    }
    printf("connection:[%d] exit! \n", pConn->ConnID());
    // NOTE(review): releaseConnection() destroys the CConnection, whose
    // destructor calls RTMP_Close() on the same fd that kkp2p_close_fd()
    // closes next -- confirm this cannot close the descriptor twice.
    g_Conns.releaseConnection(pConn->ConnID());
    kkp2p_close_fd(channel->fd);
    kkp2p_close_channel(g_engine, channel->channel_id);
    free(channel);
    return NULL;
}
// Writes exactly len bytes from buff to fd, calling send() repeatedly until
// everything has been accepted.
// Returns len on success; logs and returns -1 as soon as send() reports an
// error.
int send_data(int fd, char* buff, int len) {
    for (int done = 0; done < len; ) {
        const int n = send(fd, buff + done, len - done, 0);
        if (n < 0) {
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,n, len, errno, strerror(errno));
            return -1;
        }
        done += n;
    }
    return len;
}
// Reads exactly len bytes from fd into buff, calling recv() repeatedly until
// the buffer is full.
// Returns len on success. Returns -1 when recv() reports an error or when
// the peer closes the connection before len bytes arrived.
int recv_data(int fd, char* buff, int len) {
    int recved = 0 ;
    while (recved < len) {
        int wl = recv(fd, buff + recved, len - recved, 0);
        if (wl < 0) {
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno));
            return -1;
        }
        if (wl == 0) {
            // recv() returns 0 once the peer has shut down. No more data
            // will ever arrive, so looping again would spin forever.
            printf("RecvData peer closed,fd:%d,len:%d,recved:%d.\n", fd, len, recved);
            return -1;
        }
        recved += wl;
    }
    return len;
}
// Server side of the plain RTMP handshake.
//
// Sequence on the wire:
//   receive C0 (1 byte, must be 3) and C1 (RTMP_SIG_SIZE bytes),
//   send    S0 (1 byte, 3) + S1 (RTMP_SIG_SIZE bytes, all zero here),
//   send    S2 (echo of C1),
//   receive C2 (RTMP_SIG_SIZE bytes, not verified).
// S0/S1 go out before S2: the first byte the client reads has to be the
// version byte. (The echo of C1 used to be sent first.)
// Returns true when every step transferred the expected number of bytes.
#define RTMP_SIG_SIZE 1536
bool MyHandshake(int nSocket)
{
    // C0: protocol version
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1) {
        return false;
    }
    if (type != 3) {
        return false;
    }
    // C1
    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    // S0 + S1
    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE) {
        return false;
    }
    // S2: echo of C1
    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    // C2: read and discard (the buffer is simply reused)
    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    return true;
}
// Routes one fully received packet according to its packet type.
//
//   0x01        peer changes its chunk size (validated, then applied)
//   0x05, 0x06  window-ack size / output bandwidth: logged only
//   0x08, 0x09  media packets: forwarded to HandleMediaPacket
//   0x14        invoke: forwarded to HandleInvoke
//   0x04, 0x12 and anything else: ignored
// Returns false when the connection should be closed (invalid chunk size or
// a failed invoke), true otherwise.
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    switch (pPacket->m_packetType)
    {
    case 0x01:
        {
            if (pPacket->m_nBodySize >= 4)
            {
                // The value comes straight from the peer. Zero or a value
                // with the top bit set is not a usable chunk size, so reject
                // it instead of handing it to the packet reader.
                int nChunkSize = (int)AMF_DecodeInt32(pPacket->m_body);
                if (nChunkSize <= 0)
                {
                    printf("connection:[%d] received: invalid chunk size %d \n", pConn->ConnID(), nChunkSize);
                    return false;
                }
                pConn->Rtmp()->m_inChunkSize = nChunkSize;
                printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
            }
        }
        break;
    case 0x04:
        {
        }
        break;
    case 0x05:
        {
            if (pPacket->m_nBodySize >= 4)
            {
                int nWindowAckSize = AMF_DecodeInt32(pPacket->m_body);
                printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nWindowAckSize);
            }
        }
        break;
    case 0x06:
        {
            if (pPacket->m_nBodySize >= 4)
            {
                int nOutputBW = AMF_DecodeInt32(pPacket->m_body);
                printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nOutputBW);
            }
            if (pPacket->m_nBodySize >= 5)
            {
                int nOutputBW2 = pPacket->m_body[4];
                printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nOutputBW2);
            }
        }
        break;
    case 0x08:
    case 0x09:
        {
            // The result is intentionally ignored: a dropped media packet
            // does not end the connection.
            HandleMediaPacket(pConn, pPacket);
        }
        break;
    case 0x12:
        {
        }
        break;
    case 0x14:
        {
            if (HandleInvoke(pConn, pPacket) < 0)
                return false;
        }
        break;
    }
    return true;
}
// SAVC(x) defines a file-local constant AVal named av_<x>, initialised
// through AVC with the identifier x turned into a string literal.
#define SAVC(x) static const AVal av_##x = AVC(#x)
// Command names matched against incoming invokes in HandleInvoke()
// (av__result and av_onStatus are the ones written into replies).
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);
// Wraps a NUL-terminated C string in an AVal (pointer + length) without
// copying it: the AVal points at pStr itself, so pStr must stay alive for as
// long as the AVal is used.
AVal makeAVal(const char* pStr)
{
    AVal wrapped = {const_cast<char*>(pStr), static_cast<int>(strlen(pStr))};
    return wrapped;
}
// 处理远程调用
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket)
{
if (pPacket->m_body[0] != 0x02)
{
printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
return -1;
}
uint32_t nInputStreamID = pPacket->m_nInfoField2;
AMFObject obj;
int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
if (nSize < 0)
{
printf("connection:[%d] invalid packet! \n", pConn->ConnID());
return -1;
}
AVal method;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);
if (AVMATCH(&method, &av_connect))
{
AMFObject obj1;
AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);
AVal appName = makeAVal("app");
AVal app;
AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);
std::string strApp(app.av_val, app.av_len);
printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());
pConn->setAppName(strApp);
if (!sendWindowAckSize(pConn))
return -1;
if (!sendPeerOutputBandWide(pConn))
return -1;
if (!sendOutputChunkSize(pConn))
return -1;
if (!sendConnectResult(pConn, nOperateID))
return -1;
}
else if (AVMATCH(&method, &av_releaseStream))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
// 检查该节目是否推流结束
CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
if (!pPlayPath->isEOF())
{
if (!sendPublishError(pConn, nInputStreamID))
return -1;
return 0;
}
// 重置节目
pPlayPath->reset(false);
}
else if (AVMATCH(&method, &av_FCPublish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
// 安全起见,初使化节目
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
}
else if (AVMATCH(&method, &av_createStream))
{
// 生成流ID
uint32_t uStreamID = pConn->genStreamID();
printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);
if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
return -1;
}
else if (AVMATCH(&method, &av_publish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());
// 检查streamID的有效性
if (!pConn->isValidStreamID(nInputStreamID))
{
printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
return -1;
}
// 连接与节目 建立双向关联
pConn->setStreamType(CConnection::Publish);
pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());
if (!sendPublishStatus(pConn, nInputStreamID))
return -1;
}
else if (AVMATCH(&method, &av_play))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);
// 检查streamID的有效性
if (!pConn->isValidStreamID(nInputStreamID))
{
printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
return -1;
}
// 连接与节目 建立双向关联
pConn->setStreamType(CConnection::Play);
pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());
if (!sendPlayStreamBegin(pConn, nInputStreamID))
return -1;
if (!sendPlayStatus(pConn, nInputStreamID))
return -1;
}
else if (AVMATCH(&method, &av_FCUnpublish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
}
else if (AVMATCH(&method, &av_deleteStream))
{
int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);
// 连接与节目 解除双向关联
std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
if (strPlayPath != "")
{
pConn->unbindPublishPlayPath(nStreamID);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
}
strPlayPath = pConn->getPlayPlayPath(nStreamID);
if (strPlayPath != "")
{
pConn->unbindPlayPlayPath(nStreamID);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
}
}
AMF_Reset(&obj);
return 0;
}
// Handles a media packet received from a publisher.
//
// Looks up the playpath bound to the packet's stream ID on this connection
// and hands the packet to that program for distribution to its players.
// Returns 0 when the packet was forwarded and -1 when it was dropped because
// the stream ID has no publish binding or the program does not exist.
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;
    const std::string strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    // "" means the stream was never published on this connection, or its
    // binding was reset. Drop the packet instead of creating a program
    // named "".
    if (strPlayPath.empty())
        return -1;
    // Look up without creating: publish has already created the program.
    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, false);
    if (pPlayPath == NULL)
        return -1;
    pPlayPath->cacheMediaPacket(pPacket);
    return 0;
}
// Send the window-acknowledgement-size message.
//
// Packet type 0x05 on chunk stream 2 with a 4-byte body holding 5000000.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendWindowAckSize(CConnection* pConn)
{
    char storage[256] = {0};
    char* const pBody = storage + RTMP_MAX_HEADER_SIZE;
    AMF_EncodeInt32(pBody, storage + sizeof(storage), 5000000);

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x05;
    msg.m_nChannel = 0x02;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = pBody;
    msg.m_nBodySize = 4;

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
    return false;
}
// Send the set-peer-bandwidth message.
//
// Packet type 0x06 on chunk stream 2 with a 5-byte body: the 32-bit value
// 5000000 followed by one byte with the value 2.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendPeerOutputBandWide(CConnection* pConn)
{
    char storage[256] = {0};
    char* const pBody = storage + RTMP_MAX_HEADER_SIZE;
    AMF_EncodeInt32(pBody, storage + sizeof(storage), 5000000);
    pBody[4] = 2;

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x06;
    msg.m_nChannel = 0x02;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = pBody;
    msg.m_nBodySize = 5;

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
    return false;
}
// Send the set-chunk-size message and switch the outgoing chunk size.
//
// Sets the session's m_outChunkSize to 4096 first, then sends packet type
// 0x01 on chunk stream 2 with a 4-byte body holding 4096.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendOutputChunkSize(CConnection* pConn)
{
    pConn->Rtmp()->m_outChunkSize = 4096;

    char storage[256] = {0};
    char* const pBody = storage + RTMP_MAX_HEADER_SIZE;
    AMF_EncodeInt32(pBody, storage + sizeof(storage), 4096);

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x01;
    msg.m_nChannel = 0x02;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = pBody;
    msg.m_nBodySize = 4;

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
    return false;
}
// Send the _result reply to a connect command.
//
// Encodes an AMF command (packet type 0x14, chunk stream 3): "_result", the
// transaction number nOperateID, a properties object
// {fmsVer: "FMS/3,0,1,123", capabilities: 31} and an information object
// {level: "status", code: "NetConnection.Connect.Success"}.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);
    AMFObject obj1 = {0, NULL};
    AMFObjectProperty fmsVer;
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);
    AMFObjectProperty capabilities;
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);
    pEnc = AMF_Encode(&obj1, pEnc, pEnd);
    // AMF_AddProp keeps the properties in a heap array owned by the object.
    // The bytes are already encoded into sBuf, so release it right away;
    // without this every call leaks it.
    AMF_Reset(&obj1);
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    AMF_Reset(&obj2);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the _result reply to a createStream command.
//
// Encodes an AMF command (packet type 0x14, chunk stream 3): "_result", the
// transaction number nOperateID, null, and the new stream ID as a number.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    char storage[256] = {0};
    char* const pLimit = storage + sizeof(storage);
    char* const pBody = storage + RTMP_MAX_HEADER_SIZE;

    char* pCursor = AMF_EncodeString(pBody, pLimit, &av__result);
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nOperateID);
    *pCursor = AMF_NULL;
    ++pCursor;
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nStreamID);

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x14;
    msg.m_nChannel = 0x03;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = pBody;
    msg.m_nBodySize = pCursor - pBody;

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
    return false;
}
// Send the onStatus reply that tells the publisher publishing has started.
//
// Encodes an AMF command (packet type 0x14, chunk stream 5, message stream
// nInputStreamID): "onStatus", number 0, null, and the object
// {level: "status", code: "NetStream.Publish.Start"}.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp keeps the properties in a heap array owned by obj2. The
    // encoded bytes are already in sBuf, so release the array here; without
    // this every call leaks it.
    AMF_Reset(&obj2);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the onStatus error reply used when a stream name is already being
// published.
//
// Encodes an AMF command (packet type 0x14, chunk stream 5, message stream
// nInputStreamID): "onStatus", number 0, null, and the object
// {level: "error", code: "NetStream.Publish.BadName",
//  description: "Already publishing"}.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);
    AMFObjectProperty description;
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp keeps the properties in a heap array owned by obj2. The
    // encoded bytes are already in sBuf, so release the array here; without
    // this every call leaks it.
    AMF_Reset(&obj2);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the stream-begin user-control event for a play request.
//
// Builds a packet of type 0x04 on chunk stream 2 whose body is a 16-bit
// event type of 0 followed by the 32-bit stream ID, and sends it unqueued.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    char storage[256] = {0};
    char* const pLimit = storage + sizeof(storage);

    // Headroom of RTMP_MAX_HEADER_SIZE bytes is kept in front of the body
    // (presumably where librtmp writes the chunk header -- TODO confirm).
    char* const pBody = storage + RTMP_MAX_HEADER_SIZE;
    char* pCursor = AMF_EncodeInt16(pBody, pLimit, 0);
    pCursor = AMF_EncodeInt32(pCursor, pLimit, nInputStreamID);

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x04;
    msg.m_nChannel = 0x02;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = pBody;
    msg.m_nBodySize = pCursor - pBody;

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
    return false;
}
// Send the onStatus reply that tells the player playback has started.
//
// Encodes an AMF command (packet type 0x14, chunk stream 5, message stream
// nInputStreamID): "onStatus", number 0, null, and the object
// {level: "status", code: "NetStream.Play.Start"}.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp keeps the properties in a heap array owned by obj2. The
    // encoded bytes are already in sBuf, so release the array here; without
    // this every call leaks it.
    AMF_Reset(&obj2);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <list>
#include <set>
#include <map>
#include <librtmp/rtmp.h>
#include <librtmp/log.h>
#include "kkp2p_sdk.h"
// Process-wide kkp2p engine handle; starts out NULL.
kkp2p_engine_t* g_engine = NULL;
// Minimal wrapper around a pthread mutex: initialized in the constructor,
// destroyed in the destructor, with lock()/unlock() forwarding to pthread.
class CMutex
{
    pthread_mutex_t m_mutex;
public:
    CMutex()
    {
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~CMutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }
    // Copying would duplicate the raw pthread_mutex_t and destroy it twice,
    // so copy construction and assignment are disabled.
    CMutex(const CMutex&) = delete;
    CMutex& operator=(const CMutex&) = delete;
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }
};
// Scope guard: calls pLock->lock() on construction and pLock->unlock() on
// destruction. T only needs lock() and unlock() members.
template<typename T>
class CAutoLock
{
    T* m_pLock;
public:
    explicit CAutoLock(T* pLock) : m_pLock(pLock)
    {
        m_pLock->lock();
    }
    ~CAutoLock()
    {
        m_pLock->unlock();
    }
    // A copied guard would unlock the same object twice, so copying is disabled.
    CAutoLock(const CAutoLock&) = delete;
    CAutoLock& operator=(const CAutoLock&) = delete;
};
class CConnection
{
uint32_t m_uConnID;
uint32_t m_uNextStreamID;
RTMP* m_pRtmp;
std::string m_strApp;
int m_nStreamType;
std::set<uint32_t> m_setUsingStreamID;
std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;
std::list<RTMPPacket*> m_listpPacket;
CMutex m_mutex;
sem_t m_sem;
public:
// 流类型
enum EStreamType
{
Unkown = 0,
Publish,
Play
};
CConnection(uint32_t uConnID, int nSocket)
: m_uConnID(uConnID)
, m_uNextStreamID(1)
, m_pRtmp(NULL)
, m_strApp("")
, m_nStreamType(Unkown)
{
m_pRtmp = RTMP_Alloc();
RTMP_Init(m_pRtmp);
m_pRtmp->m_sb.sb_socket = nSocket;
sem_init(&m_sem, 0, 0);
}
virtual ~CConnection()
{
RTMP_Close(m_pRtmp);
RTMP_Free(m_pRtmp);
sem_destroy(&m_sem);
}
uint32_t ConnID()
{
return m_uConnID;
}
RTMP* Rtmp()
{
return m_pRtmp;
}
int Socket()
{
return m_pRtmp->m_sb.sb_socket;
}
void setAppName(const std::string& strApp)
{
m_strApp = strApp;
}
const std::string& getAppName() const
{
return m_strApp;
}
void setStreamType(EStreamType emType)
{
m_nStreamType = emType;
}
EStreamType getStreamType()
{
return (EStreamType)m_nStreamType;
}
uint32_t genStreamID()
{
uint32_t uStreamID = m_uNextStreamID++;
m_setUsingStreamID.insert(uStreamID);
return uStreamID;
}
// 检查流ID是否合法
bool isValidStreamID(uint32_t uStreamID)
{
return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
}
// 登记 推流ID与playpath 映射关系
void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
m_setUsingStreamID.erase(uStreamID);
m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
}
// 取消登记 推流ID与playpath 映射关系
void unbindPublishPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPublishStreamIDPlayPath.erase(uStreamID);
}
// 获取 推流ID映射关系
std::string getPublishPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
if (iter == m_mapPublishStreamIDPlayPath.end())
return "";
return iter->second;
}
// 断连时获取 推流的playpath列表
const void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
{
vecPlayPath.push_back(iter->second);
}
}
// 断连时清除 推流ID与playpath 映射关系
void cleanPublishPlayPath()
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPublishStreamIDPlayPath.clear();
}
// 登记 拉流ID与playpath 映射关系
void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
m_setUsingStreamID.erase(uStreamID);
m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
}
// 取消登记 拉流ID与playpath 映射关系
void unbindPlayPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPlayStreamIDPlayPath.erase(uStreamID);
}
// 获取 拉流ID映射关系
std::string getPlayPlayPath(uint32_t uStreamID)
{
CAutoLock<CMutex> lock(&m_mutex);
auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
if (iter == m_mapPlayStreamIDPlayPath.end())
return "";
return iter->second;
}
// 断连时获取 拉流的playpath列表
const void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
{
vecPlayPath.push_back(iter->second);
}
}
// 断连时清除 拉流ID与playpath 映射关系
void cleanPlayPlayPath()
{
CAutoLock<CMutex> lock(&m_mutex);
m_mapPlayStreamIDPlayPath.clear();
}
// 通知指定的playpath即将重置
void tellResetPlayPath(const std::string& strPlayPath)
{
CAutoLock<CMutex> lock(&m_mutex);
uint32_t uStreamID = 0;
for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
{
if (iter->second == strPlayPath)
{
uStreamID = iter->first;
m_mapPublishStreamIDPlayPath.erase(iter);
break;
}
}
if (uStreamID == 0)
{
for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
{
if (iter->second == strPlayPath)
{
uStreamID = iter->first;
m_mapPlayStreamIDPlayPath.erase(iter);
break;
}
}
}
}
// 提取待发送的报文
RTMPPacket* popPacket()
{
struct timeval tv;
gettimeofday(&tv, NULL);
double ftime = tv.tv_sec + (tv.tv_usec + 500000) / (double)1000000;
struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
sem_timedwait(&m_sem, &ts);
CAutoLock<CMutex> lock(&m_mutex);
if (m_listpPacket.empty())
return NULL;
RTMPPacket* pPacket = m_listpPacket.front();
m_listpPacket.pop_front();
return pPacket;
}
// 向连接拷贝多个报文
void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
{
CAutoLock<CMutex> lock(&m_mutex);
for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
{
m_listpPacket.push_back(*iter);
sem_post(&m_sem);
}
}
};
// 客户端连接管理 =>
class CConnections
{
uint32_t m_uNextConnID;
std::map<uint32_t, CConnection*> m_mapConnection;
CMutex m_mutex;
public:
CConnections()
: m_uNextConnID(1)
{
}
virtual ~CConnections() {}
CConnection* createConnection(int nSocket)
{
CAutoLock<CMutex> lock(&m_mutex);
CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
m_mapConnection[pConnection->ConnID()] = pConnection;
return pConnection;
}
void releaseConnection(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
CConnection* pConn = __getConnection(uConnID);
if (pConn)
{
m_mapConnection.erase(uConnID);
delete pConn;
}
}
CConnection* getConnection(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
return __getConnection(uConnID);
}
private:
CConnection* __getConnection(uint32_t uConnID)
{
auto iter = m_mapConnection.find(uConnID);
if (iter == m_mapConnection.end())
return NULL;
return iter->second;
}
};
// Global connection registry, used by the client threads and by CPlayPath to
// resolve connection ids.
CConnections g_Conns;
// 节目容器 =>
class CPlayPath
{
std::string m_strPlayPath;
bool m_bEOF;
uint32_t m_uPublishConnID;
std::set<uint32_t> m_setPlayConnID;
CMutex m_mutex;
public:
CPlayPath(const std::string& strPlayPath)
: m_strPlayPath(strPlayPath)
, m_uPublishConnID(0)
, m_bEOF(true)
{
}
virtual ~CPlayPath() {}
const std::string& getName() const
{
return m_strPlayPath;
}
// 设置/获取 结束标志
void setEOF()
{
m_bEOF = true;
}
bool isEOF()
{
return m_bEOF;
}
// 重置节目对象
void reset(bool bCleanPlayer = false)
{
// 清除结束标志
m_bEOF = false;
uint32_t uPublishConnID = 0;
std::set<uint32_t> setPlayConnID;
// 清除推流和拉流连接
{
CAutoLock<CMutex> lock(&m_mutex);
uPublishConnID = m_uPublishConnID;
m_uPublishConnID = 0;
if (bCleanPlayer)
{
m_setPlayConnID.swap(setPlayConnID);
}
}
// 通知推流连接做清除处理
if (uPublishConnID > 0)
{
CConnection* pConn = g_Conns.getConnection(uPublishConnID);
if (pConn)
{
pConn->tellResetPlayPath(m_strPlayPath);
}
}
// 通知拉流连接做清除处理
for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
{
CConnection* pConn = g_Conns.getConnection( (*iter) );
if (pConn)
{
pConn->tellResetPlayPath(m_strPlayPath);
}
}
}
// 登记推流连接
void setPublishConn(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
m_uPublishConnID = uConnID;
}
// 取消登记推流连接
bool unsetPublishConn()
{
CAutoLock<CMutex> lock(&m_mutex);
m_uPublishConnID = 0;
}
// 登记拉流连接
bool addPlayConn(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
auto iter = m_setPlayConnID.find(uConnID);
if (iter != m_setPlayConnID.end())
return false;
m_setPlayConnID.insert(uConnID);
return true;
}
// 取消登记拉流连接
bool delPlayConn(uint32_t uConnID)
{
CAutoLock<CMutex> lock(&m_mutex);
auto iter = m_setPlayConnID.find(uConnID);
if (iter == m_setPlayConnID.end())
return false;
m_setPlayConnID.erase(uConnID);
return true;
}
// 暂存媒体报文
void cacheMediaPacket(RTMPPacket* pPacket)
{
std::set<uint32_t> setPlayConnID;
{
CAutoLock<CMutex> lock(&m_mutex);
setPlayConnID = m_setPlayConnID;
}
// 简单起见,直接拷贝到拉流连接
for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
{
CConnection* pConn = g_Conns.getConnection( (*iter) );
if (pConn == NULL)
continue;
std::vector<RTMPPacket*> vecpPacket;
RTMPPacket* pPacketCP = new RTMPPacket;
RTMPPacket_Reset(pPacketCP);
memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize);
memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;
vecpPacket.push_back(pPacketCP);
pConn->copyPackets(m_strPlayPath, vecpPacket);
}
}
};
// Application container =>
// Owns the playpaths that belong to one RTMP application name.
class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;
    CMutex m_mutex;
public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}
    const std::string& getName() const
    {
        return m_strApp;
    }
    // Return the playpath with the given name. When it does not exist yet it
    // is created if bCreate is true; otherwise NULL is returned.
    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> guard(&m_mutex);
        auto found = m_mappPlayPath.find(strPlayPath);
        if (found != m_mappPlayPath.end())
            return found->second;
        if (!bCreate)
            return NULL;
        CPlayPath* pCreated = new CPlayPath(strPlayPath);
        m_mappPlayPath[strPlayPath] = pCreated;
        return pCreated;
    }
};
// Application collection =>
// Maps application names to their CApp containers.
class CApps
{
    std::map<std::string, CApp*> m_mapApp;
    CMutex m_mutex;
public:
    CApps() {}
    virtual ~CApps() {}
    // Return the application with the given name. When it does not exist yet
    // it is created if bCreate is true; otherwise NULL is returned.
    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> guard(&m_mutex);
        auto found = m_mapApp.find(strApp);
        if (found != m_mapApp.end())
            return found->second;
        if (!bCreate)
            return NULL;
        CApp* pCreated = new CApp(strApp);
        m_mapApp[strApp] = pCreated;
        return pCreated;
    }
};
// Global application collection, looked up by the app name each connection
// supplied in its connect command.
CApps g_Apps;
// Program logic =>
// Forward declarations of the per-connection thread, the handshake, the packet
// dispatcher and its handlers, and the helpers that send RTMP replies.
void* ClientThread(void* _lp);
bool MyHandshake(int nSocket);
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);
int main(int argc, char* argv[])
{
if (argc < 3) {
printf("usage:%s peer_id peer_key\n", argv[0]);
return -1;
}
RTMP_LogSetLevel(RTMP_LOGDEBUG);
kkp2p_engine_conf_t kkp2p_conf;
kkp2p_conf.login_domain = "124.71.217.198";
kkp2p_conf.login_port = 3080;
kkp2p_conf.lan_search_port = 3549;
kkp2p_conf.max_log_size = 1024*1024*10;
kkp2p_conf.log_path = NULL;
g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
kkp2p_switch_log_level(g_engine, 4);
kkp2p_join_lan(g_engine, argv[1]);
kkp2p_join_net(g_engine, argv[1], argv[2]);
kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
while(1) {
int ret = kkp2p_accept(g_engine, 1000, channel);
if (ret < 0) {
// error
printf("kkp2p_accept error,exit\n");
free(channel);
break;
} else if (ret == 0) {
// timeout
continue;
} else {
// success
pthread_t ThreadId;
printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel->fd, channel->transmit_mode, channel->channel_id);
pthread_create(&ThreadId, NULL, ClientThread,(void*)channel);
channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
}
}
return 0;
}
void* ClientThread(void* param)
{
pthread_detach(pthread_self());
kkp2p_channel_t* channel = (kkp2p_channel_t*)param;
// use default block
int val = fcntl(channel->fd, F_GETFL, 0);
fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));
g_Conns.createConnection(channel->fd);
CConnection* pConn = g_Conns.createConnection(channel->fd);
printf("connection:[%d] coming... \n", pConn->ConnID());
// 握手
bool b = MyHandshake(pConn->Socket());
if (!b)
{
printf("connection:[%d] handshake failed! \n", pConn->ConnID());
g_Conns.releaseConnection(pConn->ConnID());
return NULL;
}
while (true)
{
RTMPPacket packet;
packet.m_body = NULL;
packet.m_chunk = NULL;
RTMPPacket_Reset(&packet);
// 读取报文
if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
{
printf("connection:[%d] read error! \n", pConn->ConnID());
break;
}
if (!RTMPPacket_IsReady(&packet))
continue;
//printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
// pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);
// 报文分派交互
bool b = Dispatch(pConn, &packet);
RTMPPacket_Free(&packet);
if (!b)
{
printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
break;
}
if (pConn->getStreamType() == CConnection::Play)
{
printf("connection:[%d] now play... \n", pConn->ConnID());
break;
}
}
// 进入拉流状态
struct timeval tv;
gettimeofday(&tv, NULL);
double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
while (pConn->getStreamType() == CConnection::Play)
{
RTMPPacket* pPacket = pConn->popPacket();
struct timeval tvNow;
gettimeofday(&tvNow, NULL);
double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;
// 超时检查
if (pPacket == NULL)
{
if (fNowReadTime - fLastReadTime < 30)
continue;
printf("connection:[%d] too time no packet \n", pConn->ConnID());
break;
}
fLastReadTime = fNowReadTime;
// 下发媒体报文
bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);
RTMPPacket_Free(pPacket);
delete pPacket;
if (!b)
{
printf("connection:[%d] send failed! \n", pConn->ConnID());
break;
}
}
// 连接退出前关系解除
switch (pConn->getStreamType())
{
case CConnection::Publish:
{
std::vector<std::string> vecPlayPath;
pConn->getPublishPlayPaths(vecPlayPath);
for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
{
CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
if (pPlayPath)
{
pPlayPath->setEOF();
pPlayPath->unsetPublishConn();
}
}
pConn->cleanPublishPlayPath();
}
break;
case CConnection::Play:
{
std::vector<std::string> vecPlayPath;
pConn->getPlayPlayPaths(vecPlayPath);
for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
{
CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
if (pPlayPath)
{
pPlayPath->delPlayConn(pConn->ConnID());
}
}
pConn->cleanPlayPlayPath();
}
break;
}
printf("connection:[%d] exit! \n", pConn->ConnID());
g_Conns.releaseConnection(pConn->ConnID());
kkp2p_close_fd(channel->fd);
kkp2p_close_channel(g_engine, channel->channel_id);
free(channel);
return NULL;
}
// Write exactly len bytes from buff to fd, repeating send() until everything
// has been written. Returns len on success, -1 as soon as send() fails.
int send_data(int fd, char* buff, int len) {
    for (int nDone = 0; nDone < len; ) {
        const int nWritten = send(fd, buff + nDone, len - nDone, 0);
        if (nWritten < 0) {
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,nWritten, len, errno, strerror(errno));
            return -1;
        }
        nDone += nWritten;
    }
    return len;
}
// Read exactly len bytes from fd into buff, repeating recv() until the buffer
// is full. Returns len on success, -1 when recv() fails or the peer closes the
// connection before len bytes have arrived.
int recv_data(int fd, char* buff, int len) {
    int recved = 0 ;
    while (recved < len) {
        int wl = recv(fd, buff + recved, len - recved, 0);
        if (wl < 0) {
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno));
            return -1;
        }
        if (wl == 0) {
            // recv() returns 0 once the peer has shut the connection down;
            // without this check the loop would spin forever.
            printf("RecvData peer closed,fd:%d,recved:%d,len:%d.\n",fd,recved, len);
            return -1;
        }
        recved += wl;
    }
    return len;
}
// Server side of the plain RTMP handshake on nSocket.
// Reads C0 (must be version 3) and C1, replies with S0+S1 followed by S2 (an
// echo of C1), then reads C2. C2 is read but not verified.
// Returns true when every read and write completed, false otherwise.
#define RTMP_SIG_SIZE 1536
bool MyHandshake(int nSocket)
{
    // C0: protocol version byte.
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1) {
        return false;
    }
    if (type != 3) {
        return false;
    }
    // C1: client signature block.
    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    // S0 + S1 go out first. The previous code sent the C1 echo before them,
    // so the client saw the first byte of its own signature where it expects
    // the server's version byte.
    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE) {
        return false;
    }
    // S2: echo of C1.
    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    // C2: the client's echo of S1 (contents not checked).
    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    return true;
}
// Route one complete RTMP packet by its message type.
// Returns false only when an invoke (0x14) handler reports an error; every
// other message type, including those that are ignored, returns true.
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    const int nType = pPacket->m_packetType;

    // Command message: the only type that can fail the dispatch.
    if (nType == 0x14)
        return !(HandleInvoke(pConn, pPacket) < 0);

    // Audio (0x08) and video (0x09) share the media handler.
    if (nType == 0x08 || nType == 0x09)
    {
        HandleMediaPacket(pConn, pPacket);
        return true;
    }

    const bool bHasInt32 = (pPacket->m_nBodySize >= 4);
    if (nType == 0x01)
    {
        // Set chunk size: applied to the incoming chunk size of the session.
        if (bHasInt32)
        {
            pConn->Rtmp()->m_inChunkSize = AMF_DecodeInt32(pPacket->m_body);
            printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
        }
    }
    else if (nType == 0x05)
    {
        // Window acknowledgement size: logged only.
        if (bHasInt32)
        {
            int nAckWindow = AMF_DecodeInt32(pPacket->m_body);
            printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nAckWindow);
        }
    }
    else if (nType == 0x06)
    {
        // Set peer bandwidth: 4-byte value plus an optional 5th byte, logged only.
        if (bHasInt32)
        {
            int nBandwidth = AMF_DecodeInt32(pPacket->m_body);
            printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nBandwidth);
        }
        if (pPacket->m_nBodySize >= 5)
        {
            int nLimitType = pPacket->m_body[4];
            printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nLimitType);
        }
    }
    // 0x04, 0x12 and all other types are accepted without any action.
    return true;
}
// SAVC(x) declares a file-local constant AVal named av_<x>, initialized through
// AVC from the stringified identifier. The constants below are the command
// names that HandleInvoke compares against and that the reply helpers encode.
#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);
// Wrap a NUL-terminated C string in an AVal. The AVal points at pStr itself
// (no copy is made), with its length taken from strlen.
AVal makeAVal(const char* pStr)
{
    AVal value;
    value.av_val = (char*)pStr;
    value.av_len = (int)strlen(pStr);
    return value;
}
// 处理远程调用
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket)
{
if (pPacket->m_body[0] != 0x02)
{
printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
return -1;
}
uint32_t nInputStreamID = pPacket->m_nInfoField2;
AMFObject obj;
int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
if (nSize < 0)
{
printf("connection:[%d] invalid packet! \n", pConn->ConnID());
return -1;
}
AVal method;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);
if (AVMATCH(&method, &av_connect))
{
AMFObject obj1;
AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);
AVal appName = makeAVal("app");
AVal app;
AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);
std::string strApp(app.av_val, app.av_len);
printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());
pConn->setAppName(strApp);
if (!sendWindowAckSize(pConn))
return -1;
if (!sendPeerOutputBandWide(pConn))
return -1;
if (!sendOutputChunkSize(pConn))
return -1;
if (!sendConnectResult(pConn, nOperateID))
return -1;
}
else if (AVMATCH(&method, &av_releaseStream))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
// 检查该节目是否推流结束
CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
if (!pPlayPath->isEOF())
{
if (!sendPublishError(pConn, nInputStreamID))
return -1;
return 0;
}
// 重置节目
pPlayPath->reset(false);
}
else if (AVMATCH(&method, &av_FCPublish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
// 安全起见,初使化节目
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
}
else if (AVMATCH(&method, &av_createStream))
{
// 生成流ID
uint32_t uStreamID = pConn->genStreamID();
printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);
if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
return -1;
}
else if (AVMATCH(&method, &av_publish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());
// 检查streamID的有效性
if (!pConn->isValidStreamID(nInputStreamID))
{
printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
return -1;
}
// 连接与节目 建立双向关联
pConn->setStreamType(CConnection::Publish);
pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());
if (!sendPublishStatus(pConn, nInputStreamID))
return -1;
}
else if (AVMATCH(&method, &av_play))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);
// 检查streamID的有效性
if (!pConn->isValidStreamID(nInputStreamID))
{
printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
return -1;
}
// 连接与节目 建立双向关联
pConn->setStreamType(CConnection::Play);
pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());
if (!sendPlayStreamBegin(pConn, nInputStreamID))
return -1;
if (!sendPlayStatus(pConn, nInputStreamID))
return -1;
}
else if (AVMATCH(&method, &av_FCUnpublish))
{
AVal playpath;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
std::string strPlayPath(playpath.av_val, playpath.av_len);
printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
}
else if (AVMATCH(&method, &av_deleteStream))
{
int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);
// 连接与节目 解除双向关联
std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
if (strPlayPath != "")
{
pConn->unbindPublishPlayPath(nStreamID);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
}
strPlayPath = pConn->getPlayPlayPath(nStreamID);
if (strPlayPath != "")
{
pConn->unbindPlayPlayPath(nStreamID);
g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
}
}
AMF_Reset(&obj);
return 0;
}
// Handle an audio/video packet from a publisher: find the playpath bound to the
// packet's stream id and hand the packet to that playpath for distribution.
// Returns 0 when the packet was forwarded, -1 when it was dropped because the
// stream id is not bound to a published playpath on this connection.
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;
    const std::string strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    // No binding means the client never published on this stream id. The
    // previous code then created and fed an empty-named playpath.
    if (strPlayPath.empty())
        return -1;
    // The publish handler already created the playpath, so do not create here.
    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, false);
    if (pPlayPath == NULL)
        return -1;
    pPlayPath->cacheMediaPacket(pPacket);
    return 0;
}
// Send a Window Acknowledgement Size message (type 0x05) carrying the value
// 5000000. Returns true when RTMP_SendPacket reports success.
bool sendWindowAckSize(CConnection* pConn)
{
    char raw[256] = {0};
    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_nChannel = 0x02;
    msg.m_packetType = 0x05;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    // The payload starts after the space reserved for the chunk header.
    msg.m_body = raw + RTMP_MAX_HEADER_SIZE;
    msg.m_nBodySize = 4;
    AMF_EncodeInt32(msg.m_body, raw + sizeof(raw), 5000000);
    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;
    printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
    return false;
}
// Send a Set Peer Bandwidth message (type 0x06): 4-byte value 5000000 followed
// by a fifth byte set to 2. Returns true when RTMP_SendPacket reports success.
bool sendPeerOutputBandWide(CConnection* pConn)
{
    char raw[256] = {0};
    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_nChannel = 0x02;
    msg.m_packetType = 0x06;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    // The payload starts after the space reserved for the chunk header.
    msg.m_body = raw + RTMP_MAX_HEADER_SIZE;
    msg.m_nBodySize = 5;
    AMF_EncodeInt32(msg.m_body, raw + sizeof(raw), 5000000);
    msg.m_body[4] = 2;
    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;
    printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
    return false;
}
// Switch this session's outgoing chunk size to 4096 and announce it to the
// peer with a Set Chunk Size message (type 0x01).
// Returns true when RTMP_SendPacket reports success.
bool sendOutputChunkSize(CConnection* pConn)
{
    // The local setting changes before the announcement is sent.
    pConn->Rtmp()->m_outChunkSize = 4096;
    char raw[256] = {0};
    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_nChannel = 0x02;
    msg.m_packetType = 0x01;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    // The payload starts after the space reserved for the chunk header.
    msg.m_body = raw + RTMP_MAX_HEADER_SIZE;
    msg.m_nBodySize = 4;
    AMF_EncodeInt32(msg.m_body, raw + sizeof(raw), 4096);
    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;
    printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
    return false;
}
// Send the "_result" reply to a connect command.
// The body is: "_result", the transaction id, a properties object (fmsVer,
// capabilities) and an information object (level "status", code
// "NetConnection.Connect.Success").
//
// nOperateID - transaction id taken from the connect request.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    // Leave room in front of the body for the chunk header.
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);
    // Both objects live in stack arrays. Building them with AMF_AddProp
    // heap-allocates the property arrays, which were never released and
    // leaked on every connect.
    AMFObjectProperty aProps1[2];
    aProps1[0].p_name = makeAVal("fmsVer");
    aProps1[0].p_type = AMF_STRING;
    aProps1[0].p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    aProps1[1].p_name = makeAVal("capabilities");
    aProps1[1].p_type = AMF_NUMBER;
    aProps1[1].p_vu.p_number = 31;
    AMFObject obj1 = {2, aProps1};
    pEnc = AMF_Encode(&obj1, pEnc, pEnd);
    AMFObjectProperty aProps2[2];
    aProps2[0].p_name = makeAVal("level");
    aProps2[0].p_type = AMF_STRING;
    aProps2[0].p_vu.p_aval = makeAVal("status");
    aProps2[1].p_name = makeAVal("code");
    aProps2[1].p_type = AMF_STRING;
    aProps2[1].p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMFObject obj2 = {2, aProps2};
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the "_result" reply to createStream: "_result", the transaction id, an
// AMF null and the newly assigned stream id.
// Returns true when RTMP_SendPacket reports success.
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    char raw[256] = {0};
    char* const pLimit = raw + sizeof(raw);
    RTMPPacket reply;
    reply.m_headerType = RTMP_PACKET_SIZE_LARGE;
    reply.m_nChannel = 0x03;
    reply.m_packetType = 0x14;
    reply.m_hasAbsTimestamp = 0;
    reply.m_nTimeStamp = 0;
    reply.m_nInfoField2 = 0;
    // The payload starts after the space reserved for the chunk header.
    reply.m_body = raw + RTMP_MAX_HEADER_SIZE;
    char* pCursor = AMF_EncodeString(reply.m_body, pLimit, &av__result);
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nOperateID);
    *pCursor = AMF_NULL;
    ++pCursor;
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nStreamID);
    reply.m_nBodySize = pCursor - reply.m_body;
    if (RTMP_SendPacket(pConn->Rtmp(), &reply, FALSE))
        return true;
    printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
    return false;
}
// Send the "onStatus" command that tells a publisher its publish request was
// accepted (level "status", code "NetStream.Publish.Start").
//
// nInputStreamID - stream id copied into the packet's stream-id field.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    // Leave room in front of the body for the chunk header.
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    // The info object lives in a stack array. Building it with AMF_AddProp
    // heap-allocates the property array, which was never released and leaked
    // on every call.
    AMFObjectProperty aProps[2];
    aProps[0].p_name = makeAVal("level");
    aProps[0].p_type = AMF_STRING;
    aProps[0].p_vu.p_aval = makeAVal("status");
    aProps[1].p_name = makeAVal("code");
    aProps[1].p_type = AMF_STRING;
    aProps[1].p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMFObject obj2 = {2, aProps};
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the "onStatus" command that rejects a publish request (level "error",
// code "NetStream.Publish.BadName", description "Already publishing").
//
// nInputStreamID - stream id copied into the packet's stream-id field.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    // Leave room in front of the body for the chunk header.
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    // The info object lives in a stack array. Building it with AMF_AddProp
    // heap-allocates the property array, which was never released and leaked
    // on every call.
    AMFObjectProperty aProps[3];
    aProps[0].p_name = makeAVal("level");
    aProps[0].p_type = AMF_STRING;
    aProps[0].p_vu.p_aval = makeAVal("error");
    aProps[1].p_name = makeAVal("code");
    aProps[1].p_type = AMF_STRING;
    aProps[1].p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    aProps[2].p_name = makeAVal("description");
    aProps[2].p_type = AMF_STRING;
    aProps[2].p_vu.p_aval = makeAVal("Already publishing");
    AMFObject obj2 = {3, aProps};
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
// Send the user-control message (type 0x04) for a play request: a 16-bit event
// type 0 followed by the 32-bit stream id.
// Returns true when RTMP_SendPacket reports success.
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    char raw[256] = {0};
    char* const pLimit = raw + sizeof(raw);
    RTMPPacket event;
    event.m_headerType = RTMP_PACKET_SIZE_LARGE;
    event.m_nChannel = 0x02;
    event.m_packetType = 0x04;
    event.m_hasAbsTimestamp = 0;
    event.m_nTimeStamp = 0;
    event.m_nInfoField2 = 0;
    // The payload starts after the space reserved for the chunk header.
    event.m_body = raw + RTMP_MAX_HEADER_SIZE;
    char* pCursor = AMF_EncodeInt16(event.m_body, pLimit, 0);
    pCursor = AMF_EncodeInt32(pCursor, pLimit, nInputStreamID);
    event.m_nBodySize = pCursor - event.m_body;
    if (RTMP_SendPacket(pConn->Rtmp(), &event, FALSE))
        return true;
    printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
    return false;
}
// Send the "onStatus" command that tells a player its play request started
// (level "status", code "NetStream.Play.Start").
//
// pConn          - connection whose RTMP session receives the reply.
// nInputStreamID - stream id copied into the packet's stream-id field.
// Returns true when RTMP_SendPacket reports success, false otherwise.
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);
    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    // Leave room in front of the body for the chunk header.
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;
    // The info object lives in a stack array. Building it with AMF_AddProp
    // heap-allocates the property array, which was never released and leaked
    // on every call.
    AMFObjectProperty aProps[2];
    aProps[0].p_name = makeAVal("level");
    aProps[0].p_type = AMF_STRING;
    aProps[0].p_vu.p_aval = makeAVal("status");
    aProps[1].p_name = makeAVal("code");
    aProps[1].p_type = AMF_STRING;
    aProps[1].p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMFObject obj2 = {2, aProps};
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}