Cogs.Core
RTSP.cpp
1#include "RTSP.h"
2
3#include "Foundation/Logging/Logger.h"
4#include "Foundation/Network/Address.h"
5#include "Foundation/Network/Network.h"
6#include "Foundation/Platform/Timer.h"
7
8#include <base64.h>
9
10#include <regex>
11#include <sstream>
12#include <cstring>
13
14#include <curl/curl.h>
15
16namespace {
18}
19
20Cogs::RTSP::RTSP(std::string_view url, std::string_view userAgent):
21 originalURL(url),
22 has_describe(false),
23 has_setup(false),
24 has_play(false),
25 has_pause(false),
26 has_teardown(false),
27 has_set_parameter(false),
28 has_get_parameter(false),
29 is_active(false),
30 agentString(userAgent)
31{
32 incomingBuffer.resize(4096, false);
33
34 enableAutoReconnect();
35 setConnectTimeout(4000);
36 breakdownURL(url);
37
38 Cogs::Network::addConnection(this);
39
40 sendOptions();
41 sendDescribe();
42}
43
44Cogs::RTSP::~RTSP() {
45 sendTeardown();
46}
47
48bool Cogs::RTSP::sendSetup(MediaDescription& mdt, const std::string& mediaURL, uint16_t client_port_a, uint16_t client_port_b) {
49 if (has_setup) {
50 Message* message = new Message();
51
52 message->userData = &mdt;
53 message->firstLine = "SETUP " + mediaURL + " RTSP/1.0";
54 message->headers["Transport"] = "RTP/AVP;unicast;client_port=" + std::to_string(client_port_a) + "-" + std::to_string(client_port_b);
55
56 addStandardHeaders(*message);
57 queueMessage(message);
58 return true;
59 }
60 return false;
61}
62
63bool Cogs::RTSP::sendPlay() {
64 if (has_play) {
65 Message* message = new Message();
66
67 message->firstLine = buildURL("PLAY");
68
69 addStandardHeaders(*message);
70 queueMessage(message);
71 return true;
72 }
73 return false;
74}
75
76bool Cogs::RTSP::sendPause() {
77 if (has_pause) {
78 Message* message = new Message();
79
80 message->firstLine = buildURL("PAUSE");
81
82 addStandardHeaders(*message);
83 queueMessage(message);
84 return true;
85 }
86 return false;
87}
88
89bool Cogs::RTSP::sendTeardown() {
90 if (has_teardown) {
91 Message* message = new Message();
92
93 message->firstLine = buildURL("TEARDOWN");
94
95 addStandardHeaders(*message);
96 queueMessage(message);
97 return true;
98 }
99 return false;
100}
101
102void Cogs::RTSP::parseSDP(const std::string& sdp) {
103 std::stringstream ss(sdp);
104 std::string str;
105 uint8_t ip[4] = {};
106
107 mediaDescriptions.clear();
108 has_video = false;
109 has_audio = false;
110
111 while (std::getline(ss, str, '\n')) {
112 switch (str[0]) {
113 case 'v': {
114 break;
115 }
116 case 'o': {
117 std::smatch m;
118 std::regex r("IN IP4 ([0-9]*).([0-9]*).([0-9]*).([0-9]*)");
119
120 if (std::regex_search(str, m, r)) {
121 ip[0] = static_cast<uint8_t>(std::atoi(m[1].str().c_str()));
122 ip[1] = static_cast<uint8_t>(std::atoi(m[2].str().c_str()));
123 ip[2] = static_cast<uint8_t>(std::atoi(m[3].str().c_str()));
124 ip[3] = static_cast<uint8_t>(std::atoi(m[4].str().c_str()));
125 }
126 break;
127 }
128 case 's':
129 case 'i':
130 case 'c':
131 case 'b':
132 case 't': {
133 break;
134 }
135 case 'm': {
136 MediaDescription desc = {};
137 std::smatch m;
138
139 desc.media_name = str.substr(2);
140
141 if (std::regex_search(str, m, std::regex("([a-z]*) ([0-9]*) RTP/AVP ([0-9]*)"))) {
142 desc.media_type = m[1].str();
143
144 if (desc.media_type == "video") {
145 has_video = true;
146 }
147 else if (desc.media_type == "audio") {
148 has_audio = true;
149 }
150 desc.rtp_payload_type = std::atoi(m[3].str().c_str());
151 }
152 desc.ip[0] = ip[0];
153 desc.ip[1] = ip[1];
154 desc.ip[2] = ip[2];
155 desc.ip[3] = ip[3];
156 mediaDescriptions.push_back(desc);
157 break;
158 }
159 case 'a': {
160 if (!mediaDescriptions.empty()) {
161 MediaDescription& desc = mediaDescriptions.back();
162
163 if (str.find("rtpmap") == 2) {
164 std::smatch m;
165
166 if (std::regex_search(str, m, std::regex("rtpmap:([0-9]*) ([A-Za-z0-9-]*)/([0-9]*)/?([A-Za-z0-9-]*)"))) {
167 uint32_t type = std::atoi(m[1].str().c_str());
168
169 assert(type == desc.rtp_payload_type);
170
171 desc.encoding_name = m[2].str();
172 desc.rtp_clock_rate = std::atoi(m[3].str().c_str());
173 }
174 else {
175 assert(false);
176 }
177 }
178 else if (str.find("control") == 2) {
179 std::smatch m;
180
181 if (std::regex_search(str, m, std::regex("control:([0-9a-zA-Z=:/\\.\\-\\?]*)"))) {
182 desc.control_id = m[1].str();
183 }
184 }
185 else if (str.find("fmtp") == 2) {
186 std::smatch m;
187
188 if (std::regex_search(str, m, std::regex("fmtp:([0-9]*)"))) {
189 uint32_t type = std::atoi(m[1].str().c_str());
190 assert(type == desc.rtp_payload_type);
191 }
192 if (std::regex_search(str, m, std::regex("profile-level-id=([A-fa-f0-9]*)"))) {
193 desc.profile_level_id = std::stol(m[1].str().c_str(), 0, 16);
194 }
195 if (std::regex_search(str, m, std::regex("packetization-mode=([0-9]*)"))) {
196 desc.packetization_mode = std::atoi(m[1].str().c_str());
197 }
198 if (std::regex_search(str, m, std::regex("sprop-parameter-sets=([A-Za-z0-9+/]*)[=]*,[=]*([A-Za-z0-9+/]*)[=]*"))) {
199 desc.sps = base64_decode(m[1].str());
200 desc.pps = base64_decode(m[2].str());
201 }
202 if (std::regex_search(str, m, std::regex("sprop-vps=([A-Za-z0-9+/]*)[=]*"))) {
203 desc.vps = base64_decode(m[1].str());
204 }
205 if (std::regex_search(str, m, std::regex("sprop-sps=([A-Za-z0-9+/]*)[=]*"))) {
206 desc.sps = base64_decode(m[1].str());
207 }
208 if(std::regex_search(str, m, std::regex("sprop-pps=([A-Za-z0-9+/]*)[=]*"))){
209 desc.pps = base64_decode(m[1].str());
210 }
211 if(std::regex_search(str, m, std::regex("sprop-max-don-diff"))){
212 assert(false); // TODO: Add support for this in RTP.cpp
213 }
214 // TODO tx-mode=SRST;profile-id=1;level-id=186;tier-flag=0;profile-space=0;
215 }
216 else if(str.find("framerate") == 2){
217 // TODO: a=framerate:25.000000
218 }
219 else if(str.find("transform") == 2){
220 // TODO: a=transform:-1.000000,0.000000,0.000000;0.000000,-1.000000,0.000000;0.000000,0.000000,1.000000
221 }
222 }
223 break;
224 }
225 }
226 }
227}
228
229bool Cogs::RTSP::processOutgoing() {
230 if (isConnected()) {
231 if (!outgoingBuffer.unreadSize()) {
232 Cogs::LockGuard lock(queueMutex);
233
234 if (!queuedMessages.empty()) {
235 Message* message = queuedMessages.front();
236
237 queuedMessages.pop_front();
238
239 outgoingBuffer.clear();
240 outgoingBuffer.write(message->firstLine.data(), message->firstLine.size());
241 outgoingBuffer.write("\x0D\x0A", 2);
242 for (auto& [key,value] : message->headers) {
243 outgoingBuffer.write(key.data(), key.size());
244 outgoingBuffer.write(": ", 2);
245 outgoingBuffer.write(value.data(), value.size());
246 outgoingBuffer.write("\x0D\x0A", 2);
247 }
248 outgoingBuffer.write("\x0D\x0A", 2);
249 outgoingBuffer.write(message->data(), message->size());
250 outgoingBuffer.seek(0, Memory::MemoryBuffer::Anchor::Start);
251
252 sentMessages.push_back(message);
253 }
254 }
255 if (outgoingBuffer.unreadSize()) {
256 uint64_t bytesSent = 0;
257
258 if (send(outgoingBuffer.dataFromPos(), outgoingBuffer.unreadSize(), bytesSent)) {
259 outgoingBuffer.seek(bytesSent, Memory::MemoryBuffer::Anchor::Current);
260 }
261 }
262 if (keepAliveTime) {
263 int64_t now = Timer::currentTimeMilliseconds();
264
265 if (keepAliveTime < now) {
266 Message* message = new Message();
267
268 if (has_set_parameter) {
269 message->firstLine = buildURL("SET_PARAMETER");
270 }
271 else if (has_get_parameter) {
272 message->firstLine = buildURL("GET_PARAMETER");
273 }
274 else {
275 message->firstLine = buildURL("OPTIONS");
276 }
277
278 addStandardHeaders(*message);
279 queueMessage(message);
280 keepAliveTime = now + timeout - 1000;
281 return true;
282 }
283 }
284 }
285 return false;
286}
287
289 uint64_t remainingSpace = incomingBuffer.unreadSize() - 1;
290 uint64_t bytesReceived = incomingBuffer.unreadSize();
291 char* bufferStart = static_cast<char*>( incomingBuffer.data() );
292
293 if (receive(bufferStart + incomingBuffer.pos(), remainingSpace, bytesReceived)) {
294 incomingBuffer.seek(bytesReceived, Cogs::Memory::MemoryBuffer::Anchor::Current);
295 *static_cast<char*>(incomingBuffer.dataFromPos()) = 0;
296 }
297 if (!incomingMessage.finishedHeaders) {
298 char* lineStart = bufferStart;
299
300 for( ;; ) {
301 char* lineEnd = std::strstr(lineStart, "\x0D\x0A");
302
303 if( lineEnd ) {
304 *lineEnd = 0;
305
306 if (lineStart == lineEnd) {
307 incomingMessage.finishedHeaders = true;
308 if (!incomingMessage.size()) {
309 incomingMessage.finishedBody = true;
310 }
311 lineStart = lineEnd + 2;
312 break;
313 }
314 else {
315 if (incomingMessage.firstLine.empty()) {
316 incomingMessage.firstLine = lineStart;
317 }
318 else {
319 char* colon = strchr(lineStart, ':');
320 if (colon) {
321 *colon++ = 0;
322 incomingMessage.headers[lineStart] = ++colon;
323
324 if (!strcmp(lineStart, "Content-Length")) {
325 incomingMessage.resize(std::atoi(colon));
326 }
327 }
328 }
329 }
330 lineStart = lineEnd + 2;
331 }
332 else {
333 break;
334 }
335 }
336 uint64_t remainingData = (static_cast<char*>(incomingBuffer.data()) + incomingBuffer.size()) - lineStart;
337
338 memmove(incomingBuffer.data(), lineStart, remainingData);
339 incomingBuffer.seek(static_cast<char*>(incomingBuffer.data()) - lineStart, Cogs::Memory::MemoryBuffer::Anchor::Current);
340 }
341 if (!incomingMessage.finishedBody) {
342 memcpy(incomingMessage.dataFromPos(), incomingBuffer.data(), incomingBuffer.pos());
343 incomingMessage.seek(incomingBuffer.pos(), Cogs::Memory::MemoryBuffer::Anchor::Current);
344 incomingBuffer.seek(0, Cogs::Memory::MemoryBuffer::Anchor::Start);
345
346 if (incomingMessage.pos() == incomingMessage.size()) {
347 incomingMessage.finishedBody = true;
348 }
349 }
350 if (incomingMessage.finishedBody) {
351 incomingMessage.seek(0, Cogs::Memory::MemoryBuffer::Anchor::Start);
352 processIncomingMessage(incomingMessage);
353
354 incomingMessage.reset();
355 }
356 return true;
357}
358
359void Cogs::RTSP::processIncomingMessage(Message& message) {
360 std::string seq = message.headers["CSeq"];
361
362 // We don't support these (optional) requests from the server...
363 if (message.firstLine.starts_with("OPTIONS") ||
364 message.firstLine.starts_with("GET_PARAMETER" )) {
365 Message* response = new Message();
366
367 // Do NOT add standard headers here, because we need the original sequence number.
368 response->firstLine = "RTSP/1.0 501 Not Implemented";
369 response->headers["CSeq"] = seq;
370 response->headers["User-Agent"] = agentString;
371 queueMessage(response);
372 return;
373 }
374
375 LOG_INFO(logger, "Received: %s", message.firstLine.c_str());
376
377 Message* request = nullptr;
378
379 // Find the request we sent that matches the sequence number of the incoming message.
380 for (auto i = sentMessages.begin(), e = sentMessages.end(); i != e; ++i) {
381 if ((*i)->headers["CSeq"] == seq) {
382 request = *i;
383 sentMessages.erase(i);
384 break;
385 }
386 }
387 if (request) {
388 if (message.firstLine.starts_with("RTSP/1.0")) {
389 switch (std::atoi(message.firstLine.c_str() + 9)) {
390 case 301: // Moved Permanently
391 case 302: // Found
392 case 305: { // Use Proxy
393 breakdownURL(message.headers["Location"]);
394
395 request->firstLine = buildURL(request->firstLine.substr(0, request->firstLine.find(' ')));
396 request->headers["CSeq"] = std::to_string(sequenceNumber++);
397 queueMessage(request);
398 return;
399 }
400 case 401: { // Authorisation Needed.
401 // The original request should be retried with a new sequence number and the username/password
402 // encoded per the server's requirements as an "Authorization" header.
403 assert(false);
404 break;
405 }
406 }
407 }
408
409 if (request->firstLine.starts_with("OPTIONS")) {
410 auto i = message.headers.find("Public");
411
412 if (i != message.headers.end()) {
413 has_describe = i->second.find("DESCRIBE") != std::string::npos;
414 has_setup = i->second.find("SETUP") != std::string::npos;
415 has_play = i->second.find("PLAY") != std::string::npos;
416 has_pause = i->second.find("PAUSE") != std::string::npos;
417 has_teardown = i->second.find("TEARDOWN") != std::string::npos;
418 has_set_parameter = i->second.find("SET_PARAMETER") != std::string::npos;
419 has_get_parameter = i->second.find("GET_PARAMETER") != std::string::npos;
420 }
421 }
422 else if (request->firstLine.starts_with("DESCRIBE")) {
423 parseSDP(std::string(static_cast<char*>(message.data()), message.size()));
424 receivedMediaDescriptions();
425 }
426 else if (request->firstLine.starts_with("SETUP")) {
427 sessionID = message.headers["Session"];
428
429 MediaDescription* md = static_cast<MediaDescription*>(request->userData);
430 size_t semicolon = sessionID.find(";timeout=");
431 std::string& s = message.headers["Transport"];
432 std::smatch m;
433
434 if (semicolon != std::string::npos) {
435 timeout = std::atoi(sessionID.c_str() + semicolon + 9) * 1000;
436 sessionID.erase(semicolon);
437 }
438 keepAliveTime = Timer::currentTimeMilliseconds() + timeout - 1000;
439
440 if (std::regex_search(s, m, std::regex("server_port=([0-9]+)-([0-9]+)"))) {
441 md->server_port[0] = static_cast<uint16_t>(std::stoi(m[1].str()));
442 md->server_port[1] = static_cast<uint16_t>(std::stoi(m[2].str()));
443 }
444 if (std::regex_search(s, m, std::regex("client_port=([0-9]+)-([0-9]+)"))) {
445 md->client_port[0] = static_cast<uint16_t>(std::stoi(m[1].str()));
446 md->client_port[1] = static_cast<uint16_t>(std::stoi(m[2].str()));
447 }
448 if (std::regex_search(s, m, std::regex("ssrc=([A-Fa-f0-9]+)"))) {
449 md->ssrc = std::stoul(m[1].str(), 0, 16);
450 }
451 receivedStreamSetup(*md);
452 }
453 else if (request->firstLine.starts_with("TEARDOWN")) {
454 sessionID.clear();
455 timeout = 60000;
456 keepAliveTime = 0;
457 }
458 delete request;
459 }
460}
461
462void Cogs::RTSP::queueMessage(Message* message) {
463 if (message) {
464 Cogs::LockGuard lock(queueMutex);
465
466 LOG_INFO(logger, "Sending %s.", message->firstLine.c_str());
467 queuedMessages.push_back(message);
468 }
469}
470
471void Cogs::RTSP::addStandardHeaders(Message& message) {
472 message.headers["CSeq"] = std::to_string(sequenceNumber++);
473 message.headers["User-Agent"] = agentString;
474
475 if (!sessionID.empty()) {
476 message.headers["Session"] = sessionID;
477 }
478}
479
480bool Cogs::RTSP::breakdownURL(std::string_view url) {
481 std::string oldHostname = hostname;
482 uint16_t oldPort = hostPort;
483 size_t start = (url.starts_with("rtsp://")) ? 7 : 0;
484 size_t end = url.find('/', start);
485
486 hostname = std::string(url.substr(start, end - start));
487 if (end != std::string::npos) {
488 resourcePath = std::string(url.substr(end + 1));
489 }
490
491 end = hostname.find('@');
492 if (end != std::string::npos) {
493 userInfo = std::string(hostname.substr(0, end++));
494 hostname.erase(0, end);
495 }
496 start = hostname.find(':');
497 if (start != std::string::npos) {
498 hostPort = static_cast<uint16_t>(std::atoi(hostname.c_str() + start + 1));
499 hostname.erase(start);
500 }
501 else {
502 hostPort = 554;
503 }
504 if ((hostname != oldHostname) || (hostPort != oldPort)) {
505 sockAddr = Network::SockaddrIn(Network::AddrIn(hostname), hostPort);
506 return true;
507 }
508 return false;
509}
510
511std::string Cogs::RTSP::buildURL(std::string_view method) {
512 std::string url;
513
514 url.reserve(400);
515 url = method;
516 url += " rtsp://";
517
518 if (!userInfo.empty()) {
519 url += userInfo;
520 url += "@";
521 }
522 url += hostname;
523 url += "/";
524
525 if (!resourcePath.empty()) {
526 url += resourcePath;
527 }
528 url += " RTSP/1.0";
529
530 return url;
531}
532
533void Cogs::RTSP::sendOptions() {
534 Message* message = new Message();
535
536 message->firstLine = buildURL("OPTIONS");
537
538 addStandardHeaders(*message);
539 queueMessage(message);
540}
541
542void Cogs::RTSP::sendDescribe() {
543 Message* message = new Message();
544
545 message->firstLine = buildURL("DESCRIBE");
546 message->headers["Accept"] = "application/sdp";
547
548 addStandardHeaders(*message);
549 queueMessage(message);
550}
Log implementation class.
Definition: LogManager.h:140
virtual bool processIncoming() override
Manages accepting incoming connections when this connection is a listener.
Definition: RTSP.cpp:288
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Definition: LogManager.h:181