3#include "Foundation/Logging/Logger.h"
4#include "Foundation/Network/Address.h"
5#include "Foundation/Network/Network.h"
6#include "Foundation/Platform/Timer.h"
// Constructs an RTSP connection object from a URL and a user-agent string.
// NOTE(review): this listing omits several original lines of the constructor
// (remaining initializers, braces, and whatever consumes `url`), so only the
// visible statements are described here.
20Cogs::RTSP::RTSP(std::string_view url, std::string_view userAgent):
// Both optional-method flags start out false; processIncomingMessage later
// sets them from the "Public" header of an OPTIONS response.
27 has_set_parameter(false),
28 has_get_parameter(false),
// Kept so it can be written into the "User-Agent" header of outgoing messages.
30 agentString(userAgent)
// Size the receive buffer to 4096 (presumably bytes; the meaning of the
// `false` argument is not visible in this listing - TODO confirm).
32 incomingBuffer.resize(4096,
false);
34 enableAutoReconnect();
// presumably milliseconds - TODO confirm against setConnectTimeout's declaration
35 setConnectTimeout(4000);
// Hand this object to Cogs::Network::addConnection.
38 Cogs::Network::addConnection(
this);
// Queues an RTSP SETUP request for one media stream.
//
// mdt            Media description the request refers to. Only its address is
//                stored (in Message::userData); processIncomingMessage casts
//                it back to MediaDescription* when the SETUP response
//                arrives, so the object must still be alive at that point.
// mediaURL       URL placed verbatim on the request line.
// client_port_a  First client port advertised in the Transport header.
// client_port_b  Second client port advertised in the Transport header.
//
// NOTE(review): the return statement is not visible in this listing.
48bool Cogs::RTSP::sendSetup(MediaDescription& mdt,
const std::string& mediaURL, uint16_t client_port_a, uint16_t client_port_b) {
50 Message* message =
new Message();
52 message->userData = &mdt;
53 message->firstLine =
"SETUP " + mediaURL +
" RTSP/1.0";
// Request unicast RTP/AVP delivery to the port range "a-b".
54 message->headers[
"Transport"] =
"RTP/AVP;unicast;client_port=" + std::to_string(client_port_a) +
"-" + std::to_string(client_port_b);
// Adds CSeq, User-Agent and (when known) Session, then hands the message to the send queue.
56 addStandardHeaders(*message);
57 queueMessage(message);
// Queues an RTSP PLAY request: the request line comes from buildURL("PLAY"),
// the standard headers are added, and the message is put on the send queue.
// NOTE(review): the return statement is not visible in this listing.
63bool Cogs::RTSP::sendPlay() {
65 Message* message =
new Message();
67 message->firstLine = buildURL(
"PLAY");
69 addStandardHeaders(*message);
70 queueMessage(message);
// Queues an RTSP PAUSE request: the request line comes from buildURL("PAUSE"),
// the standard headers are added, and the message is put on the send queue.
// NOTE(review): the return statement is not visible in this listing.
76bool Cogs::RTSP::sendPause() {
78 Message* message =
new Message();
80 message->firstLine = buildURL(
"PAUSE");
82 addStandardHeaders(*message);
83 queueMessage(message);
// Queues an RTSP TEARDOWN request: the request line comes from
// buildURL("TEARDOWN"), the standard headers are added, and the message is
// put on the send queue.
// NOTE(review): the return statement is not visible in this listing.
89bool Cogs::RTSP::sendTeardown() {
91 Message* message =
new Message();
93 message->firstLine = buildURL(
"TEARDOWN");
95 addStandardHeaders(*message);
96 queueMessage(message);
// Parses an SDP document and rebuilds `mediaDescriptions` from it.
//
// sdp  The SDP text; it is consumed one '\n'-terminated line at a time.
//
// Any previously stored media descriptions are discarded first. Attribute
// lines (rtpmap, control, fmtp, ...) are applied to the most recently added
// description.
// NOTE(review): the declarations of `str`, `m` and `ip`, the conditions that
// select the connection/media branches, and all closing braces are missing
// from this listing.
102void Cogs::RTSP::parseSDP(
const std::string& sdp) {
103 std::stringstream ss(sdp);
107 mediaDescriptions.clear();
111 while (std::getline(ss, str,
'\n')) {
// Connection address: four decimal groups after "IN IP4".
// NOTE(review): the '.' separators in this pattern are unescaped, so they
// match any character, and "[0-9]*" also accepts empty groups.
118 std::regex r(
"IN IP4 ([0-9]*).([0-9]*).([0-9]*).([0-9]*)");
120 if (std::regex_search(str, m, r)) {
121 ip[0] =
static_cast<uint8_t
>(std::atoi(m[1].str().c_str()));
122 ip[1] =
static_cast<uint8_t
>(std::atoi(m[2].str().c_str()));
123 ip[2] =
static_cast<uint8_t
>(std::atoi(m[3].str().c_str()));
124 ip[3] =
static_cast<uint8_t
>(std::atoi(m[4].str().c_str()));
// Start of a new media section: begin with a zero-initialized description.
136 MediaDescription desc = {};
// Everything after the first two characters of the line (presumably the "m=" prefix - TODO confirm).
139 desc.media_name = str.substr(2);
// "<type> <port> RTP/AVP <payload type>"
141 if (std::regex_search(str, m, std::regex(
"([a-z]*) ([0-9]*) RTP/AVP ([0-9]*)"))) {
142 desc.media_type = m[1].str();
// The bodies of the video/audio branches are not visible in this listing.
144 if (desc.media_type ==
"video") {
147 else if (desc.media_type ==
"audio") {
150 desc.rtp_payload_type = std::atoi(m[3].str().c_str());
156 mediaDescriptions.push_back(desc);
// Attribute lines only make sense once at least one media section exists.
160 if (!mediaDescriptions.empty()) {
161 MediaDescription& desc = mediaDescriptions.back();
// The keyword must start at index 2, i.e. right after a two-character
// prefix (presumably "a=" - TODO confirm).
163 if (str.find(
"rtpmap") == 2) {
// "rtpmap:<payload type> <encoding name>/<clock rate>[/<extra>]"
166 if (std::regex_search(str, m, std::regex(
"rtpmap:([0-9]*) ([A-Za-z0-9-]*)/([0-9]*)/?([A-Za-z0-9-]*)"))) {
167 uint32_t type = std::atoi(m[1].str().c_str());
// Debug-build check only: the rtpmap must refer to this section's payload type.
169 assert(type == desc.rtp_payload_type);
171 desc.encoding_name = m[2].str();
172 desc.rtp_clock_rate = std::atoi(m[3].str().c_str());
178 else if (str.find(
"control") == 2) {
181 if (std::regex_search(str, m, std::regex(
"control:([0-9a-zA-Z=:/\\.\\-\\?]*)"))) {
182 desc.control_id = m[1].str();
185 else if (str.find(
"fmtp") == 2) {
// Format parameters; each known key is searched for independently on the same line.
188 if (std::regex_search(str, m, std::regex(
"fmtp:([0-9]*)"))) {
189 uint32_t type = std::atoi(m[1].str().c_str());
190 assert(type == desc.rtp_payload_type);
// Parsed as hexadecimal.
// NOTE(review): the class "[A-fa-f0-9]" contains the range 'A'-'f', which
// also covers G-Z and several punctuation characters; "[A-Fa-f0-9]" was
// probably intended. std::stol throws if no hex digits can be converted.
192 if (std::regex_search(str, m, std::regex(
"profile-level-id=([A-fa-f0-9]*)"))) {
193 desc.profile_level_id = std::stol(m[1].str().c_str(), 0, 16);
195 if (std::regex_search(str, m, std::regex(
"packetization-mode=([0-9]*)"))) {
196 desc.packetization_mode = std::atoi(m[1].str().c_str());
// Two comma-separated base64 blobs (trailing '=' padding excluded from the
// captures): first is stored as SPS, second as PPS.
198 if (std::regex_search(str, m, std::regex(
"sprop-parameter-sets=([A-Za-z0-9+/]*)[=]*,[=]*([A-Za-z0-9+/]*)[=]*"))) {
199 desc.sps = base64_decode(m[1].str());
200 desc.pps = base64_decode(m[2].str());
// Separate base64 parameter sets: VPS, SPS and PPS.
202 if (std::regex_search(str, m, std::regex(
"sprop-vps=([A-Za-z0-9+/]*)[=]*"))) {
203 desc.vps = base64_decode(m[1].str());
205 if (std::regex_search(str, m, std::regex(
"sprop-sps=([A-Za-z0-9+/]*)[=]*"))) {
206 desc.sps = base64_decode(m[1].str());
208 if(std::regex_search(str, m, std::regex(
"sprop-pps=([A-Za-z0-9+/]*)[=]*"))){
209 desc.pps = base64_decode(m[1].str());
// The bodies of the following three branches are not visible in this listing.
211 if(std::regex_search(str, m, std::regex(
"sprop-max-don-diff"))){
216 else if(str.find(
"framerate") == 2){
219 else if(str.find(
"transform") == 2){
// Drives the sending side of the connection. Visible steps:
//   1. When the outgoing buffer has been fully sent, serialize the next
//      queued message into it and remember the message in sentMessages.
//   2. Send as much of the outgoing buffer as send() accepts.
//   3. Queue a keep-alive request once keepAliveTime has passed.
// NOTE(review): closing braces, at least one `else`, and the return
// statement(s) are missing from this listing.
229bool Cogs::RTSP::processOutgoing() {
// Only start a new message once the previous one has been written out completely.
231 if (!outgoingBuffer.unreadSize()) {
// queueMutex is the same mutex queueMessage takes before pushing to queuedMessages.
232 Cogs::LockGuard lock(queueMutex);
234 if (!queuedMessages.empty()) {
235 Message* message = queuedMessages.front();
237 queuedMessages.pop_front();
// Wire format: first line, CRLF, one "Key: value" CRLF per header,
// an empty CRLF line, then the message body.
239 outgoingBuffer.clear();
240 outgoingBuffer.write(message->firstLine.data(), message->firstLine.size());
241 outgoingBuffer.write(
"\x0D\x0A", 2);
242 for (
auto& [key,value] : message->headers) {
243 outgoingBuffer.write(key.data(), key.size());
244 outgoingBuffer.write(
": ", 2);
245 outgoingBuffer.write(value.data(), value.size());
246 outgoingBuffer.write(
"\x0D\x0A", 2);
248 outgoingBuffer.write(
"\x0D\x0A", 2);
249 outgoingBuffer.write(message->data(), message->size());
// Rewind so the send step below starts at the first byte.
250 outgoingBuffer.seek(0, Memory::MemoryBuffer::Anchor::Start);
// Kept so processIncomingMessage can match the response to this request via its CSeq header.
252 sentMessages.push_back(message);
255 if (outgoingBuffer.unreadSize()) {
256 uint64_t bytesSent = 0;
// Advance by however many bytes send() reports; any remainder stays unread
// in the buffer for a later call.
258 if (send(outgoingBuffer.dataFromPos(), outgoingBuffer.unreadSize(), bytesSent)) {
259 outgoingBuffer.seek(bytesSent, Memory::MemoryBuffer::Anchor::Current);
263 int64_t now = Timer::currentTimeMilliseconds();
// Keep-alive: prefer SET_PARAMETER, then GET_PARAMETER; the OPTIONS line is
// presumably the fallback `else` branch (that line is not visible - TODO confirm).
265 if (keepAliveTime < now) {
266 Message* message =
new Message();
268 if (has_set_parameter) {
269 message->firstLine = buildURL(
"SET_PARAMETER");
271 else if (has_get_parameter) {
272 message->firstLine = buildURL(
"GET_PARAMETER");
275 message->firstLine = buildURL(
"OPTIONS");
278 addStandardHeaders(*message);
279 queueMessage(message);
// Schedule the next keep-alive 1000 ms before `timeout` elapses (both are in
// milliseconds: `now` comes from currentTimeMilliseconds and the SETUP
// handler multiplies the server's timeout value by 1000).
280 keepAliveTime = now + timeout - 1000;
// NOTE(review): the header of this function is missing from the listing;
// these statements presumably form the body of RTSP::processIncoming()
// - TODO confirm. Visible steps:
//   1. Receive bytes into incomingBuffer and NUL-terminate them.
//   2. While headers are unfinished, split the text on CRLF and fill
//      incomingMessage.firstLine / headers; an empty line ends the headers.
//   3. Move unparsed bytes to the front of incomingBuffer.
//   4. Copy body bytes into incomingMessage until it is full.
//   5. Dispatch the finished message and reset incomingMessage.
// The loop construct, several conditions and all closing braces are not visible.
//
// One byte is held back from the receive size; the NUL terminator written
// after the receive below needs that slot.
289 uint64_t remainingSpace = incomingBuffer.unreadSize() - 1;
290 uint64_t bytesReceived = incomingBuffer.unreadSize();
291 char* bufferStart =
static_cast<char*
>( incomingBuffer.data() );
293 if (receive(bufferStart + incomingBuffer.pos(), remainingSpace, bytesReceived)) {
294 incomingBuffer.seek(bytesReceived, Cogs::Memory::MemoryBuffer::Anchor::Current);
// Terminate the received text so the C string functions below (strstr, strchr, strcmp) can be used.
295 *
static_cast<char*
>(incomingBuffer.dataFromPos()) = 0;
297 if (!incomingMessage.finishedHeaders) {
298 char* lineStart = bufferStart;
// Find the CRLF that ends the current line.
301 char* lineEnd = std::strstr(lineStart,
"\x0D\x0A");
// An empty line marks the end of the header section.
306 if (lineStart == lineEnd) {
307 incomingMessage.finishedHeaders =
true;
// A message size of zero (no Content-Length resize happened) means there is no body to wait for.
308 if (!incomingMessage.size()) {
309 incomingMessage.finishedBody =
true;
// Skip the two CRLF bytes.
311 lineStart = lineEnd + 2;
// The first non-empty line is the status/request line.
315 if (incomingMessage.firstLine.empty()) {
316 incomingMessage.firstLine = lineStart;
// Header line: key before the colon, value after it.
// NOTE(review): using lineStart as the key and comparing it with strcmp
// implies the colon and the CRLF are overwritten with NUL on lines missing
// from this listing - TODO confirm. A line without ':' would make `colon`
// null; no check is visible here.
319 char* colon = strchr(lineStart,
':');
322 incomingMessage.headers[lineStart] = ++colon;
// Content-Length sizes the message body buffer.
324 if (!strcmp(lineStart,
"Content-Length")) {
325 incomingMessage.resize(std::atoi(colon));
330 lineStart = lineEnd + 2;
// Bytes from lineStart up to the end of the buffer's storage (incomingBuffer.size())
// are moved to the front of the buffer.
336 uint64_t remainingData = (
static_cast<char*
>(incomingBuffer.data()) + incomingBuffer.size()) - lineStart;
338 memmove(incomingBuffer.data(), lineStart, remainingData);
// The offset data() - lineStart is negative or zero, so this moves the
// position backwards by the number of bytes consumed above.
339 incomingBuffer.seek(
static_cast<char*
>(incomingBuffer.data()) - lineStart, Cogs::Memory::MemoryBuffer::Anchor::Current);
341 if (!incomingMessage.finishedBody) {
// Append everything currently buffered to the message body.
// NOTE(review): no check that incomingBuffer.pos() fits into the remaining
// body space is visible in this listing - TODO confirm.
342 memcpy(incomingMessage.dataFromPos(), incomingBuffer.data(), incomingBuffer.pos());
343 incomingMessage.seek(incomingBuffer.pos(), Cogs::Memory::MemoryBuffer::Anchor::Current);
344 incomingBuffer.seek(0, Cogs::Memory::MemoryBuffer::Anchor::Start);
346 if (incomingMessage.pos() == incomingMessage.size()) {
347 incomingMessage.finishedBody =
true;
350 if (incomingMessage.finishedBody) {
// Rewind the body before handing the message over, then clear it for the next one.
351 incomingMessage.seek(0, Cogs::Memory::MemoryBuffer::Anchor::Start);
352 processIncomingMessage(incomingMessage);
354 incomingMessage.reset();
// Handles one complete incoming RTSP message.
//
// message  The parsed message (first line, headers, body). Headers are read
//          with operator[], so looking up a missing header inserts an empty
//          entry into message.headers.
//
// Incoming OPTIONS / GET_PARAMETER requests are answered with "501 Not
// Implemented". Otherwise the message is matched to a previously sent
// request through its CSeq header and handled according to that request's
// method.
// NOTE(review): the `case` labels of the status switch, the assignment to
// `request`, the declaration of `m`, and all closing braces/returns are
// missing from this listing.
359void Cogs::RTSP::processIncomingMessage(Message& message) {
360 std::string seq = message.headers[
"CSeq"];
// First line starts with a method name: this is a request from the peer, not a response.
363 if (message.firstLine.starts_with(
"OPTIONS") ||
364 message.firstLine.starts_with(
"GET_PARAMETER" )) {
365 Message* response =
new Message();
// Reply with the same CSeq value so the peer can match the reply.
368 response->firstLine =
"RTSP/1.0 501 Not Implemented";
369 response->headers[
"CSeq"] = seq;
370 response->headers[
"User-Agent"] = agentString;
371 queueMessage(response);
375 LOG_INFO(logger,
"Received: %s", message.firstLine.c_str());
377 Message* request =
nullptr;
// Look up the request this response belongs to (same CSeq) and remove it
// from sentMessages. `request` is presumably assigned on the missing line
// before the erase - TODO confirm. If nothing matches, `request` stays
// nullptr; no null check before the dereferences below is visible here.
380 for (
auto i = sentMessages.begin(), e = sentMessages.end(); i != e; ++i) {
381 if ((*i)->headers[
"CSeq"] == seq) {
383 sentMessages.erase(i);
388 if (message.firstLine.starts_with(
"RTSP/1.0")) {
// "RTSP/1.0 " is nine characters long, so offset 9 is where the status code starts.
389 switch (std::atoi(message.firstLine.c_str() + 9)) {
// Branch that uses the Location header (presumably a redirect status - the
// case labels are not visible): switch to the new URL, rebuild the request
// line with the original method (text before the first space), give it a
// fresh CSeq and queue it again.
393 breakdownURL(message.headers[
"Location"]);
395 request->firstLine = buildURL(request->firstLine.substr(0, request->firstLine.find(
' ')));
396 request->headers[
"CSeq"] = std::to_string(sequenceNumber++);
397 queueMessage(request);
// Response handling depends on the method of the original request.
409 if (request->firstLine.starts_with(
"OPTIONS")) {
// The "Public" header carries the method names the server advertises; each
// capability flag is a plain substring test on that header value.
410 auto i = message.headers.find(
"Public");
412 if (i != message.headers.end()) {
413 has_describe = i->second.find(
"DESCRIBE") != std::string::npos;
414 has_setup = i->second.find(
"SETUP") != std::string::npos;
415 has_play = i->second.find(
"PLAY") != std::string::npos;
416 has_pause = i->second.find(
"PAUSE") != std::string::npos;
417 has_teardown = i->second.find(
"TEARDOWN") != std::string::npos;
418 has_set_parameter = i->second.find(
"SET_PARAMETER") != std::string::npos;
419 has_get_parameter = i->second.find(
"GET_PARAMETER") != std::string::npos;
422 else if (request->firstLine.starts_with(
"DESCRIBE")) {
// The response body is the SDP document; parse it, then call receivedMediaDescriptions().
423 parseSDP(std::string(
static_cast<char*
>(message.data()), message.size()));
424 receivedMediaDescriptions();
426 else if (request->firstLine.starts_with(
"SETUP")) {
427 sessionID = message.headers[
"Session"];
// userData was set by sendSetup to the address of the MediaDescription being set up.
429 MediaDescription* md =
static_cast<MediaDescription*
>(request->userData);
430 size_t semicolon = sessionID.find(
";timeout=");
431 std::string& s = message.headers[
"Transport"];
// Session header of the form "<id>;timeout=<n>": the number after
// ";timeout=" (9 characters) is multiplied by 1000 and stored in `timeout`,
// and the suffix is stripped from the session id.
434 if (semicolon != std::string::npos) {
435 timeout = std::atoi(sessionID.c_str() + semicolon + 9) * 1000;
436 sessionID.erase(semicolon);
// Same keep-alive scheduling as in processOutgoing: 1000 ms before the timeout.
438 keepAliveTime = Timer::currentTimeMilliseconds() + timeout - 1000;
// Pull the negotiated ports and the SSRC (hexadecimal) out of the Transport header.
440 if (std::regex_search(s, m, std::regex(
"server_port=([0-9]+)-([0-9]+)"))) {
441 md->server_port[0] =
static_cast<uint16_t
>(std::stoi(m[1].str()));
442 md->server_port[1] =
static_cast<uint16_t
>(std::stoi(m[2].str()));
444 if (std::regex_search(s, m, std::regex(
"client_port=([0-9]+)-([0-9]+)"))) {
445 md->client_port[0] =
static_cast<uint16_t
>(std::stoi(m[1].str()));
446 md->client_port[1] =
static_cast<uint16_t
>(std::stoi(m[2].str()));
448 if (std::regex_search(s, m, std::regex(
"ssrc=([A-Fa-f0-9]+)"))) {
449 md->ssrc = std::stoul(m[1].str(), 0, 16);
451 receivedStreamSetup(*md);
// The body of the TEARDOWN branch is not visible in this listing.
453 else if (request->firstLine.starts_with(
"TEARDOWN")) {
// Appends a message to the back of queuedMessages while holding queueMutex,
// and logs its first line. processOutgoing pops messages from the front of
// the same queue under the same mutex.
//
// message  Message to queue; it is dereferenced without a null check.
462void Cogs::RTSP::queueMessage(Message* message) {
464 Cogs::LockGuard lock(queueMutex);
466 LOG_INFO(logger,
"Sending %s.", message->firstLine.c_str());
467 queuedMessages.push_back(message);
// Fills in the headers common to every outgoing request.
//
// message  Message whose header map is updated in place.
//
// Sets "CSeq" to the current sequenceNumber (which is then incremented) and
// "User-Agent" to agentString; "Session" is added only once a session id is known.
471void Cogs::RTSP::addStandardHeaders(Message& message) {
472 message.headers[
"CSeq"] = std::to_string(sequenceNumber++);
473 message.headers[
"User-Agent"] = agentString;
475 if (!sessionID.empty()) {
476 message.headers[
"Session"] = sessionID;
// Splits a URL of the form [rtsp://][userinfo@]host[:port][/path] into
// userInfo, hostname, hostPort and resourcePath, and rebuilds sockAddr when
// the host or port differs from the previous values.
//
// url  URL to parse; the "rtsp://" scheme prefix is optional.
//
// NOTE(review): closing braces, any `else` branches (for example a default
// port when none is given) and the return statement are missing from this listing.
480bool Cogs::RTSP::breakdownURL(std::string_view url) {
// Remember the previous endpoint so a change can be detected at the end.
481 std::string oldHostname = hostname;
482 uint16_t oldPort = hostPort;
// Skip the 7-character "rtsp://" prefix when present.
483 size_t start = (url.starts_with(
"rtsp://")) ? 7 : 0;
484 size_t end = url.find(
'/', start);
// When no '/' follows the authority, `end` is npos and substr takes the rest of the string.
486 hostname = std::string(url.substr(start, end - start));
487 if (end != std::string::npos) {
// Path without its leading '/'.
488 resourcePath = std::string(url.substr(end + 1));
// Split off "userinfo@" when present.
491 end = hostname.find(
'@');
492 if (end != std::string::npos) {
// The post-increment makes the erase below remove the '@' as well.
493 userInfo = std::string(hostname.substr(0, end++));
494 hostname.erase(0, end);
// Split off ":port" when present.
496 start = hostname.find(
':');
497 if (start != std::string::npos) {
498 hostPort =
static_cast<uint16_t
>(std::atoi(hostname.c_str() + start + 1));
499 hostname.erase(start);
// Only rebuild the socket address when the endpoint actually changed.
504 if ((hostname != oldHostname) || (hostPort != oldPort)) {
505 sockAddr = Network::SockaddrIn(Network::AddrIn(hostname), hostPort);
// Builds the first line of a request for the given method; callers assign the
// result to Message::firstLine.
//
// method  RTSP method name, e.g. "PLAY" or "OPTIONS".
//
// NOTE(review): almost the entire body is missing from this listing. Only the
// two conditions below are visible: user info and the resource path are each
// handled only when non-empty. How the string is assembled cannot be seen here.
511std::string Cogs::RTSP::buildURL(std::string_view method) {
518 if (!userInfo.empty()) {
525 if (!resourcePath.empty()) {
// Queues an RTSP OPTIONS request. The "Public" header of its response is what
// processIncomingMessage uses to fill in the has_* capability flags.
533void Cogs::RTSP::sendOptions() {
534 Message* message =
new Message();
536 message->firstLine = buildURL(
"OPTIONS");
538 addStandardHeaders(*message);
539 queueMessage(message);
// Queues an RTSP DESCRIBE request that asks for an SDP description. The body
// of its response is passed to parseSDP by processIncomingMessage.
542void Cogs::RTSP::sendDescribe() {
543 Message* message =
new Message();
545 message->firstLine = buildURL(
"DESCRIBE");
// Tell the server which description format is accepted.
546 message->headers[
"Accept"] =
"application/sdp";
548 addStandardHeaders(*message);
549 queueMessage(message);
Log implementation class.
virtual bool processIncoming() override
Manages accepting incoming connections when this connection is a listener.
constexpr Log getLogger(const char(&name)[LEN]) noexcept