live555 rtsp服务器实战之createNewStreamSource
概要
rtsp在交互的过程中用到很多协议:tcp,udp,rtp,rtcp,sdp等协议;该篇文章主要分析在live555中这些协议是什么时候被创建的,什么时候被使用的等协议相关流程。
TCP:服务器与客户端进行协商(OPTION DESCRIBE SETUP PLAY);
UDP/TCP:协议是rtsp服务器用来想客户端推流;当然rtsp向客户端推流也可以使用tcp协议;那么就rtsp而言使用udp推流和使用tcp推流有什么区别呢?
UDP推流
tcp连接进行rtsp信令交互;
创建新的udp套接字来发送rtp包;
创建新的udp套接字来发送rtcp包;
TCP推流
tcp连接进行rtsp信令交互;
复用rtsp的tcp连接发送rtp和rtcp包;
嵌入式开发一般使用udp推流,实时性相对较高;
RTP:对视频流(h264/h265)/音频流(AAC/MP3)裸流进行封装,用于网络传输;
RTCP:服务器和客户端用来管理流媒体协议;
TCP交互协商
在程序创建RTSPServer类对象时就会创建用于信令协商的TCP协议,见如下代码:
//创建RTSPServer类对象 RTSPServer* rtspServer = RTSPServer::createNew(*env, 8554, authDB); //createNew实现 RTSPServer* RTSPServer::createNew(UsageEnvironment& env, Port ourPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationSeconds) { int ourSocketIPv4 = setUpOurSocket(env, ourPort, AF_INET); int ourSocketIPv6 = setUpOurSocket(env, ourPort, AF_INET6); if (ourSocketIPv4 < 0 && ourSocketIPv6 < 0) return NULL; return new RTSPServer(env, ourSocketIPv4, ourSocketIPv6, ourPort, authDatabase, reclamationSeconds); }
从源码可以看出创建RTSPServer类对象的时候会创建ipv4和ipv6两种套接字,因此理论上来说live555实现的rtsp服务器支持ipv4和ipv6两种网络传输。
//RTSPServer构造函数 RTSPServer::RTSPServer(UsageEnvironment& env, int ourSocketIPv4, int ourSocketIPv6, Port ourPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationSeconds) : GenericMediaServer(env, ourSocketIPv4, ourSocketIPv6, ourPort, reclamationSeconds), fHTTPServerSocketIPv4(-1), fHTTPServerSocketIPv6(-1), fHTTPServerPort(0), fClientConnectionsForHTTPTunneling(NULL), // will get created if needed fTCPStreamingDatabase(HashTable::create(ONE_WORD_HASH_KEYS)), fPendingRegisterOrDeregisterRequests(HashTable::create(ONE_WORD_HASH_KEYS)), fRegisterOrDeregisterRequestCounter(0), fAuthDB(authDatabase), fAllowStreamingRTPOverTCP(True), fOurConnectionsUseTLS(False), fWeServeSRTP(False) { } //GenericMediaServer构造函数 GenericMediaServer ::GenericMediaServer(UsageEnvironment& env, int ourSocketIPv4, int ourSocketIPv6, Port ourPort, unsigned reclamationSeconds) : Medium(env), fServerSocketIPv4(ourSocketIPv4), fServerSocketIPv6(ourSocketIPv6), fServerPort(ourPort), fReclamationSeconds(reclamationSeconds), fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)), fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)), fClientSessions(HashTable::create(STRING_HASH_KEYS)), fPreviousClientSessionId(0), fTLSCertificateFileName(NULL), fTLSPrivateKeyFileName(NULL) { ignoreSigPipeOnSocket(fServerSocketIPv4); // so that clients on the same host that are killed don't also kill us ignoreSigPipeOnSocket(fServerSocketIPv6); // ditto // Arrange to handle connections from others: env.taskScheduler().turnOnBackgroundReadHandling(fServerSocketIPv4, incomingConnectionHandlerIPv4, this); env.taskScheduler().turnOnBackgroundReadHandling(fServerSocketIPv6, incomingConnectionHandlerIPv6, this); }
在GenericMediaServer构造函数中会把创建的fServerSocketIPv4和fServerSocketIPv6这两个套接字插入到双向闭环链表中等待doEventLoop循环处理,对应的处理函数分别为:incomingConnectionHandlerIPv4, incomingConnectionHandlerIPv6;最终都会调用incomingConnectionHandlerOnSocket函数;
void GenericMediaServer::incomingConnectionHandlerOnSocket(int serverSocket) { struct sockaddr_storage clientAddr; SOCKLEN_T clientAddrLen = sizeof clientAddr; int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen); if (clientSocket < 0) { int err = envir().getErrno(); if (err != EWOULDBLOCK) { envir().setResultErrMsg("accept() failed: "); } return; } ignoreSigPipeOnSocket(clientSocket); // so that clients on the same host that are killed don't also kill us makeSocketNonBlocking(clientSocket); increaseSendBufferTo(envir(), clientSocket, 50*1024); #ifdef DEBUG envir() << "accept()ed connection from " << AddressString(clientAddr).val() << "\n"; #endif // Create a new object for handling this connection: (void)createNewClientConnection(clientSocket, clientAddr); } //createNewClientConnection函数实现 GenericMediaServer::ClientConnection* RTSPServer::createNewClientConnection(int clientSocket, struct sockaddr_storage const& clientAddr) { return new RTSPClientConnection(*this, clientSocket, clientAddr, fOurConnectionsUseTLS); }
在doEventLoop循环中会议中accept监视tcp连接,如果有客户端连接就会创建客户端连接类RTSPClientConnection;最终会把客户端套接字clientSocket传递给ClientConnection构造函数;
GenericMediaServer::ClientConnection ::ClientConnection(GenericMediaServer& ourServer, int clientSocket, struct sockaddr_storage const& clientAddr, Boolean useTLS) : fOurServer(ourServer), fOurSocket(clientSocket), fClientAddr(clientAddr), fTLS(envir()) { fInputTLS = fOutputTLS = &fTLS; // Add ourself to our 'client connections' table: fOurServer.fClientConnections->Add((char const*)this, this); if (useTLS) { // Perform extra processing to handle a TLS connection: fTLS.setCertificateAndPrivateKeyFileNames(ourServer.fTLSCertificateFileName, ourServer.fTLSPrivateKeyFileName); fTLS.isNeeded = True; fTLS.tlsAcceptIsNeeded = True; // call fTLS.accept() the next time the socket is readable } // Arrange to handle incoming requests: resetRequestBuffer(); envir().taskScheduler() .setBackgroundHandling(fOurSocket, SOCKET_READABLE|SOCKET_EXCEPTION, incomingRequestHandler, this); } //incomingRequestHandler函数最终调用 void GenericMediaServer::ClientConnection::incomingRequestHandler() { if (fInputTLS->tlsAcceptIsNeeded) { // we need to successfully call fInputTLS->accept() first: if (fInputTLS->accept(fOurSocket) <= 0) return; // either an error, or we need to try again later fInputTLS->tlsAcceptIsNeeded = False; // We can now read data, as usual: } int bytesRead; if (fInputTLS->isNeeded) { bytesRead = fInputTLS->read(&fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft); } else { struct sockaddr_storage dummy; // 'from' address, meaningless in this case bytesRead = readSocket(envir(), fOurSocket, &fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft, dummy); } handleRequestBytes(bytesRead);//该函数实现了对 OPTION DESCRIBE SETUP等各种信令的处理逻辑 }
在构造函数中setBackgroundHandling会把客户端套接字fOurSocket和对应的处理函数incomingRequestHandler添加到闭环双链表中,在doEventLoop中循环遍历,客户端有信令交互就调用相关的处理函数;至此用于协商的TCP协议处理流程就结束了。
关于live555的闭环双向链表参考我的另一篇文章:live555的核心数据结构值之闭环双向链表-CSDN博客
UDP流媒体传输
UDP流媒体传输服务器需要创建两个四个UDP套接字,用于传输音频RTP,音频RTCP,视频RTP,视频RTCP;该文档是以H264的传输为例所以只介绍视频RTP端口,视频RTCP端口的创建过程,音频类似;
RTP,RTCP端口是在SETUP信令处理函数handleCmd_SETUP中被创建,该函数最终调用了getStreamParameters函数:
subsession->getStreamParameters(fOurSessionId, fOurClientConnection->fClientAddr, clientRTPPort, clientRTCPPort, fStreamStates[trackNum].tcpSocketNum, rtpChannelId, rtcpChannelId, &fOurClientConnection->fTLS, destinationAddress, destinationTTL, fIsMulticast, serverRTPPort, serverRTCPPort, fStreamStates[trackNum].streamToken);
该函数将客户端的RTP端口:clientRTPPort和RTCP端口:clientRTCPPort都进行了处理;这两个端口是客户端发送SETUP信令时携带的消息;告诉服务器RTP RTCP包改往哪里发;getStreamParameters也创建了服务器的RTP RTCP端口:serverRTPPort, serverRTCPPort;
getStreamParameters内部调用了createGroupsock函数:
void OnDemandServerMediaSubsession ::getStreamParameters(...) { . . . if (clientRTPPort.num() != 0 || tcpSocketNum >= 0) { // Normal case: Create destinations portNumBits serverPortNum; if (clientRTCPPort.num() == 0) { // We're streaming raw UDP (not RTP). Create a single groupsock: NoReuse dummy(envir()); // ensures that we skip over ports that are already in use for (serverPortNum = fInitialPortNum;; ++serverPortNum) { serverRTPPort = serverPortNum; rtpGroupsock = createGroupsock(nullAddress(destinationAddress.ss_family), serverRTPPort); if (rtpGroupsock->socketNum() >= 0) break; // success } udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock); } else { // Normal case: We're streaming RTP (over UDP or TCP). Create a pair of // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even). // (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.) NoReuse dummy(envir()); // ensures that we skip over ports that are already in use for (portNumBits serverPortNum = fInitialPortNum;; ++serverPortNum) { serverRTPPort = serverPortNum; //创建RTP端口(rtp的UDP套接字) rtpGroupsock = createGroupsock(nullAddress(destinationAddress.ss_family), serverRTPPort); if (rtpGroupsock->socketNum() < 0) { delete rtpGroupsock; continue; // try again } if (fMultiplexRTCPWithRTP) { // Use the RTP 'groupsock' object for RTCP as well: serverRTCPPort = serverRTPPort; rtcpGroupsock = rtpGroupsock; } else { // Create a separate 'groupsock' object (with the next (odd) port number) for RTCP: //RTCP端口号在RTP端口号的基础上加1 serverRTCPPort = ++serverPortNum; //创建RTCP端口(rtcp的UDP套接字) rtcpGroupsock = createGroupsock(nullAddress(destinationAddress.ss_family), serverRTCPPort); if (rtcpGroupsock->socketNum() < 0) { delete rtpGroupsock; delete rtcpGroupsock; continue; // try again } } break; // success } unsigned char rtpPayloadType = 96 + trackNumber() - 1; // if dynamic rtpSink = mediaSource == NULL ? NULL : createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource); if (rtpSink != NULL) { if (fParentSession->streamingUsesSRTP) { rtpSink->setupForSRTP(fMIKEYStateMessage, fMIKEYStateMessageSize); } if (rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate(); } } . . . }
由代码可以看出serverRTPPort的初始值是fInitialPortNum;而fInitialPortNum在创建OnDemandServerMediaSubsession对象时有个默认值6970;如果没有设置端口号则使用默认端口号;
上面代码可以看出而RTCP端口号是在RTP的端口号的基础上加1
OnDemandServerMediaSubsession(UsageEnvironment& env, Boolean reuseFirstSource, portNumBits initialPortNum = 6970, Boolean multiplexRTCPWithRTP = False);
当第二个客户端连接时,依然是从6970开始创建所需的RTP RTCP端口号,但是createGroupsock会发现6970 6971端口号被占用,于是返回-1;继续for循环将端口号累加;
for (portNumBits serverPortNum = fInitialPortNum;; ++serverPortNum) { serverRTPPort = serverPortNum; rtpGroupsock = createGroupsock(nullAddress(destinationAddress.ss_family), serverRTPPort); if (rtpGroupsock->socketNum() < 0) { delete rtpGroupsock; continue; // try again } . . . }
//fInitialPortNum为基数6970;
第一个客户端:rtp:6970 rtcp:6971
第二个客户端:6970 6971 被占用createGroupsock返回-1;因此for循环continue继续累加++serverPortNum; rtp:6972 rtcp:6973
......
那么怎么自定义端口号呢?
我们在做rtsp服务器的时候都会创建一个类用于实现createNewStreamSource虚函数该类继承于OnDemandServerMediaSubsession;而类的构造函数里会执行OnDemandServerMediaSubsession的构造函数;所以如果你想要自己定义服务器的RTP端口号只需在执行OnDemandServerMediaSubsession构造函数是传入参数即可:
H264LiveVideoServerMediaSubssion::H264LiveVideoServerMediaSubssion( UsageEnvironment &env, Boolean reuseFirstSource) : OnDemandServerMediaSubsession(env, reuseFirstSource, 1234) {}
TCP流媒体传输使用的时信令交互的套接字,这里不做解释;关于流媒体裸流怎么打包成RTP的参考上面的文章;
该文章在持续更新,望持续关注;