最佳实践-p2p传输rtmp协议

     库快科技p2p库是一个业务通用的p2p通信库,只是负责创建连接,至于在连接上业务层使用的协议,不做任何限制,p2p库只是对数据进行透传(relay模式或者p2p模式),所以基于库快科技的p2p通信库,用户可以使用任何协议,包括https、rtmp等。

    本例子用于举例说明,如何基于kkp2p使用rtmp协议。众所周知,rtmp传输协议广泛用于直播点播系统中,主要用于传输音视频数据。

    本例子分两个程序配合演示:一个作为rtmp server的接入代理(peer client),一个作为服务端(rtmp server)。最后演示时用ffmpeg推流,用vlc播放器播放。测试流程描述如下:


首先我们看peer client代码,该代码在windows平台下用visual studio编译测试通过。peer client利用接口kkp2p_start_proxy启动一个本地代理和rtmp server进行通信。源码讲解如下:

#include <windows.h>
#include <process.h>
#include <iostream>
#include <stdio.h>
#include <stdint.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/timeb.h>
#include <signal.h>

#include "kkp2p_sdk.h"

#pragma comment(lib,"Ws2_32.lib")
#pragma comment(lib, "iphlpapi.lib")

// Run flag polled by main(); cleared when CTRL+C (SIGINT) is received.
// It is written from a signal handler, so it must be a volatile sig_atomic_t:
// that is the only object type guaranteed to be safely writable from a handler.
volatile sig_atomic_t run_flag = 1;

// SIGINT handler: prints a notice and asks the main loop to stop.
// NOTE(review): printf is not async-signal-safe; it is kept only to preserve
// the existing console output of this demo.
void SignalHandler(int signal)
{
	(void)signal; // the signal number is not needed
	printf("exit...\n");
	run_flag = 0;
}

// 总共5个参数,为代理ip、port、对端通信的peer id,代理和peer id建联模式
int main(int argc, char** argv) {
	if (argc < 5) {
		printf("usage:%s proxy_ip proxy_port peer_id connect_mode\n", argv[0]);
		return -1;
	}

	typedef void(*SignalHandlerPointer)(int);
	SignalHandlerPointer previousHandler;
	previousHandler = signal(SIGINT, SignalHandler);

	WSADATA wsadata;//注释2
	WSAStartup(MAKEWORD(2, 2), &wsadata);

	// 设置p2p服务端的登录域名和端口
	kkp2p_engine_conf_t kkp2p_conf;
	kkp2p_conf.login_domain = (char*)"124.71.217.198";
	kkp2p_conf.login_port = 3080;
	kkp2p_conf.lan_search_port = 3549;
	kkp2p_conf.max_log_size = 1024 * 1024 * 10;
	kkp2p_conf.log_path = NULL;
	kkp2p_engine_t* g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
	kkp2p_switch_log_level(g_engine, 4);

	kkp2p_connect_ctx_t ctx;
	memset(&ctx, 0, sizeof(kkp2p_connect_ctx_t));
	strncpy(ctx.peer_id, argv[3], 32);
	ctx.timeout = 5000;

        // 创建tcp类型传输通道
        ctx.channel_type = KKP2P_TCP_CHANNEL;
	
	// 和peer id建连模式,0为自动模式,1为仅p2p模式,2为仅relay模式
	ctx.connect_mode = atoi(argv[4]);
	uint32_t proxyId = 0;
	int ret = kkp2p_start_proxy(g_engine, argv[1], atoi(argv[2]), &ctx, &proxyId);
	if (ret < 0) {
		printf("create proxy(%s:%d) to peer %s error.\n", argv[1], atoi(argv[2]), argv[3]);
		return -1;
	}
	else {
		printf("create proxy(%s:%d) to peer %s success.\n", argv[1], atoi(argv[2]), argv[3]);
	}

	while (run_flag) {
	    Sleep(1000);
	}

	kkp2p_stop_proxy(g_engine, proxyId);
	kkp2p_engine_destroy(g_engine);
	return 0;
}

peer server(rtmp server)端的流程简介如下:主线程不断调用kkp2p_accept接入新的连接,如果有新的连接过来,会得到一个句柄fd,然后在该fd句柄上处理rtmp协议即可。服务端代码来自于互联网,在原有代码上做了p2p的使用适配,我们放在最后讲解,该服务端代码在linux平台下测试通过。由于仅是演示作用,有些细节考虑不完善,仅做参考使用。

我们现在看测试过程,首先在windows下启动peer client,即rtmp proxy代理,地址为127.0.0.1:32915

然后在linux平台下启动peer server,即rtmp server模块

接着在windows下启动vlc播放器

然后在windows平台下启动ffmpeg进行推流

最后可以看到vlc的播放画面

 从上面例子可以看到,我们可以基于库快科技的p2p库,用p2p(基于udp协议)传输方式来传输rtmp协议数据,这样在一对一音视频传输场景下,会为您节约大量的带宽成本。

最后我们看peer server端(rtmp服务端)源码,该源码大部分都是rtmp协议相关,主要来自于互联网,仅做了p2p适配处理,我们可以看到库快科技的p2p库是极易使用的。

编译该源码除了链接库快科技的p2p库之外,还需要链接rtmp的库

#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>

#include <list>
#include <map>
#include <set>
#include <string>
#include <vector>

#include <librtmp/rtmp.h>
#include <librtmp/log.h>

#include "kkp2p_sdk.h"

// Process-wide kkp2p engine handle: assigned in main() and used by ClientThread()
// when it closes a finished channel.
kkp2p_engine_t* g_engine = NULL;


// Minimal wrapper around a pthread mutex: initialised on construction,
// destroyed on destruction, with lock()/unlock() forwarding to pthreads.
class CMutex
{
    pthread_mutex_t m_mutex;

public:
    CMutex()
    {
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~CMutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }

    // A pthread_mutex_t must not be copied (using the copy is undefined),
    // so the wrapper is non-copyable.
    CMutex(const CMutex&) = delete;
    CMutex& operator=(const CMutex&) = delete;

    // Blocks until the mutex is acquired.
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    // Releases the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }
};

// Scope guard for any type exposing lock()/unlock(): locks in the constructor
// and unlocks in the destructor.
template<typename T>
class CAutoLock
{
    T* m_pLock;

public:
    // pLock must be non-null and outlive the guard.
    explicit CAutoLock(T* pLock) : m_pLock(pLock)
    {
        m_pLock->lock();
    }
    ~CAutoLock()
    {
        m_pLock->unlock();
    }

    // A copied guard would unlock the same lock twice, so copying is disabled.
    CAutoLock(const CAutoLock&) = delete;
    CAutoLock& operator=(const CAutoLock&) = delete;
};


class CConnection
{
    uint32_t m_uConnID;
    uint32_t m_uNextStreamID;

    RTMP* m_pRtmp;
    std::string m_strApp;
    int m_nStreamType;
    std::set<uint32_t> m_setUsingStreamID;
    std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
    std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;

    std::list<RTMPPacket*> m_listpPacket;
    CMutex m_mutex;
    sem_t m_sem;

public:
    // 流类型
    enum EStreamType
    {
        Unkown = 0,
        Publish,
        Play
    };

    CConnection(uint32_t uConnID, int nSocket)
        : m_uConnID(uConnID)
        , m_uNextStreamID(1)
        , m_pRtmp(NULL)
        , m_strApp("")
        , m_nStreamType(Unkown)
    {
        m_pRtmp = RTMP_Alloc();
        RTMP_Init(m_pRtmp);
        m_pRtmp->m_sb.sb_socket = nSocket;

        sem_init(&m_sem, 0, 0);
    }
    virtual ~CConnection()
    {
        RTMP_Close(m_pRtmp);
        RTMP_Free(m_pRtmp);

        sem_destroy(&m_sem);
    }

    uint32_t ConnID()
    {
        return m_uConnID;
    }

    RTMP* Rtmp()
    {
        return m_pRtmp;
    }
    int Socket()
    {
        return m_pRtmp->m_sb.sb_socket;
    }

    void setAppName(const std::string& strApp)
    {
        m_strApp = strApp;
    }
    const std::string& getAppName() const
    {
        return m_strApp;
    }

    void setStreamType(EStreamType emType)
    {
        m_nStreamType = emType;
    }
    EStreamType getStreamType()
    {
        return (EStreamType)m_nStreamType;
    }

    uint32_t genStreamID()
    {
        uint32_t uStreamID = m_uNextStreamID++;
        m_setUsingStreamID.insert(uStreamID);
        return uStreamID;
    }

    // 检查流ID是否合法
    bool isValidStreamID(uint32_t uStreamID)
    {
        return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
    }

    // 登记 推流ID与playpath 映射关系
    void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // 取消登记 推流ID与playpath 映射关系
    void unbindPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.erase(uStreamID);
    }
    // 获取 推流ID映射关系
    std::string getPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPublishStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // 断连时获取 推流的playpath列表
    const void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // 断连时清除 推流ID与playpath 映射关系
    void cleanPublishPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.clear();
    }

    // 登记 拉流ID与playpath 映射关系
    void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // 取消登记 拉流ID与playpath 映射关系
    void unbindPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.erase(uStreamID);
    }
    // 获取 拉流ID映射关系
    std::string getPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPlayStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // 断连时获取 拉流的playpath列表
    const void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // 断连时清除 拉流ID与playpath 映射关系
    void cleanPlayPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.clear();
    }

    // 通知指定的playpath即将重置
    void tellResetPlayPath(const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        uint32_t uStreamID = 0;

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            if (iter->second == strPlayPath)
            {
                uStreamID = iter->first;
                m_mapPublishStreamIDPlayPath.erase(iter);
                break;
            }
        }

        if (uStreamID == 0)
        {
            for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
            {
                if (iter->second == strPlayPath)
                {
                    uStreamID = iter->first;
                    m_mapPlayStreamIDPlayPath.erase(iter);
                    break;
                }
            }
        }
    }

    // 提取待发送的报文
    RTMPPacket* popPacket()
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        double ftime = tv.tv_sec + (tv.tv_usec + 500000) / (double)1000000;
        struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
        sem_timedwait(&m_sem, &ts);

        CAutoLock<CMutex> lock(&m_mutex);

        if (m_listpPacket.empty())
            return NULL;

        RTMPPacket* pPacket = m_listpPacket.front();
        m_listpPacket.pop_front();

        return pPacket;
    }

    // 向连接拷贝多个报文
    void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
        {
            m_listpPacket.push_back(*iter);
            sem_post(&m_sem);
        }
    }
};

// 客户端连接管理 =>

class CConnections
{
    uint32_t m_uNextConnID;
    std::map<uint32_t, CConnection*> m_mapConnection;

    CMutex m_mutex;

public:
    CConnections()
        : m_uNextConnID(1)
    {
    }
    virtual ~CConnections() {}

    CConnection* createConnection(int nSocket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
        m_mapConnection[pConnection->ConnID()] = pConnection;
        return pConnection;
    }

    void releaseConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConn = __getConnection(uConnID);
        if (pConn)
        {
            m_mapConnection.erase(uConnID);
            delete pConn;
        }
    }

    CConnection* getConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        return __getConnection(uConnID);
    }

private:

    CConnection* __getConnection(uint32_t uConnID)
    {
        auto iter = m_mapConnection.find(uConnID);
        if (iter == m_mapConnection.end())
            return NULL;

        return iter->second;
    }
};

CConnections g_Conns;

// 节目容器 =>

class CPlayPath
{
    std::string m_strPlayPath;
    
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;

    CMutex m_mutex;
   
public:
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath)
        , m_uPublishConnID(0)
        , m_bEOF(true)
    {
    }
    virtual ~CPlayPath() {}

    const std::string& getName() const
    {
        return m_strPlayPath;
    }

    // 设置/获取 结束标志
    void setEOF()
    {
        m_bEOF = true;
    }
    bool isEOF()
    {
        return m_bEOF;
    }

    // 重置节目对象
    void reset(bool bCleanPlayer = false)
    {
        // 清除结束标志
        m_bEOF = false;

        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;

        // 清除推流和拉流连接
        {
            CAutoLock<CMutex> lock(&m_mutex);

            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;

            if (bCleanPlayer)
            {
                m_setPlayConnID.swap(setPlayConnID);
            }
        }

        // 通知推流连接做清除处理
        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }

        // 通知拉流连接做清除处理
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
    }

    // 登记推流连接
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = uConnID;
    }
    // 取消登记推流连接
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = 0;
    }

    // 登记拉流连接
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter != m_setPlayConnID.end())
            return false;

        m_setPlayConnID.insert(uConnID);
        return true;
    }
    // 取消登记拉流连接
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter == m_setPlayConnID.end())
            return false;

        m_setPlayConnID.erase(uConnID);
        return true;
    }

    // 暂存媒体报文
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;

        {
            CAutoLock<CMutex> lock(&m_mutex);

            setPlayConnID = m_setPlayConnID;
        }

        // 简单起见,直接拷贝到拉流连接

        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn == NULL)
                continue;

            std::vector<RTMPPacket*> vecpPacket;

            RTMPPacket* pPacketCP = new RTMPPacket;
            RTMPPacket_Reset(pPacketCP);
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize);
            memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;

            vecpPacket.push_back(pPacketCP);

            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};

// 应用容器 =>
class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;

    CMutex m_mutex;

public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}

    const std::string& getName() const
    {
        return m_strApp;
    }

    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mappPlayPath.find(strPlayPath);
        if (iter != m_mappPlayPath.end())
            return iter->second;

        if (bCreate)
        {
            CPlayPath* pPlayPath = new CPlayPath(strPlayPath);
            m_mappPlayPath[strPlayPath] = pPlayPath;

            return pPlayPath;
        }
          
        return NULL;
    }
};

// 应用集合管理 =>

class CApps
{
    std::map<std::string, CApp*> m_mapApp;

    CMutex m_mutex;

public:
    CApps() {}
    virtual ~CApps() {}

    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapApp.find(strApp);
        if (iter != m_mapApp.end())
            return iter->second;

        if (bCreate)
        {
            CApp* pApp = new CApp(strApp);
            m_mapApp[strApp] = pApp;

            return pApp; 
        }

        return NULL;
    }
};

CApps g_Apps;

// Program logic =>

// Forward declarations of the server's free functions.
// ClientThread is the pthread entry point started by main() for each accepted channel.
void* ClientThread(void* _lp);
// Performs the RTMP handshake on the socket; returns false on failure.
bool MyHandshake(int nSocket);
// Routes one received packet by its packet type; returns false to drop the connection.
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
// Handles an invoke (type 0x14) packet; returns a negative value on error.
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
// Handles a media (type 0x08 / 0x09) packet.
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
// Reply/notification senders; each returns false when sending fails.
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);

int main(int argc, char* argv[])
{
    if (argc < 3) {
        printf("usage:%s peer_id peer_key\n", argv[0]);
        return -1;
    }

    RTMP_LogSetLevel(RTMP_LOGDEBUG);

    kkp2p_engine_conf_t kkp2p_conf;
    kkp2p_conf.login_domain = "124.71.217.198";
	kkp2p_conf.login_port = 3080;
	kkp2p_conf.lan_search_port = 3549;
    kkp2p_conf.max_log_size = 1024*1024*10;
    kkp2p_conf.log_path = NULL;
    g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
    kkp2p_switch_log_level(g_engine, 4);
   
    kkp2p_join_lan(g_engine, argv[1]);
    kkp2p_join_net(g_engine, argv[1], argv[2]);
    kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
    while(1) {
        int ret = kkp2p_accept(g_engine, 1000, channel);
        if (ret < 0) {
            // error
            printf("kkp2p_accept error,exit\n");
            free(channel);
            break;
        } else if (ret == 0) {
             // timeout
             continue;
        } else {
             // success
             pthread_t ThreadId;
             printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel->fd, channel->transmit_mode, channel->channel_id);
             pthread_create(&ThreadId, NULL, ClientThread,(void*)channel);
             channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
        }
    }
    return 0;
}

void* ClientThread(void* param)
{
    pthread_detach(pthread_self());
    kkp2p_channel_t* channel = (kkp2p_channel_t*)param;

    // use default block
    int val = fcntl(channel->fd, F_GETFL, 0);
    fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));
    
    g_Conns.createConnection(channel->fd);
    CConnection* pConn = g_Conns.createConnection(channel->fd);

    printf("connection:[%d] coming... \n", pConn->ConnID());

    // 握手
    bool b = MyHandshake(pConn->Socket());
    if (!b)
    {
        printf("connection:[%d] handshake failed! \n", pConn->ConnID());
        g_Conns.releaseConnection(pConn->ConnID());
        return NULL;
    }

    while (true)
    {
        RTMPPacket packet;
        packet.m_body = NULL;
        packet.m_chunk = NULL;
        RTMPPacket_Reset(&packet);

        // 读取报文
        if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
        {
            printf("connection:[%d] read error! \n", pConn->ConnID());
            break;
        }

       if (!RTMPPacket_IsReady(&packet))
           continue;

       //printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
       //       pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);

       // 报文分派交互
       bool b = Dispatch(pConn, &packet);

       RTMPPacket_Free(&packet);

       if (!b)
       {
           printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
           break;
       }

       if (pConn->getStreamType() == CConnection::Play)
       {
           printf("connection:[%d] now play... \n", pConn->ConnID());
           break;
       }
    }

    // 进入拉流状态
    struct timeval tv;
    gettimeofday(&tv, NULL);
    double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
    while (pConn->getStreamType() == CConnection::Play)
    {
        RTMPPacket* pPacket = pConn->popPacket();

        struct timeval tvNow;
        gettimeofday(&tvNow, NULL);
        double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;

        // 超时检查
        if (pPacket == NULL)
        {
            if (fNowReadTime - fLastReadTime < 30)
                continue;

            printf("connection:[%d] too time no packet \n", pConn->ConnID());
            break;
        }

        fLastReadTime = fNowReadTime;

        // 下发媒体报文
        bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);

        RTMPPacket_Free(pPacket);
        delete pPacket;

        if (!b)
        {
            printf("connection:[%d] send failed! \n", pConn->ConnID());
            break;
        }
    }

    // 连接退出前关系解除
    switch (pConn->getStreamType())
    {
        case CConnection::Publish:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPublishPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->setEOF();
                        pPlayPath->unsetPublishConn();
                    }
                }
                pConn->cleanPublishPlayPath();
            }
            break;
        case CConnection::Play:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPlayPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->delPlayConn(pConn->ConnID());
                    }
                }
                pConn->cleanPlayPlayPath();
            }
            break;
    }

    printf("connection:[%d] exit! \n", pConn->ConnID());

    g_Conns.releaseConnection(pConn->ConnID());
    kkp2p_close_fd(channel->fd);
    kkp2p_close_channel(g_engine, channel->channel_id);
    free(channel);
    return NULL;
}

// Sends exactly len bytes from buff on fd, looping over partial writes.
// Returns len on success and -1 on error (after logging it).
// A call interrupted by a signal (EINTR) is retried instead of being treated
// as a failure; a zero-byte send is treated as an error so the loop cannot spin.
int send_data(int fd, char* buff, int len) {
    int sended = 0 ;
    while (sended < len) {
        int wl = send(fd, buff + sended, len - sended, 0);
        if (wl < 0 && errno == EINTR) {
            continue;
        }
        if (wl <= 0) {
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));
            return -1;
        }
        sended += wl;
    }
    return len;
}

// Receives exactly len bytes from fd into buff, looping over partial reads.
// Returns len on success and -1 on error or when the peer closes the
// connection before len bytes arrived.
int recv_data(int fd, char* buff, int len) {
    int recved = 0 ;
    while (recved < len) {
        int wl = recv(fd, buff + recved, len - recved, 0);
        if (wl < 0) {
            if (errno == EINTR) {
                // interrupted by a signal before any data arrived: retry
                continue;
            }
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno));
            return -1;
        }
        if (wl == 0) {
            // recv() returns 0 when the peer has closed the connection; without
            // this check the loop would spin forever making no progress.
            printf("RecvData error,fd:%d,peer closed after %d of %d bytes.\n", fd, recved, len);
            return -1;
        }
        recved += wl;
    }
    return len;
}


// 握手操作
#define RTMP_SIG_SIZE 1536
bool MyHandshake(int nSocket)
{
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1) {
        return false;
    }

    if (type != 3) {
        return false;
    }

    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
     }

    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE) {
        return false;
    }

    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    return true;
}

// 报文分派交互处理
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    switch (pPacket->m_packetType)
    {
        case 0x01:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    pConn->Rtmp()->m_inChunkSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
                }
            }
            break;

        case 0x04:
            {
            }
            break;

        case 0x05:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nWindowAckSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nWindowAckSize);
                }
            }
            break;

        case 0x06:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nOutputBW = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nOutputBW);
                }
                if (pPacket->m_nBodySize >= 5)
                {
                    int nOutputBW2 = pPacket->m_body[4];
                    printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nOutputBW2);
                }
            }
            break;

        case 0x08:
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x09:
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x12:
            {
            }
            break;

        case 0x14:
            {
                if (HandleInvoke(pConn, pPacket) < 0)
                    return false;
            }
            break;
    }

    return true;
}

// SAVC(x) declares a file-local constant AVal named av_<x>, initialised from
// the stringified identifier x through the AVC macro. The constants below are
// the command names HandleInvoke compares against.
#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);

AVal makeAVal(const char* pStr)
{
    return {(char*)pStr, (int)strlen(pStr)};
}

// 处理远程调用
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket) 
{
    if (pPacket->m_body[0] != 0x02)
    {
        printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
        return -1;
    }

    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    AMFObject obj;
    int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
    if (nSize < 0)
    {
        printf("connection:[%d] invalid packet! \n", pConn->ConnID());
        return -1;
    }

    AVal method;
    AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
    int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
    printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);

    if (AVMATCH(&method, &av_connect))
    {
        AMFObject obj1;
        AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);

        AVal appName = makeAVal("app");
        AVal app;
        AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);

        std::string strApp(app.av_val, app.av_len);
        printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());

        pConn->setAppName(strApp);

        if (!sendWindowAckSize(pConn))
            return -1;

        if (!sendPeerOutputBandWide(pConn))
            return -1;

        if (!sendOutputChunkSize(pConn))
            return -1;

        if (!sendConnectResult(pConn, nOperateID))
            return -1;
    }

    else if (AVMATCH(&method, &av_releaseStream))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // 检查该节目是否推流结束
        CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
        if (!pPlayPath->isEOF())
        {
            if (!sendPublishError(pConn, nInputStreamID))
                return -1;
            return 0;
        }

        // 重置节目
        pPlayPath->reset(false);
    }
    
    else if (AVMATCH(&method, &av_FCPublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // 安全起见,初使化节目
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
    }

    else if (AVMATCH(&method, &av_createStream))
    {
        // 生成流ID
        uint32_t uStreamID = pConn->genStreamID();

        printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);

        if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_publish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());

        // 检查streamID的有效性
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // 连接与节目 建立双向关联
        pConn->setStreamType(CConnection::Publish);
        pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());

        if (!sendPublishStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_play))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);

        // 检查streamID的有效性
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // 连接与节目 建立双向关联
        pConn->setStreamType(CConnection::Play);
        pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());

        if (!sendPlayStreamBegin(pConn, nInputStreamID))
            return -1;

        if (!sendPlayStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_FCUnpublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
    }

    else if (AVMATCH(&method, &av_deleteStream))
    {
        int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
        printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);

        // 连接与节目 解除双向关联

        std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPublishPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
        }

        strPlayPath = pConn->getPlayPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPlayPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
        }
    }

    AMF_Reset(&obj);
    return 0;
}

// 处理媒体报文
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    const std::string& strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->cacheMediaPacket(pPacket);

    return 0;
}

// 发送应答窗口大小报文
bool sendWindowAckSize(CConnection* pConn)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x05;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, 5000000);
    packet.m_nBodySize = 4;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送设置对端输出带宽报文
bool sendPeerOutputBandWide(CConnection* pConn)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x06;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, 5000000);
    packet.m_body[4] = 2;
    packet.m_nBodySize = 5;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送设置输出块大小报文
bool sendOutputChunkSize(CConnection* pConn)
{
    pConn->Rtmp()->m_outChunkSize = 4096;

    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x01;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, 4096);
    packet.m_nBodySize = 4;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送连接响应报文
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);

    AMFObject obj1 = {0, NULL};

    AMFObjectProperty fmsVer;
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);

    AMFObjectProperty capabilities;
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);

    pEnc = AMF_Encode(&obj1, pEnc, pEnd);

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送创建流响应报文
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);
    *pEnc++ = AMF_NULL;
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nStreamID);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送推流状态响应报文
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送推流错误响应报文
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);

    AMFObjectProperty description;
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送拉流事件报文
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x04;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeInt16(pEnc, pEnd, 0);
    pEnc = AMF_EncodeInt32(pEnc, pEnd, nInputStreamID);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送拉流状态响应报文
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}










#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>

#include <list>
#include <map>
#include <set>
#include <string>
#include <vector>

#include <librtmp/rtmp.h>
#include <librtmp/log.h>

#include "kkp2p_sdk.h"

// Process-wide kkp2p engine handle. It is assigned in main() from
// kkp2p_engine_init() and read by ClientThread() when closing a channel.
kkp2p_engine_t* g_engine = NULL;


// Thin wrapper around a pthread mutex: the mutex is initialised with default
// attributes in the constructor and destroyed in the destructor.
//
// The class is non-copyable: a pthread_mutex_t must not be duplicated by
// value, and a copy would also be destroyed a second time.
class CMutex
{
    pthread_mutex_t m_mutex;

public:
    CMutex()
    {
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~CMutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }

    CMutex(const CMutex&) = delete;
    CMutex& operator=(const CMutex&) = delete;

    // Block until the mutex is acquired.
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    // Release the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }
};

// Scoped lock guard: calls lock() on the given object in the constructor and
// unlock() on it in the destructor. T only needs lock() and unlock() members.
//
// The guard is non-copyable, because a copy would call unlock() a second time
// for a single lock(). The constructor is explicit so a bare pointer is never
// silently converted into a guard.
template<typename T>
class CAutoLock
{
    T* m_pLock;

public:
    explicit CAutoLock(T* pLock) : m_pLock(pLock)
    {
        m_pLock->lock();
    }
    ~CAutoLock()
    {
        m_pLock->unlock();
    }

    CAutoLock(const CAutoLock&) = delete;
    CAutoLock& operator=(const CAutoLock&) = delete;
};


class CConnection
{
    uint32_t m_uConnID;
    uint32_t m_uNextStreamID;

    RTMP* m_pRtmp;
    std::string m_strApp;
    int m_nStreamType;
    std::set<uint32_t> m_setUsingStreamID;
    std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
    std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;

    std::list<RTMPPacket*> m_listpPacket;
    CMutex m_mutex;
    sem_t m_sem;

public:
    // 流类型
    enum EStreamType
    {
        Unkown = 0,
        Publish,
        Play
    };

    CConnection(uint32_t uConnID, int nSocket)
        : m_uConnID(uConnID)
        , m_uNextStreamID(1)
        , m_pRtmp(NULL)
        , m_strApp("")
        , m_nStreamType(Unkown)
    {
        m_pRtmp = RTMP_Alloc();
        RTMP_Init(m_pRtmp);
        m_pRtmp->m_sb.sb_socket = nSocket;

        sem_init(&m_sem, 0, 0);
    }
    virtual ~CConnection()
    {
        RTMP_Close(m_pRtmp);
        RTMP_Free(m_pRtmp);

        sem_destroy(&m_sem);
    }

    uint32_t ConnID()
    {
        return m_uConnID;
    }

    RTMP* Rtmp()
    {
        return m_pRtmp;
    }
    int Socket()
    {
        return m_pRtmp->m_sb.sb_socket;
    }

    void setAppName(const std::string& strApp)
    {
        m_strApp = strApp;
    }
    const std::string& getAppName() const
    {
        return m_strApp;
    }

    void setStreamType(EStreamType emType)
    {
        m_nStreamType = emType;
    }
    EStreamType getStreamType()
    {
        return (EStreamType)m_nStreamType;
    }

    uint32_t genStreamID()
    {
        uint32_t uStreamID = m_uNextStreamID++;
        m_setUsingStreamID.insert(uStreamID);
        return uStreamID;
    }

    // 检查流ID是否合法
    bool isValidStreamID(uint32_t uStreamID)
    {
        return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
    }

    // 登记 推流ID与playpath 映射关系
    void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // 取消登记 推流ID与playpath 映射关系
    void unbindPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.erase(uStreamID);
    }
    // 获取 推流ID映射关系
    std::string getPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPublishStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // 断连时获取 推流的playpath列表
    const void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // 断连时清除 推流ID与playpath 映射关系
    void cleanPublishPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.clear();
    }

    // 登记 拉流ID与playpath 映射关系
    void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // 取消登记 拉流ID与playpath 映射关系
    void unbindPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.erase(uStreamID);
    }
    // 获取 拉流ID映射关系
    std::string getPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPlayStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // 断连时获取 拉流的playpath列表
    const void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // 断连时清除 拉流ID与playpath 映射关系
    void cleanPlayPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.clear();
    }

    // 通知指定的playpath即将重置
    void tellResetPlayPath(const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        uint32_t uStreamID = 0;

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            if (iter->second == strPlayPath)
            {
                uStreamID = iter->first;
                m_mapPublishStreamIDPlayPath.erase(iter);
                break;
            }
        }

        if (uStreamID == 0)
        {
            for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
            {
                if (iter->second == strPlayPath)
                {
                    uStreamID = iter->first;
                    m_mapPlayStreamIDPlayPath.erase(iter);
                    break;
                }
            }
        }
    }

    // 提取待发送的报文
    RTMPPacket* popPacket()
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        double ftime = tv.tv_sec + (tv.tv_usec + 500000) / (double)1000000;
        struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
        sem_timedwait(&m_sem, &ts);

        CAutoLock<CMutex> lock(&m_mutex);

        if (m_listpPacket.empty())
            return NULL;

        RTMPPacket* pPacket = m_listpPacket.front();
        m_listpPacket.pop_front();

        return pPacket;
    }

    // 向连接拷贝多个报文
    void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
        {
            m_listpPacket.push_back(*iter);
            sem_post(&m_sem);
        }
    }
};

// 客户端连接管理 =>

class CConnections
{
    uint32_t m_uNextConnID;
    std::map<uint32_t, CConnection*> m_mapConnection;

    CMutex m_mutex;

public:
    CConnections()
        : m_uNextConnID(1)
    {
    }
    virtual ~CConnections() {}

    CConnection* createConnection(int nSocket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
        m_mapConnection[pConnection->ConnID()] = pConnection;
        return pConnection;
    }

    void releaseConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConn = __getConnection(uConnID);
        if (pConn)
        {
            m_mapConnection.erase(uConnID);
            delete pConn;
        }
    }

    CConnection* getConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        return __getConnection(uConnID);
    }

private:

    CConnection* __getConnection(uint32_t uConnID)
    {
        auto iter = m_mapConnection.find(uConnID);
        if (iter == m_mapConnection.end())
            return NULL;

        return iter->second;
    }
};

// Global registry of client connections, shared by all client threads.
CConnections g_Conns;

// 节目容器 =>

class CPlayPath
{
    std::string m_strPlayPath;
    
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;

    CMutex m_mutex;
   
public:
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath)
        , m_uPublishConnID(0)
        , m_bEOF(true)
    {
    }
    virtual ~CPlayPath() {}

    const std::string& getName() const
    {
        return m_strPlayPath;
    }

    // 设置/获取 结束标志
    void setEOF()
    {
        m_bEOF = true;
    }
    bool isEOF()
    {
        return m_bEOF;
    }

    // 重置节目对象
    void reset(bool bCleanPlayer = false)
    {
        // 清除结束标志
        m_bEOF = false;

        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;

        // 清除推流和拉流连接
        {
            CAutoLock<CMutex> lock(&m_mutex);

            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;

            if (bCleanPlayer)
            {
                m_setPlayConnID.swap(setPlayConnID);
            }
        }

        // 通知推流连接做清除处理
        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }

        // 通知拉流连接做清除处理
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
    }

    // 登记推流连接
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = uConnID;
    }
    // 取消登记推流连接
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = 0;
    }

    // 登记拉流连接
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter != m_setPlayConnID.end())
            return false;

        m_setPlayConnID.insert(uConnID);
        return true;
    }
    // 取消登记拉流连接
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter == m_setPlayConnID.end())
            return false;

        m_setPlayConnID.erase(uConnID);
        return true;
    }

    // 暂存媒体报文
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;

        {
            CAutoLock<CMutex> lock(&m_mutex);

            setPlayConnID = m_setPlayConnID;
        }

        // 简单起见,直接拷贝到拉流连接

        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn == NULL)
                continue;

            std::vector<RTMPPacket*> vecpPacket;

            RTMPPacket* pPacketCP = new RTMPPacket;
            RTMPPacket_Reset(pPacketCP);
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize);
            memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;

            vecpPacket.push_back(pPacketCP);

            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};

// 应用容器 =>
class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;

    CMutex m_mutex;

public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}

    const std::string& getName() const
    {
        return m_strApp;
    }

    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mappPlayPath.find(strPlayPath);
        if (iter != m_mappPlayPath.end())
            return iter->second;

        if (bCreate)
        {
            CPlayPath* pPlayPath = new CPlayPath(strPlayPath);
            m_mappPlayPath[strPlayPath] = pPlayPath;

            return pPlayPath;
        }
          
        return NULL;
    }
};

// 应用集合管理 =>

class CApps
{
    std::map<std::string, CApp*> m_mapApp;

    CMutex m_mutex;

public:
    CApps() {}
    virtual ~CApps() {}

    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapApp.find(strApp);
        if (iter != m_mapApp.end())
            return iter->second;

        if (bCreate)
        {
            CApp* pApp = new CApp(strApp);
            m_mapApp[strApp] = pApp;

            return pApp; 
        }

        return NULL;
    }
};

// Global collection of applications, shared by all client threads.
CApps g_Apps;

// Program logic =>

// Thread entry point: serves the accepted kkp2p channel passed as argument.
void* ClientThread(void* _lp);
// Performs the RTMP handshake on the socket; returns false on failure.
bool MyHandshake(int nSocket);
// Routes one received packet by packet type; returns false when command
// handling fails.
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
// Handles a command packet (type 0x14); a negative result means failure.
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
// Forwards an audio/video packet to the playpath it is published on.
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
// Senders for the individual control and reply packets; each returns false
// when RTMP_SendPacket fails.
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);

int main(int argc, char* argv[])
{
    if (argc < 3) {
        printf("usage:%s peer_id peer_key\n", argv[0]);
        return -1;
    }

    RTMP_LogSetLevel(RTMP_LOGDEBUG);

    kkp2p_engine_conf_t kkp2p_conf;
    kkp2p_conf.login_domain = "124.71.217.198";
kkp2p_conf.login_port = 3080;
kkp2p_conf.lan_search_port = 3549;
    kkp2p_conf.max_log_size = 1024*1024*10;
    kkp2p_conf.log_path = NULL;
    g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
    kkp2p_switch_log_level(g_engine, 4);
   
    kkp2p_join_lan(g_engine, argv[1]);
    kkp2p_join_net(g_engine, argv[1], argv[2]);
    kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
    while(1) {
        int ret = kkp2p_accept(g_engine, 1000, channel);
        if (ret < 0) {
            // error
            printf("kkp2p_accept error,exit\n");
            free(channel);
            break;
        } else if (ret == 0) {
             // timeout
             continue;
        } else {
             // success
             pthread_t ThreadId;
             printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel->fd, channel->transmit_mode, channel->channel_id);
             pthread_create(&ThreadId, NULL, ClientThread,(void*)channel);
             channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
        }
    }
    return 0;
}

void* ClientThread(void* param)
{
    pthread_detach(pthread_self());
    kkp2p_channel_t* channel = (kkp2p_channel_t*)param;

    // use default block
    int val = fcntl(channel->fd, F_GETFL, 0);
    fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));
    
    g_Conns.createConnection(channel->fd);
    CConnection* pConn = g_Conns.createConnection(channel->fd);

    printf("connection:[%d] coming... \n", pConn->ConnID());

    // 握手
    bool b = MyHandshake(pConn->Socket());
    if (!b)
    {
        printf("connection:[%d] handshake failed! \n", pConn->ConnID());
        g_Conns.releaseConnection(pConn->ConnID());
        return NULL;
    }

    while (true)
    {
        RTMPPacket packet;
        packet.m_body = NULL;
        packet.m_chunk = NULL;
        RTMPPacket_Reset(&packet);

        // 读取报文
        if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
        {
            printf("connection:[%d] read error! \n", pConn->ConnID());
            break;
        }

       if (!RTMPPacket_IsReady(&packet))
           continue;

       //printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
       //       pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);

       // 报文分派交互
       bool b = Dispatch(pConn, &packet);

       RTMPPacket_Free(&packet);

       if (!b)
       {
           printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
           break;
       }

       if (pConn->getStreamType() == CConnection::Play)
       {
           printf("connection:[%d] now play... \n", pConn->ConnID());
           break;
       }
    }

    // 进入拉流状态
    struct timeval tv;
    gettimeofday(&tv, NULL);
    double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
    while (pConn->getStreamType() == CConnection::Play)
    {
        RTMPPacket* pPacket = pConn->popPacket();

        struct timeval tvNow;
        gettimeofday(&tvNow, NULL);
        double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;

        // 超时检查
        if (pPacket == NULL)
        {
            if (fNowReadTime - fLastReadTime < 30)
                continue;

            printf("connection:[%d] too time no packet \n", pConn->ConnID());
            break;
        }

        fLastReadTime = fNowReadTime;

        // 下发媒体报文
        bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);

        RTMPPacket_Free(pPacket);
        delete pPacket;

        if (!b)
        {
            printf("connection:[%d] send failed! \n", pConn->ConnID());
            break;
        }
    }

    // 连接退出前关系解除
    switch (pConn->getStreamType())
    {
        case CConnection::Publish:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPublishPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->setEOF();
                        pPlayPath->unsetPublishConn();
                    }
                }
                pConn->cleanPublishPlayPath();
            }
            break;
        case CConnection::Play:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPlayPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->delPlayConn(pConn->ConnID());
                    }
                }
                pConn->cleanPlayPlayPath();
            }
            break;
    }

    printf("connection:[%d] exit! \n", pConn->ConnID());

    g_Conns.releaseConnection(pConn->ConnID());
    kkp2p_close_fd(channel->fd);
    kkp2p_close_channel(g_engine, channel->channel_id);
    free(channel);
    return NULL;
}

// Write exactly len bytes from buff to the socket fd, calling send() as often
// as needed. Returns len on success, or -1 (after logging) as soon as send()
// reports an error.
int send_data(int fd, char* buff, int len) {
    char* cursor = buff;
    int remaining = len;

    while (remaining > 0) {
        const int written = send(fd, cursor, remaining, 0);
        if (written < 0) {
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,written, len, errno, strerror(errno));
            return -1;
        }
        cursor += written;
        remaining -= written;
    }

    return len;
}

// Read exactly len bytes from the socket fd into buff, calling recv() as often
// as needed.
//
// Returns len on success. Returns -1 (after logging) if recv() reports an
// error, or if the peer closes the connection before len bytes arrived:
// recv() then returns 0, which must end the loop instead of being retried
// forever.
int recv_data(int fd, char* buff, int len) {
    int recved = 0 ;
    while (recved < len) {
        int rl = recv(fd, buff + recved, len - recved, 0);
        if (rl < 0) {
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,rl, len,errno, strerror(errno));
            return -1;
        }
        if (rl == 0) {
            printf("RecvData peer closed,fd:%d,recved:%d,len:%d.\n", fd, recved, len);
            return -1;
        }
        recved += rl;
    }
    return len;
}


// 握手操作
#define RTMP_SIG_SIZE 1536
bool MyHandshake(int nSocket)
{
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1) {
        return false;
    }

    if (type != 3) {
        return false;
    }

    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
     }

    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE) {
        return false;
    }

    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    return true;
}

// 报文分派交互处理
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    switch (pPacket->m_packetType)
    {
        case 0x01:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    pConn->Rtmp()->m_inChunkSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
                }
            }
            break;

        case 0x04:
            {
            }
            break;

        case 0x05:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nWindowAckSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nWindowAckSize);
                }
            }
            break;

        case 0x06:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nOutputBW = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nOutputBW);
                }
                if (pPacket->m_nBodySize >= 5)
                {
                    int nOutputBW2 = pPacket->m_body[4];
                    printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nOutputBW2);
                }
            }
            break;

        case 0x08:
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x09:
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x12:
            {
            }
            break;

        case 0x14:
            {
                if (HandleInvoke(pConn, pPacket) < 0)
                    return false;
            }
            break;
    }

    return true;
}

// SAVC(x) declares a file-local constant `av_x` of type AVal initialised from
// the token `x` spelled as a string (via the AVC macro). One constant is
// declared below for each RTMP command name that HandleInvoke() matches on or
// that the send* helpers encode.
#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);

// Wraps a NUL-terminated C string in an AVal without copying it: the result
// points at `pStr` itself and records its strlen(), so `pStr` must outlive
// every use of the returned value.
AVal makeAVal(const char* pStr)
{
    AVal wrapped = {(char*)pStr, 0};
    wrapped.av_len = (int)strlen(pStr);
    return wrapped;
}

// 处理远程调用
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket) 
{
    if (pPacket->m_body[0] != 0x02)
    {
        printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
        return -1;
    }

    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    AMFObject obj;
    int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
    if (nSize < 0)
    {
        printf("connection:[%d] invalid packet! \n", pConn->ConnID());
        return -1;
    }

    AVal method;
    AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
    int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
    printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);

    if (AVMATCH(&method, &av_connect))
    {
        AMFObject obj1;
        AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);

        AVal appName = makeAVal("app");
        AVal app;
        AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);

        std::string strApp(app.av_val, app.av_len);
        printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());

        pConn->setAppName(strApp);

        if (!sendWindowAckSize(pConn))
            return -1;

        if (!sendPeerOutputBandWide(pConn))
            return -1;

        if (!sendOutputChunkSize(pConn))
            return -1;

        if (!sendConnectResult(pConn, nOperateID))
            return -1;
    }

    else if (AVMATCH(&method, &av_releaseStream))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // 检查该节目是否推流结束
        CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
        if (!pPlayPath->isEOF())
        {
            if (!sendPublishError(pConn, nInputStreamID))
                return -1;
            return 0;
        }

        // 重置节目
        pPlayPath->reset(false);
    }
    
    else if (AVMATCH(&method, &av_FCPublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // 安全起见,初使化节目
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
    }

    else if (AVMATCH(&method, &av_createStream))
    {
        // 生成流ID
        uint32_t uStreamID = pConn->genStreamID();

        printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);

        if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_publish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());

        // 检查streamID的有效性
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // 连接与节目 建立双向关联
        pConn->setStreamType(CConnection::Publish);
        pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());

        if (!sendPublishStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_play))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);

        // 检查streamID的有效性
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // 连接与节目 建立双向关联
        pConn->setStreamType(CConnection::Play);
        pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());

        if (!sendPlayStreamBegin(pConn, nInputStreamID))
            return -1;

        if (!sendPlayStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_FCUnpublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
    }

    else if (AVMATCH(&method, &av_deleteStream))
    {
        int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
        printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);

        // 连接与节目 解除双向关联

        std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPublishPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
        }

        strPlayPath = pConn->getPlayPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPlayPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
        }
    }

    AMF_Reset(&obj);
    return 0;
}

// Handles an audio/video message from a publisher: looks up the playpath the
// message's stream id is publishing to and hands the packet to that programme
// for distribution to its players.
//
// Packets on a stream id with no publish binding are dropped. An empty path is
// this file's "not bound" marker (deleteStream handling uses the same test);
// without the check such packets would land in an auto-created programme
// named "".
// Always returns 0.
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    const std::string strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    if (strPlayPath.empty())
    {
        return 0;
    }

    g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->cacheMediaPacket(pPacket);

    return 0;
}

// Sends a window-acknowledgement-size message (type 0x05) announcing a window
// of 5000000 bytes on chunk channel 2.
// Returns true when RTMP_SendPacket() accepts the packet, false otherwise.
bool sendWindowAckSize(CConnection* pConn)
{
    char buffer[256] = {0};
    char* const pBufEnd = buffer + sizeof(buffer);

    RTMPPacket ackPacket;
    // Payload lives after the space reserved for the chunk header.
    ackPacket.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    ackPacket.m_nBodySize = 4;
    ackPacket.m_packetType = 0x05;
    ackPacket.m_headerType = RTMP_PACKET_SIZE_LARGE;
    ackPacket.m_nChannel = 0x02;
    ackPacket.m_nInfoField2 = 0;
    ackPacket.m_nTimeStamp = 0;
    ackPacket.m_hasAbsTimestamp = 0;

    AMF_EncodeInt32(ackPacket.m_body, pBufEnd, 5000000);

    if (RTMP_SendPacket(pConn->Rtmp(), &ackPacket, FALSE))
        return true;

    printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
    return false;
}

// Sends a set-peer-bandwidth message (type 0x06) on chunk channel 2: a 32-bit
// bandwidth of 5000000 followed by one limit-type byte with value 2.
// Returns true when RTMP_SendPacket() accepts the packet, false otherwise.
bool sendPeerOutputBandWide(CConnection* pConn)
{
    char buffer[256] = {0};
    char* const pBufEnd = buffer + sizeof(buffer);

    RTMPPacket bwPacket;
    // Payload lives after the space reserved for the chunk header.
    bwPacket.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    bwPacket.m_nBodySize = 5;
    bwPacket.m_packetType = 0x06;
    bwPacket.m_headerType = RTMP_PACKET_SIZE_LARGE;
    bwPacket.m_nChannel = 0x02;
    bwPacket.m_nInfoField2 = 0;
    bwPacket.m_nTimeStamp = 0;
    bwPacket.m_hasAbsTimestamp = 0;

    AMF_EncodeInt32(bwPacket.m_body, pBufEnd, 5000000);
    bwPacket.m_body[4] = 2;

    if (RTMP_SendPacket(pConn->Rtmp(), &bwPacket, FALSE))
        return true;

    printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
    return false;
}

// Switches this connection's outgoing chunk size to 4096 and tells the peer
// with a set-chunk-size message (type 0x01) on chunk channel 2.
// Returns true when RTMP_SendPacket() accepts the packet, false otherwise.
bool sendOutputChunkSize(CConnection* pConn)
{
    // The local setting is updated before the notification is sent.
    pConn->Rtmp()->m_outChunkSize = 4096;

    char buffer[256] = {0};
    char* const pBufEnd = buffer + sizeof(buffer);

    RTMPPacket chunkPacket;
    // Payload lives after the space reserved for the chunk header.
    chunkPacket.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    chunkPacket.m_nBodySize = 4;
    chunkPacket.m_packetType = 0x01;
    chunkPacket.m_headerType = RTMP_PACKET_SIZE_LARGE;
    chunkPacket.m_nChannel = 0x02;
    chunkPacket.m_nInfoField2 = 0;
    chunkPacket.m_nTimeStamp = 0;
    chunkPacket.m_hasAbsTimestamp = 0;

    AMF_EncodeInt32(chunkPacket.m_body, pBufEnd, 4096);

    if (RTMP_SendPacket(pConn->Rtmp(), &chunkPacket, FALSE))
        return true;

    printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
    return false;
}

// 发送连接响应报文
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);

    AMFObject obj1 = {0, NULL};

    AMFObjectProperty fmsVer;
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);

    AMFObjectProperty capabilities;
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);

    pEnc = AMF_Encode(&obj1, pEnc, pEnd);

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Sends the `_result` reply to a `createStream` command on chunk channel 3.
// The body is: "_result", the transaction id `nOperateID`, an AMF null and the
// newly allocated stream id `nStreamID` as an AMF number.
// Returns true when RTMP_SendPacket() accepts the packet, false otherwise.
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    char buffer[256] = {0};
    char* const pBufEnd = buffer + sizeof(buffer);

    RTMPPacket reply;
    // Payload lives after the space reserved for the chunk header.
    reply.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    reply.m_packetType = 0x14;
    reply.m_headerType = RTMP_PACKET_SIZE_LARGE;
    reply.m_nChannel = 0x03;
    reply.m_nInfoField2 = 0;
    reply.m_nTimeStamp = 0;
    reply.m_hasAbsTimestamp = 0;

    char* pCursor = AMF_EncodeString(reply.m_body, pBufEnd, &av__result);
    pCursor = AMF_EncodeNumber(pCursor, pBufEnd, nOperateID);
    *pCursor = AMF_NULL;
    ++pCursor;
    pCursor = AMF_EncodeNumber(pCursor, pBufEnd, nStreamID);

    reply.m_nBodySize = pCursor - reply.m_body;

    if (RTMP_SendPacket(pConn->Rtmp(), &reply, FALSE))
        return true;

    printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
    return false;
}

// 发送推流状态响应报文
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送推流错误响应报文
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);

    AMFObjectProperty description;
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Sends a user-control message (type 0x04) on chunk channel 2 announcing the
// start of playback: a 16-bit event type 0 followed by the 32-bit stream id
// `nInputStreamID`.
// Returns true when RTMP_SendPacket() accepts the packet, false otherwise.
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    char buffer[256] = {0};
    char* const pBufEnd = buffer + sizeof(buffer);

    RTMPPacket eventPacket;
    // Payload lives after the space reserved for the chunk header.
    eventPacket.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    eventPacket.m_packetType = 0x04;
    eventPacket.m_headerType = RTMP_PACKET_SIZE_LARGE;
    eventPacket.m_nChannel = 0x02;
    eventPacket.m_nInfoField2 = 0;
    eventPacket.m_nTimeStamp = 0;
    eventPacket.m_hasAbsTimestamp = 0;

    char* pCursor = AMF_EncodeInt16(eventPacket.m_body, pBufEnd, 0);
    pCursor = AMF_EncodeInt32(pCursor, pBufEnd, nInputStreamID);

    eventPacket.m_nBodySize = pCursor - eventPacket.m_body;

    if (RTMP_SendPacket(pConn->Rtmp(), &eventPacket, FALSE))
        return true;

    printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
    return false;
}

// 发送拉流状态响应报文
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <fcntl.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <list>
#include <set>
#include <map>

#include <librtmp/rtmp.h>
#include <librtmp/log.h>

#include "kkp2p_sdk.h"

// Process-wide kkp2p engine handle. Assigned in main() from
// kkp2p_engine_init() and NULL until then.
kkp2p_engine_t* g_engine = NULL;


// Thin wrapper around a pthread mutex with default attributes: initialised in
// the constructor, destroyed in the destructor.
//
// Not copyable: a pthread_mutex_t must not be duplicated by value, and a copy
// would also be destroyed twice.
class CMutex
{
    pthread_mutex_t m_mutex;

public:
    CMutex()
    {
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~CMutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }

    CMutex(const CMutex&) = delete;
    CMutex& operator=(const CMutex&) = delete;
    
    // Blocks until the mutex is acquired.
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    // Releases the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }
};

// Scope guard for any lock type T that provides lock() and unlock(): calls
// lock() on construction and unlock() on destruction.
//
// Not copyable: a copy would call unlock() a second time for a single lock().
// `pLock` must be non-null and must outlive the guard.
template<typename T>
class CAutoLock
{
    T* m_pLock;

public:
    CAutoLock(T* pLock) : m_pLock(pLock) 
    {
        m_pLock->lock();
    }
    ~CAutoLock() 
    { 
        m_pLock->unlock();
    }

    CAutoLock(const CAutoLock&) = delete;
    CAutoLock& operator=(const CAutoLock&) = delete;
};


class CConnection
{
    uint32_t m_uConnID;
    uint32_t m_uNextStreamID;

    RTMP* m_pRtmp;
    std::string m_strApp;
    int m_nStreamType;
    std::set<uint32_t> m_setUsingStreamID;
    std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
    std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;

    std::list<RTMPPacket*> m_listpPacket;
    CMutex m_mutex;
    sem_t m_sem;

public:
    // 流类型
    enum EStreamType
    {
        Unkown = 0,
        Publish,
        Play
    };

    CConnection(uint32_t uConnID, int nSocket)
        : m_uConnID(uConnID)
        , m_uNextStreamID(1)
        , m_pRtmp(NULL)
        , m_strApp("")
        , m_nStreamType(Unkown)
    {
        m_pRtmp = RTMP_Alloc();
        RTMP_Init(m_pRtmp);
        m_pRtmp->m_sb.sb_socket = nSocket;

        sem_init(&m_sem, 0, 0);
    }
    virtual ~CConnection()
    {
        RTMP_Close(m_pRtmp);
        RTMP_Free(m_pRtmp);

        sem_destroy(&m_sem);
    }

    uint32_t ConnID()
    {
        return m_uConnID;
    }

    RTMP* Rtmp()
    {
        return m_pRtmp;
    }
    int Socket()
    {
        return m_pRtmp->m_sb.sb_socket;
    }

    void setAppName(const std::string& strApp)
    {
        m_strApp = strApp;
    }
    const std::string& getAppName() const
    {
        return m_strApp;
    }

    void setStreamType(EStreamType emType)
    {
        m_nStreamType = emType;
    }
    EStreamType getStreamType()
    {
        return (EStreamType)m_nStreamType;
    }

    uint32_t genStreamID()
    {
        uint32_t uStreamID = m_uNextStreamID++;
        m_setUsingStreamID.insert(uStreamID);
        return uStreamID;
    }

    // 检查流ID是否合法
    bool isValidStreamID(uint32_t uStreamID)
    {
        return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
    }

    // 登记 推流ID与playpath 映射关系
    void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // 取消登记 推流ID与playpath 映射关系
    void unbindPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.erase(uStreamID);
    }
    // 获取 推流ID映射关系
    std::string getPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPublishStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // 断连时获取 推流的playpath列表
    const void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // 断连时清除 推流ID与playpath 映射关系
    void cleanPublishPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.clear();
    }

    // 登记 拉流ID与playpath 映射关系
    void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // 取消登记 拉流ID与playpath 映射关系
    void unbindPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.erase(uStreamID);
    }
    // 获取 拉流ID映射关系
    std::string getPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPlayStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // 断连时获取 拉流的playpath列表
    const void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // 断连时清除 拉流ID与playpath 映射关系
    void cleanPlayPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.clear();
    }

    // 通知指定的playpath即将重置
    void tellResetPlayPath(const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        uint32_t uStreamID = 0;

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            if (iter->second == strPlayPath)
            {
                uStreamID = iter->first;
                m_mapPublishStreamIDPlayPath.erase(iter);
                break;
            }
        }

        if (uStreamID == 0)
        {
            for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
            {
                if (iter->second == strPlayPath)
                {
                    uStreamID = iter->first;
                    m_mapPlayStreamIDPlayPath.erase(iter);
                    break;
                }
            }
        }
    }

    // 提取待发送的报文
    RTMPPacket* popPacket()
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        double ftime = tv.tv_sec + (tv.tv_usec + 500000) / (double)1000000;
        struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
        sem_timedwait(&m_sem, &ts);

        CAutoLock<CMutex> lock(&m_mutex);

        if (m_listpPacket.empty())
            return NULL;

        RTMPPacket* pPacket = m_listpPacket.front();
        m_listpPacket.pop_front();

        return pPacket;
    }

    // 向连接拷贝多个报文
    void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
        {
            m_listpPacket.push_back(*iter);
            sem_post(&m_sem);
        }
    }
};

// 客户端连接管理 =>

class CConnections
{
    uint32_t m_uNextConnID;
    std::map<uint32_t, CConnection*> m_mapConnection;

    CMutex m_mutex;

public:
    CConnections()
        : m_uNextConnID(1)
    {
    }
    virtual ~CConnections() {}

    CConnection* createConnection(int nSocket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
        m_mapConnection[pConnection->ConnID()] = pConnection;
        return pConnection;
    }

    void releaseConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConn = __getConnection(uConnID);
        if (pConn)
        {
            m_mapConnection.erase(uConnID);
            delete pConn;
        }
    }

    CConnection* getConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        return __getConnection(uConnID);
    }

private:

    CConnection* __getConnection(uint32_t uConnID)
    {
        auto iter = m_mapConnection.find(uConnID);
        if (iter == m_mapConnection.end())
            return NULL;

        return iter->second;
    }
};

// Process-wide registry of client connections.
CConnections g_Conns;

// 节目容器 =>

class CPlayPath
{
    std::string m_strPlayPath;
    
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;

    CMutex m_mutex;
   
public:
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath)
        , m_uPublishConnID(0)
        , m_bEOF(true)
    {
    }
    virtual ~CPlayPath() {}

    const std::string& getName() const
    {
        return m_strPlayPath;
    }

    // 设置/获取 结束标志
    void setEOF()
    {
        m_bEOF = true;
    }
    bool isEOF()
    {
        return m_bEOF;
    }

    // 重置节目对象
    void reset(bool bCleanPlayer = false)
    {
        // 清除结束标志
        m_bEOF = false;

        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;

        // 清除推流和拉流连接
        {
            CAutoLock<CMutex> lock(&m_mutex);

            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;

            if (bCleanPlayer)
            {
                m_setPlayConnID.swap(setPlayConnID);
            }
        }

        // 通知推流连接做清除处理
        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }

        // 通知拉流连接做清除处理
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
    }

    // 登记推流连接
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = uConnID;
    }
    // 取消登记推流连接
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = 0;
    }

    // 登记拉流连接
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter != m_setPlayConnID.end())
            return false;

        m_setPlayConnID.insert(uConnID);
        return true;
    }
    // 取消登记拉流连接
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter == m_setPlayConnID.end())
            return false;

        m_setPlayConnID.erase(uConnID);
        return true;
    }

    // 暂存媒体报文
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;

        {
            CAutoLock<CMutex> lock(&m_mutex);

            setPlayConnID = m_setPlayConnID;
        }

        // 简单起见,直接拷贝到拉流连接

        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn == NULL)
                continue;

            std::vector<RTMPPacket*> vecpPacket;

            RTMPPacket* pPacketCP = new RTMPPacket;
            RTMPPacket_Reset(pPacketCP);
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize);
            memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;

            vecpPacket.push_back(pPacketCP);

            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};

// 应用容器 =>
class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;

    CMutex m_mutex;

public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}

    const std::string& getName() const
    {
        return m_strApp;
    }

    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mappPlayPath.find(strPlayPath);
        if (iter != m_mappPlayPath.end())
            return iter->second;

        if (bCreate)
        {
            CPlayPath* pPlayPath = new CPlayPath(strPlayPath);
            m_mappPlayPath[strPlayPath] = pPlayPath;

            return pPlayPath;
        }
          
        return NULL;
    }
};

// 应用集合管理 =>

class CApps
{
    std::map<std::string, CApp*> m_mapApp;

    CMutex m_mutex;

public:
    CApps() {}
    virtual ~CApps() {}

    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapApp.find(strApp);
        if (iter != m_mapApp.end())
            return iter->second;

        if (bCreate)
        {
            CApp* pApp = new CApp(strApp);
            m_mapApp[strApp] = pApp;

            return pApp; 
        }

        return NULL;
    }
};

// Process-wide registry of RTMP applications.
CApps g_Apps;

// Program logic =>

// Forward declarations of the per-connection thread entry point, the
// handshake, the message dispatcher and its handlers, and the helpers that
// send the individual RTMP replies.
void* ClientThread(void* _lp);
bool MyHandshake(int nSocket);
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);

int main(int argc, char* argv[])
{
    if (argc < 3) {
        printf("usage:%s peer_id peer_key\n", argv[0]);
        return -1;
    }

    RTMP_LogSetLevel(RTMP_LOGDEBUG);

    kkp2p_engine_conf_t kkp2p_conf;
    kkp2p_conf.login_domain = "124.71.217.198";
kkp2p_conf.login_port = 3080;
kkp2p_conf.lan_search_port = 3549;
    kkp2p_conf.max_log_size = 1024*1024*10;
    kkp2p_conf.log_path = NULL;
    g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
    kkp2p_switch_log_level(g_engine, 4);
   
    kkp2p_join_lan(g_engine, argv[1]);
    kkp2p_join_net(g_engine, argv[1], argv[2]);
    kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
    while(1) {
        int ret = kkp2p_accept(g_engine, 1000, channel);
        if (ret < 0) {
            // error
            printf("kkp2p_accept error,exit\n");
            free(channel);
            break;
        } else if (ret == 0) {
             // timeout
             continue;
        } else {
             // success
             pthread_t ThreadId;
             printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel->fd, channel->transmit_mode, channel->channel_id);
             pthread_create(&ThreadId, NULL, ClientThread,(void*)channel);
             channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
        }
    }
    return 0;
}

void* ClientThread(void* param)
{
    pthread_detach(pthread_self());
    kkp2p_channel_t* channel = (kkp2p_channel_t*)param;

    // use default block
    int val = fcntl(channel->fd, F_GETFL, 0);
    fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));
    
    g_Conns.createConnection(channel->fd);
    CConnection* pConn = g_Conns.createConnection(channel->fd);

    printf("connection:[%d] coming... \n", pConn->ConnID());

    // 握手
    bool b = MyHandshake(pConn->Socket());
    if (!b)
    {
        printf("connection:[%d] handshake failed! \n", pConn->ConnID());
        g_Conns.releaseConnection(pConn->ConnID());
        return NULL;
    }

    while (true)
    {
        RTMPPacket packet;
        packet.m_body = NULL;
        packet.m_chunk = NULL;
        RTMPPacket_Reset(&packet);

        // 读取报文
        if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
        {
            printf("connection:[%d] read error! \n", pConn->ConnID());
            break;
        }

       if (!RTMPPacket_IsReady(&packet))
           continue;

       //printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
       //       pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);

       // 报文分派交互
       bool b = Dispatch(pConn, &packet);

       RTMPPacket_Free(&packet);

       if (!b)
       {
           printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
           break;
       }

       if (pConn->getStreamType() == CConnection::Play)
       {
           printf("connection:[%d] now play... \n", pConn->ConnID());
           break;
       }
    }

    // 进入拉流状态
    struct timeval tv;
    gettimeofday(&tv, NULL);
    double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
    while (pConn->getStreamType() == CConnection::Play)
    {
        RTMPPacket* pPacket = pConn->popPacket();

        struct timeval tvNow;
        gettimeofday(&tvNow, NULL);
        double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;

        // 超时检查
        if (pPacket == NULL)
        {
            if (fNowReadTime - fLastReadTime < 30)
                continue;

            printf("connection:[%d] too time no packet \n", pConn->ConnID());
            break;
        }

        fLastReadTime = fNowReadTime;

        // 下发媒体报文
        bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);

        RTMPPacket_Free(pPacket);
        delete pPacket;

        if (!b)
        {
            printf("connection:[%d] send failed! \n", pConn->ConnID());
            break;
        }
    }

    // 连接退出前关系解除
    switch (pConn->getStreamType())
    {
        case CConnection::Publish:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPublishPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->setEOF();
                        pPlayPath->unsetPublishConn();
                    }
                }
                pConn->cleanPublishPlayPath();
            }
            break;
        case CConnection::Play:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPlayPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->delPlayConn(pConn->ConnID());
                    }
                }
                pConn->cleanPlayPlayPath();
            }
            break;
    }

    printf("connection:[%d] exit! \n", pConn->ConnID());

    g_Conns.releaseConnection(pConn->ConnID());
    kkp2p_close_fd(channel->fd);
    kkp2p_close_channel(g_engine, channel->channel_id);
    free(channel);
    return NULL;
}

// Writes exactly len bytes from buff to fd, looping over partial sends.
// A send() interrupted by a signal (EINTR) is retried instead of being
// reported as a failure.
// Returns len on success, or -1 when send() fails (the error is logged).
int send_data(int fd, char* buff, int len) {
    int sended = 0 ;
    while (sended < len) {
        int wl = send(fd, buff + sended, len - sended, 0);
        if (wl < 0) {
            if (errno == EINTR) {
                continue;
            }
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));
            return -1;
        }
        sended += wl;
    }
    return len;
}

// Reads exactly len bytes from fd into buff, looping over partial reads.
// A recv() interrupted by a signal (EINTR) is retried. recv() returning 0
// means the peer closed the connection before len bytes arrived; that is
// reported as a failure, because otherwise the loop would never terminate.
// Returns len on success, or -1 on error / premature close (both are logged).
int recv_data(int fd, char* buff, int len) {
    int recved = 0 ;
    while (recved < len) {
        int rl = recv(fd, buff + recved, len - recved, 0);
        if (rl < 0) {
            if (errno == EINTR) {
                continue;
            }
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,rl, len,errno, strerror(errno));
            return -1;
        }
        if (rl == 0) {
            printf("RecvData error,fd:%d,peer closed after %d of %d bytes.\n", fd, recved, len);
            return -1;
        }
        recved += rl;
    }
    return len;
}


// 握手操作
#define RTMP_SIG_SIZE 1536
bool MyHandshake(int nSocket)
{
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1) {
        return false;
    }

    if (type != 3) {
        return false;
    }

    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
     }

    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE) {
        return false;
    }

    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    return true;
}

// 报文分派交互处理
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    switch (pPacket->m_packetType)
    {
        case 0x01:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    pConn->Rtmp()->m_inChunkSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
                }
            }
            break;

        case 0x04:
            {
            }
            break;

        case 0x05:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nWindowAckSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nWindowAckSize);
                }
            }
            break;

        case 0x06:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nOutputBW = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nOutputBW);
                }
                if (pPacket->m_nBodySize >= 5)
                {
                    int nOutputBW2 = pPacket->m_body[4];
                    printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nOutputBW2);
                }
            }
            break;

        case 0x08:
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x09:
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x12:
            {
            }
            break;

        case 0x14:
            {
                if (HandleInvoke(pConn, pPacket) < 0)
                    return false;
            }
            break;
    }

    return true;
}

// SAVC(x) defines a file-local constant `static const AVal av_x`, initialised
// with AVC("x"). The constants below are the command names that HandleInvoke
// matches against, plus the names written into replies (_result, onStatus).
#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);

// Wraps a NUL-terminated C string in an AVal. The text is not copied: the
// returned AVal points at pStr, so pStr must outlive every use of the result.
AVal makeAVal(const char* pStr)
{
    AVal wrapped = {(char*)pStr, (int)strlen(pStr)};
    return wrapped;
}

// 处理远程调用
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket) 
{
    if (pPacket->m_body[0] != 0x02)
    {
        printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
        return -1;
    }

    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    AMFObject obj;
    int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
    if (nSize < 0)
    {
        printf("connection:[%d] invalid packet! \n", pConn->ConnID());
        return -1;
    }

    AVal method;
    AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
    int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
    printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);

    if (AVMATCH(&method, &av_connect))
    {
        AMFObject obj1;
        AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);

        AVal appName = makeAVal("app");
        AVal app;
        AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);

        std::string strApp(app.av_val, app.av_len);
        printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());

        pConn->setAppName(strApp);

        if (!sendWindowAckSize(pConn))
            return -1;

        if (!sendPeerOutputBandWide(pConn))
            return -1;

        if (!sendOutputChunkSize(pConn))
            return -1;

        if (!sendConnectResult(pConn, nOperateID))
            return -1;
    }

    else if (AVMATCH(&method, &av_releaseStream))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // 检查该节目是否推流结束
        CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
        if (!pPlayPath->isEOF())
        {
            if (!sendPublishError(pConn, nInputStreamID))
                return -1;
            return 0;
        }

        // 重置节目
        pPlayPath->reset(false);
    }
    
    else if (AVMATCH(&method, &av_FCPublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // 安全起见,初使化节目
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
    }

    else if (AVMATCH(&method, &av_createStream))
    {
        // 生成流ID
        uint32_t uStreamID = pConn->genStreamID();

        printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);

        if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_publish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());

        // 检查streamID的有效性
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // 连接与节目 建立双向关联
        pConn->setStreamType(CConnection::Publish);
        pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());

        if (!sendPublishStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_play))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);

        // 检查streamID的有效性
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // 连接与节目 建立双向关联
        pConn->setStreamType(CConnection::Play);
        pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());

        if (!sendPlayStreamBegin(pConn, nInputStreamID))
            return -1;

        if (!sendPlayStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_FCUnpublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
    }

    else if (AVMATCH(&method, &av_deleteStream))
    {
        int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
        printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);

        // 连接与节目 解除双向关联

        std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPublishPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
        }

        strPlayPath = pConn->getPlayPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPlayPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
        }
    }

    AMF_Reset(&obj);
    return 0;
}

// Handles a media packet.
//
// Looks up the play path this connection bound to the packet's stream ID by
// "publish" and passes the packet to that play path's cache.
// Returns 0 when the packet was cached, -1 when the stream ID is not bound to
// any publish play path. In that case the packet is dropped, so no play path
// named "" is created and fed by accident. The rest of the file likewise
// treats an empty play path as "not bound".
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    const std::string strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    if (strPlayPath.empty())
        return -1;

    g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->cacheMediaPacket(pPacket);

    return 0;
}

// Sends the Window Acknowledgement Size message (type 0x05, value 5000000)
// on chunk stream 2. Returns false when RTMP_SendPacket() fails.
bool sendWindowAckSize(CConnection* pConn)
{
    // The payload starts RTMP_MAX_HEADER_SIZE bytes into the buffer.
    char sBuf[256] = {0};

    RTMPPacket packet;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x05;
    packet.m_nChannel = 0x02;
    packet.m_hasAbsTimestamp = 0;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    packet.m_nBodySize = 4;

    AMF_EncodeInt32(packet.m_body, sBuf + sizeof(sBuf), 5000000);

    if (RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
        return true;

    printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
    return false;
}

// Sends the Set Peer Bandwidth message (type 0x06) on chunk stream 2:
// a 4-byte bandwidth of 5000000 followed by one byte with the value 2.
// Returns false when RTMP_SendPacket() fails.
bool sendPeerOutputBandWide(CConnection* pConn)
{
    // The payload starts RTMP_MAX_HEADER_SIZE bytes into the buffer.
    char sBuf[256] = {0};

    RTMPPacket packet;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x06;
    packet.m_nChannel = 0x02;
    packet.m_hasAbsTimestamp = 0;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    packet.m_nBodySize = 5;

    AMF_EncodeInt32(packet.m_body, sBuf + sizeof(sBuf), 5000000);
    packet.m_body[4] = 2;

    if (RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
        return true;

    printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
    return false;
}

// Switches this connection's outgoing chunk size to 4096 and announces it to
// the peer with a Set Chunk Size message (type 0x01) on chunk stream 2.
// Returns false when RTMP_SendPacket() fails.
bool sendOutputChunkSize(CConnection* pConn)
{
    // The local setting is changed first, before the message is sent.
    pConn->Rtmp()->m_outChunkSize = 4096;

    // The payload starts RTMP_MAX_HEADER_SIZE bytes into the buffer.
    char sBuf[256] = {0};

    RTMPPacket packet;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x01;
    packet.m_nChannel = 0x02;
    packet.m_hasAbsTimestamp = 0;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;
    packet.m_nBodySize = 4;

    AMF_EncodeInt32(packet.m_body, sBuf + sizeof(sBuf), 4096);

    if (RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
        return true;

    printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
    return false;
}

// 发送连接响应报文
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);

    AMFObject obj1 = {0, NULL};

    AMFObjectProperty fmsVer;
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);

    AMFObjectProperty capabilities;
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);

    pEnc = AMF_Encode(&obj1, pEnc, pEnd);

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Sends the "_result" reply to a createStream call.
// Body layout: "_result", nOperateID (echoed from the request), an AMF null,
// then nStreamID as a number. Returns false when RTMP_SendPacket() fails.
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    // The payload starts RTMP_MAX_HEADER_SIZE bytes into the buffer.
    char sBuf[256] = {0};
    char* const pLimit = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nChannel = 0x03;
    packet.m_hasAbsTimestamp = 0;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pCursor = AMF_EncodeString(packet.m_body, pLimit, &av__result);
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nOperateID);
    *pCursor = AMF_NULL;
    ++pCursor;
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nStreamID);

    packet.m_nBodySize = pCursor - packet.m_body;

    if (RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
        return true;

    printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
    return false;
}

// 发送推流状态响应报文
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送推流错误响应报文
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);

    AMFObjectProperty description;
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Sends the play event message (type 0x04) on chunk stream 2: a 16-bit event
// type of 0 followed by the 32-bit stream ID.
// Returns false when RTMP_SendPacket() fails.
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    // The payload starts RTMP_MAX_HEADER_SIZE bytes into the buffer.
    char sBuf[256] = {0};
    char* const pLimit = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x04;
    packet.m_nChannel = 0x02;
    packet.m_hasAbsTimestamp = 0;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pCursor = AMF_EncodeInt16(packet.m_body, pLimit, 0);
    pCursor = AMF_EncodeInt32(pCursor, pLimit, nInputStreamID);

    packet.m_nBodySize = pCursor - packet.m_body;

    if (RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
        return true;

    printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
    return false;
}

// 发送拉流状态响应报文
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <fcntl.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <list>
#include <set>
#include <map>

#include <librtmp/rtmp.h>
#include <librtmp/log.h>

#include "kkp2p_sdk.h"

// kkp2p engine handle; NULL until main() stores the result of kkp2p_engine_init().
kkp2p_engine_t* g_engine = NULL;


// Minimal wrapper around a pthread mutex with default attributes.
// The mutex is initialized in the constructor and destroyed in the destructor.
// Exposes lock()/unlock() so it can be used with CAutoLock.
class CMutex
{
    pthread_mutex_t m_mutex;

public:
    CMutex()
    {
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~CMutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }

    // Not copyable: a copy would duplicate the raw pthread_mutex_t bytes and
    // both objects would later destroy "their" mutex.
    CMutex(const CMutex&) = delete;
    CMutex& operator=(const CMutex&) = delete;

    // Blocks until the mutex is acquired.
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    // Releases the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }
};

// Scoped lock guard: calls pLock->lock() on construction and
// pLock->unlock() on destruction. T only needs lock() and unlock().
// pLock must be non-NULL and must outlive the guard.
template<typename T>
class CAutoLock
{
    T* m_pLock;

public:
    explicit CAutoLock(T* pLock) : m_pLock(pLock) 
    {
        m_pLock->lock();
    }
    ~CAutoLock() 
    { 
        m_pLock->unlock();
    }

    // Not copyable: a copied guard would unlock the same lock a second time.
    CAutoLock(const CAutoLock&) = delete;
    CAutoLock& operator=(const CAutoLock&) = delete;
};


class CConnection
{
    uint32_t m_uConnID;
    uint32_t m_uNextStreamID;

    RTMP* m_pRtmp;
    std::string m_strApp;
    int m_nStreamType;
    std::set<uint32_t> m_setUsingStreamID;
    std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
    std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;

    std::list<RTMPPacket*> m_listpPacket;
    CMutex m_mutex;
    sem_t m_sem;

public:
    // 流类型
    enum EStreamType
    {
        Unkown = 0,
        Publish,
        Play
    };

    CConnection(uint32_t uConnID, int nSocket)
        : m_uConnID(uConnID)
        , m_uNextStreamID(1)
        , m_pRtmp(NULL)
        , m_strApp("")
        , m_nStreamType(Unkown)
    {
        m_pRtmp = RTMP_Alloc();
        RTMP_Init(m_pRtmp);
        m_pRtmp->m_sb.sb_socket = nSocket;

        sem_init(&m_sem, 0, 0);
    }
    virtual ~CConnection()
    {
        RTMP_Close(m_pRtmp);
        RTMP_Free(m_pRtmp);

        sem_destroy(&m_sem);
    }

    uint32_t ConnID()
    {
        return m_uConnID;
    }

    RTMP* Rtmp()
    {
        return m_pRtmp;
    }
    int Socket()
    {
        return m_pRtmp->m_sb.sb_socket;
    }

    void setAppName(const std::string& strApp)
    {
        m_strApp = strApp;
    }
    const std::string& getAppName() const
    {
        return m_strApp;
    }

    void setStreamType(EStreamType emType)
    {
        m_nStreamType = emType;
    }
    EStreamType getStreamType()
    {
        return (EStreamType)m_nStreamType;
    }

    uint32_t genStreamID()
    {
        uint32_t uStreamID = m_uNextStreamID++;
        m_setUsingStreamID.insert(uStreamID);
        return uStreamID;
    }

    // 检查流ID是否合法
    bool isValidStreamID(uint32_t uStreamID)
    {
        return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
    }

    // 登记 推流ID与playpath 映射关系
    void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // 取消登记 推流ID与playpath 映射关系
    void unbindPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.erase(uStreamID);
    }
    // 获取 推流ID映射关系
    std::string getPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPublishStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // 断连时获取 推流的playpath列表
    const void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // 断连时清除 推流ID与playpath 映射关系
    void cleanPublishPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.clear();
    }

    // 登记 拉流ID与playpath 映射关系
    void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // 取消登记 拉流ID与playpath 映射关系
    void unbindPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.erase(uStreamID);
    }
    // 获取 拉流ID映射关系
    std::string getPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPlayStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // 断连时获取 拉流的playpath列表
    const void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // 断连时清除 拉流ID与playpath 映射关系
    void cleanPlayPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.clear();
    }

    // 通知指定的playpath即将重置
    void tellResetPlayPath(const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        uint32_t uStreamID = 0;

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            if (iter->second == strPlayPath)
            {
                uStreamID = iter->first;
                m_mapPublishStreamIDPlayPath.erase(iter);
                break;
            }
        }

        if (uStreamID == 0)
        {
            for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
            {
                if (iter->second == strPlayPath)
                {
                    uStreamID = iter->first;
                    m_mapPlayStreamIDPlayPath.erase(iter);
                    break;
                }
            }
        }
    }

    // 提取待发送的报文
    RTMPPacket* popPacket()
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        double ftime = tv.tv_sec + (tv.tv_usec + 500000) / (double)1000000;
        struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
        sem_timedwait(&m_sem, &ts);

        CAutoLock<CMutex> lock(&m_mutex);

        if (m_listpPacket.empty())
            return NULL;

        RTMPPacket* pPacket = m_listpPacket.front();
        m_listpPacket.pop_front();

        return pPacket;
    }

    // 向连接拷贝多个报文
    void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
        {
            m_listpPacket.push_back(*iter);
            sem_post(&m_sem);
        }
    }
};

// 客户端连接管理 =>

class CConnections
{
    uint32_t m_uNextConnID;
    std::map<uint32_t, CConnection*> m_mapConnection;

    CMutex m_mutex;

public:
    CConnections()
        : m_uNextConnID(1)
    {
    }
    virtual ~CConnections() {}

    CConnection* createConnection(int nSocket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
        m_mapConnection[pConnection->ConnID()] = pConnection;
        return pConnection;
    }

    void releaseConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConn = __getConnection(uConnID);
        if (pConn)
        {
            m_mapConnection.erase(uConnID);
            delete pConn;
        }
    }

    CConnection* getConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        return __getConnection(uConnID);
    }

private:

    CConnection* __getConnection(uint32_t uConnID)
    {
        auto iter = m_mapConnection.find(uConnID);
        if (iter == m_mapConnection.end())
            return NULL;

        return iter->second;
    }
};

// Process-wide registry of client connections.
CConnections g_Conns;

// 节目容器 =>

class CPlayPath
{
    std::string m_strPlayPath;
    
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;

    CMutex m_mutex;
   
public:
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath)
        , m_uPublishConnID(0)
        , m_bEOF(true)
    {
    }
    virtual ~CPlayPath() {}

    const std::string& getName() const
    {
        return m_strPlayPath;
    }

    // 设置/获取 结束标志
    void setEOF()
    {
        m_bEOF = true;
    }
    bool isEOF()
    {
        return m_bEOF;
    }

    // 重置节目对象
    void reset(bool bCleanPlayer = false)
    {
        // 清除结束标志
        m_bEOF = false;

        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;

        // 清除推流和拉流连接
        {
            CAutoLock<CMutex> lock(&m_mutex);

            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;

            if (bCleanPlayer)
            {
                m_setPlayConnID.swap(setPlayConnID);
            }
        }

        // 通知推流连接做清除处理
        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }

        // 通知拉流连接做清除处理
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
    }

    // 登记推流连接
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = uConnID;
    }
    // 取消登记推流连接
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = 0;
    }

    // 登记拉流连接
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter != m_setPlayConnID.end())
            return false;

        m_setPlayConnID.insert(uConnID);
        return true;
    }
    // 取消登记拉流连接
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter == m_setPlayConnID.end())
            return false;

        m_setPlayConnID.erase(uConnID);
        return true;
    }

    // 暂存媒体报文
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;

        {
            CAutoLock<CMutex> lock(&m_mutex);

            setPlayConnID = m_setPlayConnID;
        }

        // 简单起见,直接拷贝到拉流连接

        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn == NULL)
                continue;

            std::vector<RTMPPacket*> vecpPacket;

            RTMPPacket* pPacketCP = new RTMPPacket;
            RTMPPacket_Reset(pPacketCP);
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize);
            memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;

            vecpPacket.push_back(pPacketCP);

            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};

// 应用容器 =>
class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;

    CMutex m_mutex;

public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}

    const std::string& getName() const
    {
        return m_strApp;
    }

    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mappPlayPath.find(strPlayPath);
        if (iter != m_mappPlayPath.end())
            return iter->second;

        if (bCreate)
        {
            CPlayPath* pPlayPath = new CPlayPath(strPlayPath);
            m_mappPlayPath[strPlayPath] = pPlayPath;

            return pPlayPath;
        }
          
        return NULL;
    }
};

// 应用集合管理 =>

class CApps
{
    std::map<std::string, CApp*> m_mapApp;

    CMutex m_mutex;

public:
    CApps() {}
    virtual ~CApps() {}

    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapApp.find(strApp);
        if (iter != m_mapApp.end())
            return iter->second;

        if (bCreate)
        {
            CApp* pApp = new CApp(strApp);
            m_mapApp[strApp] = pApp;

            return pApp; 
        }

        return NULL;
    }
};

// Process-wide set of applications, looked up by application name.
CApps g_Apps;

// Program logic =>

// Forward declarations of the functions defined further down in this file.
void* ClientThread(void* _lp);
bool MyHandshake(int nSocket);
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);

int main(int argc, char* argv[])
{
    if (argc < 3) {
        printf("usage:%s peer_id peer_key\n", argv[0]);
        return -1;
    }

    RTMP_LogSetLevel(RTMP_LOGDEBUG);

    kkp2p_engine_conf_t kkp2p_conf;
    kkp2p_conf.login_domain = "124.71.217.198";
kkp2p_conf.login_port = 3080;
kkp2p_conf.lan_search_port = 3549;
    kkp2p_conf.max_log_size = 1024*1024*10;
    kkp2p_conf.log_path = NULL;
    g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
    kkp2p_switch_log_level(g_engine, 4);
   
    kkp2p_join_lan(g_engine, argv[1]);
    kkp2p_join_net(g_engine, argv[1], argv[2]);
    kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
    while(1) {
        int ret = kkp2p_accept(g_engine, 1000, channel);
        if (ret < 0) {
            // error
            printf("kkp2p_accept error,exit\n");
            free(channel);
            break;
        } else if (ret == 0) {
             // timeout
             continue;
        } else {
             // success
             pthread_t ThreadId;
             printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel->fd, channel->transmit_mode, channel->channel_id);
             pthread_create(&ThreadId, NULL, ClientThread,(void*)channel);
             channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
        }
    }
    return 0;
}

void* ClientThread(void* param)
{
    pthread_detach(pthread_self());
    kkp2p_channel_t* channel = (kkp2p_channel_t*)param;

    // use default block
    int val = fcntl(channel->fd, F_GETFL, 0);
    fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));
    
    g_Conns.createConnection(channel->fd);
    CConnection* pConn = g_Conns.createConnection(channel->fd);

    printf("connection:[%d] coming... \n", pConn->ConnID());

    // 握手
    bool b = MyHandshake(pConn->Socket());
    if (!b)
    {
        printf("connection:[%d] handshake failed! \n", pConn->ConnID());
        g_Conns.releaseConnection(pConn->ConnID());
        return NULL;
    }

    while (true)
    {
        RTMPPacket packet;
        packet.m_body = NULL;
        packet.m_chunk = NULL;
        RTMPPacket_Reset(&packet);

        // 读取报文
        if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
        {
            printf("connection:[%d] read error! \n", pConn->ConnID());
            break;
        }

       if (!RTMPPacket_IsReady(&packet))
           continue;

       //printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
       //       pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);

       // 报文分派交互
       bool b = Dispatch(pConn, &packet);

       RTMPPacket_Free(&packet);

       if (!b)
       {
           printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
           break;
       }

       if (pConn->getStreamType() == CConnection::Play)
       {
           printf("connection:[%d] now play... \n", pConn->ConnID());
           break;
       }
    }

    // 进入拉流状态
    struct timeval tv;
    gettimeofday(&tv, NULL);
    double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
    while (pConn->getStreamType() == CConnection::Play)
    {
        RTMPPacket* pPacket = pConn->popPacket();

        struct timeval tvNow;
        gettimeofday(&tvNow, NULL);
        double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;

        // 超时检查
        if (pPacket == NULL)
        {
            if (fNowReadTime - fLastReadTime < 30)
                continue;

            printf("connection:[%d] too time no packet \n", pConn->ConnID());
            break;
        }

        fLastReadTime = fNowReadTime;

        // 下发媒体报文
        bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);

        RTMPPacket_Free(pPacket);
        delete pPacket;

        if (!b)
        {
            printf("connection:[%d] send failed! \n", pConn->ConnID());
            break;
        }
    }

    // 连接退出前关系解除
    switch (pConn->getStreamType())
    {
        case CConnection::Publish:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPublishPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->setEOF();
                        pPlayPath->unsetPublishConn();
                    }
                }
                pConn->cleanPublishPlayPath();
            }
            break;
        case CConnection::Play:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPlayPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->delPlayConn(pConn->ConnID());
                    }
                }
                pConn->cleanPlayPlayPath();
            }
            break;
    }

    printf("connection:[%d] exit! \n", pConn->ConnID());

    g_Conns.releaseConnection(pConn->ConnID());
    kkp2p_close_fd(channel->fd);
    kkp2p_close_channel(g_engine, channel->channel_id);
    free(channel);
    return NULL;
}

// Sends exactly len bytes from buff on fd, looping over partial sends.
// A send interrupted by a signal (EINTR) is retried. Returns len on success
// and -1 when send() fails or makes no progress.
int send_data(int fd, char* buff, int len) {
    int sended = 0 ;
    while (sended < len) {
        int wl = send(fd, buff + sended, len - sended, 0);
        if (wl < 0 && errno == EINTR) {
            continue;
        }
        if (wl <= 0) {
            // wl == 0 would otherwise keep this loop spinning forever.
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));
            return -1;
        }
        sended += wl;
    }
    return len;
}

// Receives exactly len bytes from fd into buff, looping over partial reads.
// A recv interrupted by a signal (EINTR) is retried. Returns len on success
// and -1 when recv() fails or the peer closes the connection before len
// bytes have arrived.
int recv_data(int fd, char* buff, int len) {
    int recved = 0 ;
    while (recved < len) {
        int wl = recv(fd, buff + recved, len - recved, 0);
        if (wl < 0 && errno == EINTR) {
            continue;
        }
        if (wl < 0) {
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno));
            return -1;
        }
        if (wl == 0) {
            // recv() returns 0 once the peer has closed the stream; without
            // this check the loop would never terminate.
            printf("RecvData peer closed,fd:%d,len:%d,recved:%d.\n", fd, len, recved);
            return -1;
        }
        recved += wl;
    }
    return len;
}


// 握手操作
#define RTMP_SIG_SIZE 1536
bool MyHandshake(int nSocket)
{
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1) {
        return false;
    }

    if (type != 3) {
        return false;
    }

    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
     }

    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE) {
        return false;
    }

    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    return true;
}

// 报文分派交互处理
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    switch (pPacket->m_packetType)
    {
        case 0x01:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    pConn->Rtmp()->m_inChunkSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
                }
            }
            break;

        case 0x04:
            {
            }
            break;

        case 0x05:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nWindowAckSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nWindowAckSize);
                }
            }
            break;

        case 0x06:
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nOutputBW = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nOutputBW);
                }
                if (pPacket->m_nBodySize >= 5)
                {
                    int nOutputBW2 = pPacket->m_body[4];
                    printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nOutputBW2);
                }
            }
            break;

        case 0x08:
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x09:
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x12:
            {
            }
            break;

        case 0x14:
            {
                if (HandleInvoke(pConn, pPacket) < 0)
                    return false;
            }
            break;
    }

    return true;
}

// SAVC(x) defines a file-local constant AVal named av_x, initialized through
// AVC(#x), i.e. from the stringified identifier x. The constants below are
// the RTMP command names this server compares against or sends.
#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);

// Wraps a NUL-terminated C string in an AVal (pointer + length).
// The characters are not copied, so pStr must outlive the returned value.
AVal makeAVal(const char* pStr)
{
    AVal wrapped;
    wrapped.av_val = (char*)pStr;
    wrapped.av_len = (int)strlen(pStr);
    return wrapped;
}

// 处理远程调用
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket) 
{
    if (pPacket->m_body[0] != 0x02)
    {
        printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
        return -1;
    }

    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    AMFObject obj;
    int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
    if (nSize < 0)
    {
        printf("connection:[%d] invalid packet! \n", pConn->ConnID());
        return -1;
    }

    AVal method;
    AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
    int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
    printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);

    if (AVMATCH(&method, &av_connect))
    {
        AMFObject obj1;
        AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);

        AVal appName = makeAVal("app");
        AVal app;
        AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);

        std::string strApp(app.av_val, app.av_len);
        printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());

        pConn->setAppName(strApp);

        if (!sendWindowAckSize(pConn))
            return -1;

        if (!sendPeerOutputBandWide(pConn))
            return -1;

        if (!sendOutputChunkSize(pConn))
            return -1;

        if (!sendConnectResult(pConn, nOperateID))
            return -1;
    }

    else if (AVMATCH(&method, &av_releaseStream))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // 检查该节目是否推流结束
        CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
        if (!pPlayPath->isEOF())
        {
            if (!sendPublishError(pConn, nInputStreamID))
                return -1;
            return 0;
        }

        // 重置节目
        pPlayPath->reset(false);
    }
    
    else if (AVMATCH(&method, &av_FCPublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // 安全起见,初使化节目
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
    }

    else if (AVMATCH(&method, &av_createStream))
    {
        // 生成流ID
        uint32_t uStreamID = pConn->genStreamID();

        printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);

        if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_publish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());

        // 检查streamID的有效性
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // 连接与节目 建立双向关联
        pConn->setStreamType(CConnection::Publish);
        pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());

        if (!sendPublishStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_play))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);

        // 检查streamID的有效性
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // 连接与节目 建立双向关联
        pConn->setStreamType(CConnection::Play);
        pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());

        if (!sendPlayStreamBegin(pConn, nInputStreamID))
            return -1;

        if (!sendPlayStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_FCUnpublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
    }

    else if (AVMATCH(&method, &av_deleteStream))
    {
        int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
        printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);

        // 连接与节目 解除双向关联

        std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPublishPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
        }

        strPlayPath = pConn->getPlayPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPlayPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
        }
    }

    AMF_Reset(&obj);
    return 0;
}

// Handles one media packet received from a client.
// The packet's stream ID is mapped to the playpath this connection publishes
// on that stream, and the packet is handed to that programme for delivery to
// its players. Packets on a stream that has no publish binding, or whose
// programme does not exist, are ignored. Always returns 0.
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    const std::string& strPlayPath = pConn->getPublishPlayPath(nInputStreamID);

    // An empty playpath means the stream is not bound for publishing; do not
    // create (and feed) a programme with an empty name for it.
    if (strPlayPath.empty())
        return 0;

    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, false);
    if (pPlayPath != NULL)
        pPlayPath->cacheMediaPacket(pPacket);

    return 0;
}

// Sends the window acknowledgement size message (type 0x05, value 5000000)
// on chunk stream 0x02. Returns false when the packet could not be sent.
bool sendWindowAckSize(CConnection* pConn)
{
    // The body starts RTMP_MAX_HEADER_SIZE bytes into the buffer, leaving
    // headroom in front of it.
    char buffer[256] = {0};
    char* const pBufferEnd = buffer + sizeof(buffer);

    RTMPPacket pkt;
    pkt.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    pkt.m_nBodySize = 4;
    pkt.m_packetType = 0x05;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_nChannel = 0x02;
    pkt.m_nInfoField2 = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_hasAbsTimestamp = 0;

    AMF_EncodeInt32(pkt.m_body, pBufferEnd, 5000000);

    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;

    printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
    return false;
}

// Sends the set-peer-bandwidth message (type 0x06) on chunk stream 0x02:
// a 32-bit value of 5000000 followed by a fifth byte set to 2.
// Returns false when the packet could not be sent.
bool sendPeerOutputBandWide(CConnection* pConn)
{
    // The body starts RTMP_MAX_HEADER_SIZE bytes into the buffer, leaving
    // headroom in front of it.
    char buffer[256] = {0};
    char* const pBufferEnd = buffer + sizeof(buffer);

    RTMPPacket pkt;
    pkt.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    pkt.m_nBodySize = 5;
    pkt.m_packetType = 0x06;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_nChannel = 0x02;
    pkt.m_nInfoField2 = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_hasAbsTimestamp = 0;

    AMF_EncodeInt32(pkt.m_body, pBufferEnd, 5000000);
    pkt.m_body[4] = 2;

    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;

    printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
    return false;
}

// Switches this connection's outgoing chunk size to 4096 and announces it to
// the peer with a set-chunk-size message (type 0x01) on chunk stream 0x02.
// The local chunk size is changed before the message is sent.
// Returns false when the packet could not be sent.
bool sendOutputChunkSize(CConnection* pConn)
{
    pConn->Rtmp()->m_outChunkSize = 4096;

    // The body starts RTMP_MAX_HEADER_SIZE bytes into the buffer, leaving
    // headroom in front of it.
    char buffer[256] = {0};
    char* const pBufferEnd = buffer + sizeof(buffer);

    RTMPPacket pkt;
    pkt.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    pkt.m_nBodySize = 4;
    pkt.m_packetType = 0x01;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_nChannel = 0x02;
    pkt.m_nInfoField2 = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_hasAbsTimestamp = 0;

    AMF_EncodeInt32(pkt.m_body, pBufferEnd, 4096);

    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;

    printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
    return false;
}

// 发送连接响应报文
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);

    AMFObject obj1 = {0, NULL};

    AMFObjectProperty fmsVer;
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);

    AMFObjectProperty capabilities;
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);

    pEnc = AMF_Encode(&obj1, pEnc, pEnd);

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Sends the "_result" reply to a createStream invocation on chunk stream
// 0x03. Body: "_result", nOperateID, an AMF null and the new stream ID.
// Returns false when the packet could not be sent.
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    // The body starts RTMP_MAX_HEADER_SIZE bytes into the buffer, leaving
    // headroom in front of it.
    char buffer[256] = {0};
    char* const pBufferEnd = buffer + sizeof(buffer);

    RTMPPacket pkt;
    pkt.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    pkt.m_packetType = 0x14;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_nChannel = 0x03;
    pkt.m_nInfoField2 = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_hasAbsTimestamp = 0;

    char* pCursor = AMF_EncodeString(pkt.m_body, pBufferEnd, &av__result);
    pCursor = AMF_EncodeNumber(pCursor, pBufferEnd, nOperateID);
    *pCursor = AMF_NULL;
    ++pCursor;
    pCursor = AMF_EncodeNumber(pCursor, pBufferEnd, nStreamID);

    pkt.m_nBodySize = pCursor - pkt.m_body;

    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;

    printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
    return false;
}

// 发送推流状态响应报文
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// 发送推流错误响应报文
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);

    AMFObjectProperty description;
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Sends the play event message (type 0x04) on chunk stream 0x02: a 16-bit
// event type of 0 followed by the 32-bit stream ID.
// Returns false when the packet could not be sent.
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    // The body starts RTMP_MAX_HEADER_SIZE bytes into the buffer, leaving
    // headroom in front of it.
    char buffer[256] = {0};
    char* const pBufferEnd = buffer + sizeof(buffer);

    RTMPPacket pkt;
    pkt.m_body = buffer + RTMP_MAX_HEADER_SIZE;
    pkt.m_packetType = 0x04;
    pkt.m_headerType = RTMP_PACKET_SIZE_LARGE;
    pkt.m_nChannel = 0x02;
    pkt.m_nInfoField2 = 0;
    pkt.m_nTimeStamp = 0;
    pkt.m_hasAbsTimestamp = 0;

    char* pCursor = AMF_EncodeInt16(pkt.m_body, pBufferEnd, 0);
    pCursor = AMF_EncodeInt32(pCursor, pBufferEnd, nInputStreamID);

    pkt.m_nBodySize = pCursor - pkt.m_body;

    if (RTMP_SendPacket(pConn->Rtmp(), &pkt, FALSE))
        return true;

    printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
    return false;
}

// 发送拉流状态响应报文
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}