# irtmp **Repository Path**: dellzhui/irtmp ## Basic Information - **Project Name**: irtmp - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2021-04-04 - **Last Updated**: 2024-09-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RTMP软件设计说明 [TOC] ## 写在前面 1.建议先阅读RTMP官方文档,[RTMP Specification 1.0](http://wwwimages.adobe.com/content/dam/Adobe/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf),不要求精通,但起码对协议有综合的理解,如果对英文官方文档有点吃力,则建议直接熟悉以下博文,加速理解(或自己搜索的其它同类文章): > [RTMP协议解析(一) —— 基本了解- 简书](https://www.jianshu.com/p/5ce11c20a9df) > > [流媒体:RTMP 协议完全解析- 知乎](https://zhuanlan.zhihu.com/p/191542130) 2.建议结合tcpdump+wireshark分析网络报文,对协议流程有更好的理解。 ## 编译运行 - * make * ./rtmp_server ## 服务器测试 - * ffmpeg.exe -re -i test.h264 -f flv rtmp://127.0.0.1/live/stream * ffplay.exe rtmp://127.0.0.1:1935/live/stream * ffplay.exe http://127.0.0.1:8000/live/stream.flv ## 整体思路 RTMP是应用层协议,传输层用tcp实现,因此本质实现一个具备RTMP协议要求(握手、应答等)的“TCP Server”。 整体思路是首先设计一个`传输层`tcp网络框架,然后基于此框架实现`应用层`RTMP Server应用。 ## 重要Class说明 | Class | 说明 | | -------------- | ---------------------------------------- | | TaskScheduler | 线程池,基于epoll实现 | | EventLoop | 事件驱动设计 | | Acceptor | 基于原生socket设计的底层基础server utils | | TcpServer | tcp server实现 | | RtmpServer | rtmp server实现,继承自TcpServer | | RtmpSession | 处理ERMP Session | | RtmpConnection | ETMP连接相关 | | RtmpHandshake | RTMP握手相关 | | RtmpMessage | ETMP消息数据结构定义 | ##配置设计 结论:使用默认配置 > linux server默认工作在0.0.0.0即可,端口使用默认1935。 > > 程序直接运行,无须载入配置。 ## 模块设计说明 ### Acceptor `Acceptor`实现了TCP Server基本功能,见注释: `Acceptor.h` ```cpp class Acceptor { public: Acceptor(EventLoop* eventLoop); virtual ~Acceptor(); //设置tcp连接建立时的回调 void SetNewConnectionCallback(const NewConnectionCallback& cb) { new_connection_callback_ = cb; } //启动监听,这里是对socket操作的整体封装 int Listen(std::string ip, uint16_t port); void Close(); private: //回调方法 void OnAccept(); //内部私有变量,event_loop_见下文详细解释 EventLoop* event_loop_ = nullptr; std::mutex mutex_; std::unique_ptr tcp_socket_; ChannelPtr channel_ptr_; NewConnectionCallback new_connection_callback_; }; } ``` `Listen方法` ```cpp int Acceptor::Listen(std::string ip, uint16_t port) { std::lock_guard locker(mutex_); if (tcp_socket_->GetSocket() > 0) { tcp_socket_->Close(); } //建立套接字 SOCKET sockfd = tcp_socket_->Create(); channel_ptr_.reset(new Channel(sockfd)); SocketUtil::SetReuseAddr(sockfd); SocketUtil::SetReusePort(sockfd); SocketUtil::SetNonBlock(sockfd); //端口绑定 if (!tcp_socket_->Bind(ip, port)) { return -1; } //listen if (!tcp_socket_->Listen(1024)) { return -1; } //设置“读”回调 channel_ptr_->SetReadCallback([this]() { this->OnAccept(); }); channel_ptr_->EnableReading(); event_loop_->UpdateChannel(channel_ptr_); return 0; } ``` 外部调用Listen,程序开始启动监听服务,收到新连接后,执行callback。因此该模块实现了基础socket server功能,上层应用逻辑透过callback实现。 ### TcpServer RTMP基于tcp,须首先实现一个tcp server,如下: `TcpServer.h` ```cpp class TcpServer { public: TcpServer(EventLoop* event_loop); virtual ~TcpServer(); //启动服务 virtual bool Start(std::string ip, uint16_t port); //终止服务 virtual void Stop(); std::string GetIPAddress() const { return ip_; } uint16_t GetPort() const { return port_; } protected: //tcp连接建立的回调方法 virtual TcpConnection::Ptr OnConnect(SOCKET sockfd); //添加Connection virtual void AddConnection(SOCKET sockfd, TcpConnection::Ptr tcp_conn); virtual void RemoveConnection(SOCKET sockfd); //时间循环 EventLoop* event_loop_; uint16_t port_; std::string ip_; std::unique_ptr acceptor_; bool is_started_; std::mutex mutex_; std::unordered_map connections_; }; ``` `Start方法` ```cpp bool TcpServer::Start(std::string ip, uint16_t port) { Stop(); if (!is_started_) { //调用Acceptor的Listen方法,启动服务 if (acceptor_->Listen(ip, port) < 0) { return false; } port_ = port; ip_ = ip; is_started_ = true; return true; } return false; } ``` > 调用Acceptor的Listen方法 构造方法: ```cpp TcpServer::TcpServer(EventLoop* event_loop) : event_loop_(event_loop) , port_(0) , acceptor_(new Acceptor(event_loop_)) , is_started_(false) { //在构造方法里调用Acceptor的SetNewConnectionCallback方法,设置tcp建立连接后的回调 acceptor_->SetNewConnectionCallback([this](SOCKET sockfd) { //回调本类实现的OnConnect方法,获取connection句柄,从而进行断开连接的回调设置等操作 TcpConnection::Ptr conn = this->OnConnect(sockfd); if (conn) { this->AddConnection(sockfd, conn); //回调成功后,设置断开连接的回调 conn->SetDisconnectCallback([this](TcpConnection::Ptr conn) { auto scheduler = conn->GetTaskScheduler(); SOCKET sockfd = conn->GetSocket(); if (!scheduler->AddTriggerEvent([this, sockfd] {this->RemoveConnection(sockfd); })) { scheduler->AddTimer([this, sockfd]() {this->RemoveConnection(sockfd); return false; }, 100); } }); } }); } ``` TcpServer实现的回调: ```cpp TcpConnection::Ptr TcpServer::OnConnect(SOCKET sockfd) { return std::make_shared(event_loop_->GetTaskScheduler().get(), sockfd); } ``` > 获取一个connection 至此,传输层TcpServer搭建完毕。 ### EventLoop 很显然,RTMP Server需要一个事件循环。从tcp server角度考虑,收到新连接后,需要分发该连接任务、并行处理、效率优先等等。 EventLoop是一个更外层的事件循环的框架,它实现了线程池。 它的构造方法很简单,如下: ```cpp EventLoop::EventLoop(uint32_t num_threads) : index_(1) { num_threads_ = 1; if (num_threads > 0) { num_threads_ = num_threads; } this->Loop(); } ``` > 接收线程数参数,然后执行loop方法 loop方法如下: ```cpp void EventLoop::Loop() { std::lock_guard locker(mutex_); if (!task_schedulers_.empty()) { return ; } //这里添加了跨平台支持,linux平台使用epoll,windows使用select for (uint32_t n = 0; n < num_threads_; n++) { #if defined(__linux) || defined(__linux__) //初始化线程池参数,n为线程id std::shared_ptr task_scheduler_ptr(new EpollTaskScheduler(n)); #elif defined(WIN32) || defined(_WIN32) std::shared_ptr task_scheduler_ptr(new SelectTaskScheduler(n)); #endif task_schedulers_.push_back(task_scheduler_ptr); //线程初始化 std::shared_ptr thread(new std::thread(&TaskScheduler::Start, task_scheduler_ptr.get())); thread->native_handle(); threads_.push_back(thread); } const int priority = TASK_SCHEDULER_PRIORITY_REALTIME; for (auto iter : threads_) { #if defined(__linux) || defined(__linux__) #elif defined(WIN32) || defined(_WIN32) switch (priority) { case TASK_SCHEDULER_PRIORITY_LOW: SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_BELOW_NORMAL); break; case TASK_SCHEDULER_PRIORITY_NORMAL: SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_NORMAL); break; case TASK_SCHEDULER_PRIORITYO_HIGH: SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_ABOVE_NORMAL); break; case TASK_SCHEDULER_PRIORITY_HIGHEST: SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_HIGHEST); break; case TASK_SCHEDULER_PRIORITY_REALTIME: SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_TIME_CRITICAL); break; } #endif } } ``` EventLoop本质上是对`TaskScheduler`的上层集成,核心在TaskScheduler里 ### TaskScheduler 线程调度Utils类,多线程设计基于此。 ```cpp class TaskScheduler { public: TaskScheduler(int id=1); virtual ~TaskScheduler(); void Start(); void Stop(); TimerId AddTimer(TimerEvent timerEvent, uint32_t msec); void RemoveTimer(TimerId timerId); bool AddTriggerEvent(TriggerEvent callback); virtual void UpdateChannel(ChannelPtr channel) { }; virtual void RemoveChannel(ChannelPtr& channel) { }; virtual bool HandleEvent(int timeout) { return false; }; int GetId() const { return id_; } protected: void Wake(); void HandleTriggerEvent(); int id_ = 0; std::atomic_bool is_shutdown_; std::unique_ptr wakeup_pipe_; std::shared_ptr wakeup_channel_; std::unique_ptr> trigger_events_; std::mutex mutex_; TimerQueue timer_queue_; static const char kTriggetEvent = 1; static const char kTimerEvent = 2; static const int kMaxTriggetEvents = 50000; }; ``` > 此类实现了通用任务调度功能,与主题设计相关度小,此处不做赘述。 ### RtmpServer RtmpServer继承了TcpServer,并增加了对ETMP Session的操作(增删改查)如下: ```cpp class RtmpServer : public TcpServer, public Rtmp, public std::enable_shared_from_this { public: using EventCallback = std::function; static std::shared_ptr Create(xop::EventLoop* event_loop); ~RtmpServer(); //设置事件回调,用于handle各种异常类、提示类信息等 void SetEventCallback(EventCallback event_cb); private: friend class RtmpConnection; friend class HttpFlvServer; RtmpServer(xop::EventLoop *event_loop); //session操作 void AddSession(std::string stream_path); void RemoveSession(std::string stream_path); RtmpSession::Ptr GetSession(std::string stream_path); bool HasSession(std::string stream_path); bool HasPublisher(std::string stream_path); void NotifyEvent(std::string event_type, std::string stream_path); virtual TcpConnection::Ptr OnConnect(SOCKET sockfd); xop::EventLoop *event_loop_; std::mutex mutex_; std::unordered_map rtmp_sessions_; std::vector event_callbacks_; }; ``` ### RtmpSession 主要处理RTMP协议相关的动作,如setMetaData(建议首先了解RTMP协议) ```cpp class RtmpSession { public: using Ptr = std::shared_ptr; RtmpSession(); virtual ~RtmpSession(); void SetMetaData(AmfObjects metaData) { std::lock_guard lock(mutex_); meta_data_ = metaData; } void SetAvcSequenceHeader(std::shared_ptr avcSequenceHeader, uint32_t avcSequenceHeaderSize) { std::lock_guard lock(mutex_); avc_sequence_header_ = avcSequenceHeader; avc_sequence_header_size_ = avcSequenceHeaderSize; } void SetAacSequenceHeader(std::shared_ptr aacSequenceHeader, uint32_t aacSequenceHeaderSize) { std::lock_guard lock(mutex_); aac_sequence_header_ = aacSequenceHeader; aac_sequence_header_size_ = aacSequenceHeaderSize; } AmfObjects GetMetaData() { std::lock_guard lock(mutex_); return meta_data_; } void AddSink(std::shared_ptr sink); void RemoveSink(std::shared_ptr sink); int GetClients(); void SendMetaData(AmfObjects& metaData); void SendMediaData(uint8_t type, uint64_t timestamp, std::shared_ptr data, uint32_t size); std::shared_ptr GetPublisher(); void SetGopCache(uint32_t cacheLen) { std::lock_guard lock(mutex_); max_gop_cache_len_ = cacheLen; } void SaveGop(uint8_t type, uint64_t timestamp, std::shared_ptr data, uint32_t size); private: void SendGop(std::shared_ptr sink); std::mutex mutex_; AmfObjects meta_data_; bool has_publisher_ = false; std::weak_ptr publisher_; std::unordered_map> rtmp_sinks_; std::shared_ptr avc_sequence_header_; std::shared_ptr aac_sequence_header_; uint32_t avc_sequence_header_size_ = 0; uint32_t aac_sequence_header_size_ = 0; uint64_t gop_index_ = 0; uint32_t max_gop_cache_len_ = 0; struct AVFrame { uint8_t type = 0; uint64_t timestamp = 0; uint32_t size = 0; std::shared_ptr data = nullptr; }; typedef std::shared_ptr AVFramePtr; std::map>> gop_cache_; }; ``` ### RtmpConnection Rtmp连接实现类 ```cpp class RtmpConnection : public TcpConnection, public RtmpSink { public: using PlayCallback = std::function; //连接类型 enum ConnectionState { HANDSHAKE, START_CONNECT, START_CREATE_STREAM, START_DELETE_STREAM, START_PLAY, START_PUBLISH, }; //连接模式 enum ConnectionMode { RTMP_SERVER, RTMP_PUBLISHER, RTMP_CLIENT }; RtmpConnection(std::shared_ptr server, TaskScheduler* scheduler, SOCKET sockfd); RtmpConnection(std::shared_ptr publisher, TaskScheduler* scheduler, SOCKET sockfd); RtmpConnection(std::shared_ptr client, TaskScheduler* scheduler, SOCKET sockfd); virtual ~RtmpConnection(); //以下定义了各种Stream操作的方法 std::string GetStreamPath() const { return stream_path_; } std::string GetStreamName() const { return stream_name_; } std::string GetApp() const { return app_; } //获取MetaData AmfObjects GetMetaData() const { return meta_data_; } virtual bool IsPlayer() override { return connection_state_ == START_PLAY; } virtual bool IsPublisher() override { return connection_state_ == START_PUBLISH; } virtual bool IsPlaying() override { return is_playing_; } virtual bool IsPublishing() override { return is_publishing_; } virtual uint32_t GetId() override { return (uint32_t)this->GetSocket(); } std::string GetStatus() { if (status_ == "") { return "unknown error"; } return status_; } private: friend class RtmpSession; friend class RtmpServer; friend class RtmpPublisher; friend class RtmpClient; RtmpConnection(TaskScheduler *scheduler, SOCKET sockfd, Rtmp* rtmp); //“读”回调 bool OnRead(BufferReader& buffer); void OnClose(); //消息处理 bool HandleChunk(BufferReader& buffer); bool HandleMessage(RtmpMessage& rtmp_msg); bool HandleInvoke(RtmpMessage& rtmp_msg); bool HandleNotify(RtmpMessage& rtmp_msg); bool HandleVideo(RtmpMessage& rtmp_msg); bool HandleAudio(RtmpMessage& rtmp_msg); //RTMP握手 bool Handshake(); bool Connect(); bool CretaeStream(); bool Publish(); bool Play(); bool DeleteStream(); //连接、播放等 bool HandleConnect(); bool HandleCreateStream(); bool HandlePublish(); bool HandlePlay(); bool HandlePlay2(); bool HandleDeleteStream(); bool HandleResult(RtmpMessage& rtmp_msg); bool HandleOnStatus(RtmpMessage& rtmp_msg); void SetPeerBandwidth(); void SendAcknowledgement(); void SetChunkSize(); void SetPlayCB(const PlayCallback& cb); bool SendInvokeMessage(uint32_t csid, std::shared_ptr payload, uint32_t payload_size); bool SendNotifyMessage(uint32_t csid, std::shared_ptr payload, uint32_t payload_size); bool IsKeyFrame(std::shared_ptr payload, uint32_t payload_size); void SendRtmpChunks(uint32_t csid, RtmpMessage& rtmp_msg); virtual bool SendMetaData(AmfObjects metaData) override; virtual bool SendMediaData(uint8_t type, uint64_t timestamp, std::shared_ptr payload, uint32_t payload_size) override; virtual bool SendVideoData(uint64_t timestamp, std::shared_ptr payload, uint32_t payload_size) override; virtual bool SendAudioData(uint64_t timestamp, std::shared_ptr payload, uint32_t payload_size) override; }; ``` ### RtmpHandshake RTMP握手实现 ```cpp class RtmpHandshake { public: enum State { HANDSHAKE_C0C1, HANDSHAKE_S0S1S2, HANDSHAKE_C2, HANDSHAKE_COMPLETE }; RtmpHandshake(State state); virtual ~RtmpHandshake(); int Parse(xop::BufferReader& in_buffer, char* res_buf, uint32_t res_buf_size); int BuildC0C1(char* buf, uint32_t buf_size); bool IsCompleted() const { return handshake_state_ == HANDSHAKE_COMPLETE; } private: State handshake_state_; }; ``` ### RtmpMessage RTMP消息数据结构 ```cpp struct RtmpMessageHeader { uint8_t timestamp[3]; uint8_t length[3]; uint8_t type_id; uint8_t stream_id[4]; /* 小端格式 */ }; struct RtmpMessage { uint32_t timestamp = 0; uint32_t length = 0; uint8_t type_id = 0; uint32_t stream_id = 0; uint32_t extend_timestamp = 0; uint64_t _timestamp = 0; uint8_t codecId = 0; uint8_t csid = 0; uint32_t index = 0; std::shared_ptr payload = nullptr; void Clear() { index = 0; timestamp = 0; extend_timestamp = 0; if (length > 0) { payload.reset(new char[length], std::default_delete()); } } bool IsCompleted() const { if (index == length && length > 0 && payload != nullptr) { return true; } return false; } }; ```