Cogs.Core
RTP.cpp
1#include "RTP.h"
2
3#include "Context.h"
4#include "Resources/TextureManager.h"
5#include "Renderer/RenderTexture.h"
6
7#include "Foundation/Logging/Logger.h"
8#include "Foundation/Platform/Timer.h"
9
10#undef CurrentTime
11#include "jrtp/rtpsession.h"
12#include "jrtp/rtpsessionparams.h"
13#include "jrtp/rtpudpv4transmitter.h"
14#include "jrtp/rtptcptransmitter.h"
15#include "jrtp/rtppacket.h"
16#include "jrtp/rtpsourcedata.h"
17#include "jrtp/rtptcpaddress.h"
18
19#include <assert.h>
20#include <cinttypes>
21
22#define INITIAL_BUFFER 64*3*5*7
23
24using namespace Cogs::Core;
25using namespace Cogs::Network;
26
27namespace {
28 constexpr const uint8_t naluStartCode[] = { 0x00, 0x00, 0x00, 0x01 };
29
30 class CogsRTPMemoryManager : public jrtplib::RTPMemoryManager {
31 public:
32 static CogsRTPMemoryManager* instance() {
33 static CogsRTPMemoryManager i;
34
35 return &i;
36 }
37
38 void* AllocateBuffer(size_t numbytes, int /*memtype*/) {
39 void* mem = malloc(numbytes);
40
41 if (mem) {
42 OsoMP_Allocate(COGSMEM_LibRTP, mem, numbytes, nullptr, 0);
43 }
44 return mem;
45 }
46
47 void FreeBuffer(void* ptr) {
48 if (ptr) {
49 OsoMP_Free(COGSMEM_LibRTP, ptr, nullptr, 0);
50 }
51 free(ptr);
52 }
53 };
54
/// Signed distance t1 - t0 between two 32-bit RTP timestamps, taking
/// wraparound of the 32-bit timeline into account.
///
/// Illustration with a 4-bit timeline (a = t0, b = t1):
///   normal:              a=5,  b=6   ->  6 - 5         =  1
///   normal negative:     a=6,  b=5   ->  5 - 6         = -1
///   wraparound:          a=15, b=0   -> (0 - 15) + 16  =  1
///   negative wraparound: a=0,  b=15  -> (15 - 0) - 16  = -1
/// A wrap is assumed whenever |b - a| >= 2^(bits-1).
///
/// All four cases collapse into "subtract modulo 2^32 and reinterpret the
/// result as a signed 32-bit value", which is what is done here.
///
/// @param t1 Later (minuend) timestamp.
/// @param t0 Earlier (subtrahend) timestamp.
/// @return Difference in the range [-2^31, 2^31 - 1], widened to 64 bits.
int64_t timestamp_diff(uint32_t t1, uint32_t t0)
{
    const uint32_t modular = t1 - t0; // unsigned arithmetic wraps modulo 2^32
    return static_cast<int32_t>(modular);
}
91void timestamp_diff_test()
92{
93 // Normal
94 assert(timestamp_diff(0, 0) == 0);
95 assert(timestamp_diff(0xffffffffu, 0xffffffffu) == 0);
96 assert(timestamp_diff(1, 1) == 0);
97 assert(timestamp_diff(2, 1) == 1);
98 assert(timestamp_diff(1, 2) == -1);
99
100 // Wraparound
101 assert(timestamp_diff(0, 0xffffffffu) == 1);
102 assert(timestamp_diff(1, 0xffffffffu) == 2);
103
104 // Negative Wraparound
105 assert(timestamp_diff(0xffffffffu, 0) == -1);
106 assert(timestamp_diff(0xffffffffu, 1) == -2);
107}
108} // namespace ...
109
110static Cogs::Logging::Log logger = Cogs::Logging::getLogger("RTP");
111
112Cogs::RTPStream::RTPStream(Cogs::Core::Context * context, uint32_t clock_rate, uint16_t port, Cogs::Network::AddrIn mc_adapter, bool udp):
113 context(context),
114 useUDP(udp),
115 clock_rate(clock_rate),
116 client_port(port)
117{
118 timestamp_diff_test();
119 LOG_INFO(logger, "jrtplib Version: %s", jrtplib::RTPLibraryVersion::GetVersion().GetVersionString().c_str());
120
121 tcpConnection.setDoNotProcess();
122
123 session = new jrtplib::RTPSession(nullptr, CogsRTPMemoryManager::instance());
124
125 jrtplib::RTPSessionParams sessionparams;
126 sessionparams.SetOwnTimestampUnit(1.0/clock_rate); // Local timestamp unit (MUST BE SET)
127 // sessionparams.SetAcceptOwnPackets(true);
128 assert(!sessionparams.IsUsingPollThread());
129
130 if (useUDP) {
131 jrtplib::RTPUDPv4TransmissionParams transparams;
132
133 transparams.SetPortbase(port);
134 transparams.SetRTPReceiveBuffer(1024*1024);
135 transparams.SetRTCPReceiveBuffer(1500*64);
136 if (!(mc_adapter[0] == 0 && mc_adapter[1] == 0 && mc_adapter[2] == 0 && mc_adapter[3] == 0)){
137 LOG_INFO(logger, "Multicast adapter: %s", mc_adapter.string().c_str());
138 uint8_t ip_arr[4] = {mc_adapter[0], mc_adapter[1], mc_adapter[2], mc_adapter[3]};
139 jrtplib::RTPIPv4Address rtp_ip;
140 rtp_ip.SetIP(ip_arr);
141 transparams.SetMulticastInterfaceIP(rtp_ip.GetIP());
142 }
143
144 int status = session->Create(sessionparams, &transparams, transparams.GetTransmissionProtocol());
145 if (status < 0) {
146 LOG_ERROR(logger, "session->Create Failed: %s", jrtplib::RTPGetErrorString(status).c_str());
147 return;
148 }
149 jrtplib::RTPUDPv4TransmissionInfo *info = reinterpret_cast<jrtplib::RTPUDPv4TransmissionInfo*>(session->GetTransmissionInfo());
150 rtp_port = info->GetRTPPort();
151 rtcp_port = info->GetRTCPPort();
152 }
153 else {
154 jrtplib::RTPTCPTransmissionParams transparams;
155 int status = session->Create(sessionparams, &transparams, transparams.GetTransmissionProtocol());
156
157 if (status < 0) {
158 LOG_ERROR(logger, "session->Create Failed: %s", jrtplib::RTPGetErrorString(status).c_str());
159 return;
160 }
161 rtp_port = port;
162 rtcp_port = port + 1;
163 }
164
165 // session.SetDefaultPayloadType(96);
166 // session.SetDefaultMark(false);
167 // session.SetDefaultTimestampIncrement(160);
168}
169
170Cogs::RTPStream::~RTPStream()
171{
172 LOG_INFO(logger, "Shutdown");
173 jrtplib::RTPTime delay(10.0);
174 session->BYEDestroy(delay, "BYE", 9);
175 for(auto &tmp : dest) {
176 if(tmp.second.fp) {
177 fclose(tmp.second.fp);
178 }
179 delete [] tmp.second.buffer;
180 }
181 delete session;
182}
// Stores the remote address in the ip / server_port members and registers it
// as a destination of the RTP session.
//
// in_ip          - four address octets, copied into ip[].
// in_server_port - remote port; the log messages print it together with port + 1.
//
// Errors are only logged; nothing is reported back to the caller.
//
// NOTE(review): `addr` used in the TCP branch has no declaration in the visible
// listing (the listing jumps from line 197 to 199) - confirm against the
// original file before editing this function.
183void Cogs::RTPStream::AddDestination(const uint8_t in_ip[4], uint16_t in_server_port)
184{
185 for (uint32_t i = 0; i < 4; i++) {
186 ip[i] = in_ip[i];
187 }
188 server_port = in_server_port;
189
190 LOG_INFO(logger, "Add destination %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
191
// Stays 0 when the TCP connect below fails, so only the connect error is logged in that case.
192 int status = 0;
193
194 if (useUDP) {
195 status = session->AddDestination(jrtplib::RTPIPv4Address(ip, server_port));
196 }
197 else {
199
// TCP: connect first, then hand the connected socket to the session as the destination.
200 if (tcpConnection.connect(addr)) {
201 LOG_INFO(logger, "RTPStream::AddDestination: Connected to %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
202
203 tcpConnection.enableAutoReconnect();
204 status = session->AddDestination(jrtplib::RTPTCPAddress(tcpConnection.getSocket()));
205 }
206 else {
207 LOG_ERROR(logger, "RTPStream::AddDestination failed to connect to : %d.%d.%d.%d:%d", ip[0], ip[1], ip[2], ip[3], server_port);
208 }
209 }
210 if(status < 0){
211 LOG_ERROR(logger, "RTPStream::AddDestination failed: %s", jrtplib::RTPGetErrorString(status).c_str());
212 }
213}
214void Cogs::RTPStream::JoinMulticastGroup(const uint8_t in_ip[4], uint16_t in_server_port)
215{
216 server_port = in_server_port;
217 for(uint32_t i=0; i<4; i++) ip[i] = in_ip[i];
218 LOG_INFO(logger, "Join multicast group %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
219 jrtplib::RTPIPv4Address addr(ip, server_port);
220 if(!session->SupportsMulticasting()){
221 LOG_ERROR(logger, "session->SupportsMulticasting failed. (Possibly duplicate connection)");
222 return;
223 }
224 int status = session->JoinMulticastGroup(addr);
225 if(status < 0){
226 LOG_ERROR(logger, "session->JoinMulticastGroup failed: %s",
227 jrtplib::RTPGetErrorString(status).c_str());
228 }
229}
230void Cogs::RTPStream::SendPacket(const uint8_t *buffer, size_t size)
231{
232 int status = session->SendPacket(buffer, size);
233 if(status < 0){
234 LOG_ERROR(logger, "session->SendPacket failed: %s",
235 jrtplib::RTPGetErrorString(status).c_str());
236 return;
237 }
238}
239static bool g_first = true;
240void Cogs::RTPStream::AddSSRC(uint32_t ssrc, Cogs::Core::Codec codec_in, ResourceId textureId)
241{
242 Dest d = {};
243 d.first = g_first;
244 g_first = false;
245 d.textureHandle = Cogs::Core::TextureHandle(context->textureManager->getHandle(textureId));
246 d.codec = codec_in;
247 // if(false){
248 // std::string file = "test_stream_" + std::to_string(stream_number++) + ".h264";
249 // LOG_INFO(logger, "Writing file %s", file.c_str());
250 // d.fp = fopen(file.c_str(), "wb");
251 // }
252 d.buffer_size = INITIAL_BUFFER;
253 d.buffer = new char[d.buffer_size];
254
255 LOG_INFO(logger, "Adding SSRC 0x%x to RTPStream %p", ssrc, this);
256
257 dest[ssrc] = std::move(d);
258}
259void Cogs::RTPStream::WriteData(Dest &dest_w)
260{
261 // Export to file...
262 if(dest_w.fp) {
263 fwrite(dest_w.nalu.data(), 1, dest_w.nalu.size(), dest_w.fp);
264 }
265
266 // Upload to decoder...
267 if(HandleIsValid(dest_w.textureHandle)){
268 IVideoDecoderContext* decoderContext = context->videoDecoderContext;
269
270 if (decoderContext) {
271 VideoDecoderPayload payload = {};
272
273 payload.size = dest_w.nalu.size();
274 payload.data = dest_w.nalu.data();
275 payload.timestamp = dest_w.timestamp;
276 payload.use_timestamp = true;
277
278 decoderContext->streamVideoData(dest_w.textureHandle, payload);
279 }
280 }
281 dest_w.nalu.clear();
282}
283Cogs::RTPStream::Dest *Cogs::RTPStream::GetLatestStream()
284{
285 uint64_t t = 0;
286 Dest *ret = nullptr;
287 for(auto &tmp : dest){
288 if(tmp.second.receive_time >= t){
289 t = tmp.second.receive_time;
290 ret = &tmp.second;
291 }
292 }
293 return ret;
294}
295void Cogs::RTPStream::Update()
296{
297 if(use_pool_thread) {
298 session->BeginDataAccess();
299 }
300 else {
301 session->Poll();
302 }
303
304 // Update TimestampUnit and handle BYE
305 if(session->GotoFirstSource()){
306 do{
307 jrtplib::RTPSourceData *source = session->GetCurrentSourceInfo();
308 source->SetTimestampUnit(1.0/clock_rate);
309 if(source->ReceivedBYE()){
310 uint32_t ssrc = source->GetSSRC();
311 auto iter = dest.find(ssrc);
312 if(iter != dest.end()){
313 size_t bye_len;
314 uint8_t *bye_reason = source->GetBYEReason(&bye_len);
315 jrtplib::RTPTime rtp_time = source->GetBYETime();
316 uint64_t bye_time = rtp_time.GetSeconds() * 1000000LL + rtp_time.GetMicroSeconds();
317 LOG_INFO(logger, "Removing SSRC 0x%x from RTPStream %p. BYE: %s (t=%" PRIu64 ")",
318 ssrc, this, bye_reason ? std::string((char*)bye_reason, bye_len).c_str() : "", bye_time);
319 if(iter->second.fp)
320 fclose(iter->second.fp);
321 delete [] iter->second.buffer;
322 dest.erase(iter);
323 }
324 }
325 } while(session->GotoNextSource());
326 }
327 else {
328 LOG_ERROR(logger, "GotoFirstSource() failed for RTPStream %p.", this);
329 }
330
331 // Handle Incoming Data
332 if(session->GotoFirstSourceWithData()){
333 std::unique_lock lock(dest_mu);
334 do {
335 jrtplib::RTPPacket *packet;
336
337 while ((packet = session->GetNextPacket()) != 0) {
338 uint32_t ssrc = packet->GetSSRC();
339 uint32_t eseq = packet->GetExtendedSequenceNumber();
340 uint8_t* data = packet->GetPayloadData();
341 size_t size = packet->GetPayloadLength();
342 jrtplib::RTPTime rtime = packet->GetReceiveTime();
343 uint64_t pkt_rtime = rtime.GetSeconds() * 1000000LL + rtime.GetMicroSeconds();
344 uint64_t pkt_time = 0xF000000000000000ULL;
345
346 if (packet->HasExtension()) {
347 uint16_t ext_id = packet->GetExtensionID();
348 size_t ext_size = packet->GetExtensionLength();
349 uint8_t *ext_data = packet->GetExtensionData();
350 // LOG_INFO(logger, "RTP Has extension id %u size %zu", ext_id, ext_size);
351
352 // One byte header extension rfc5285
353 if(rtp_one_byte_header_extension){
354 // LOG_INFO(logger, "RTP one byte header extension");
355 size_t idx = 0;
356 while(idx < ext_size) {
357 uint8_t ext_ext_header = ext_data[idx++];
358 if(ext_ext_header == 0) continue; // Padding byte
359 if(idx == ext_size) break;
360 uint16_t ext_ext_id = (ext_ext_header&0xf0)>>4;
361 size_t ext_ext_size = 1+ext_ext_header&0xf;
362 if(ext_ext_id == 15) break;
363 uint8_t *ext_ext_data = &ext_data[idx];
364 // Synchronisation metadata: 64-bit timestamp format
365 if(rfc6051_ext_id != 0 && ext_ext_id == rfc6051_ext_id){
366 uint64_t pkt_time_ext = 0;
367 uint64_t ntp =
368 ((uint64_t)ext_ext_data[7]<<0) |
369 ((uint64_t)ext_ext_data[6]<<8) |
370 ((uint64_t)ext_ext_data[5]<<16) |
371 ((uint64_t)ext_ext_data[4]<<24) |
372 ((uint64_t)ext_ext_data[3]<<32) |
373 ((uint64_t)ext_ext_data[2]<<40) |
374 ((uint64_t)ext_ext_data[1]<<48) |
375 ((uint64_t)ext_ext_data[0]<<56);
376 auto tconv = jrtplib::RTPTime(jrtplib::RTPNTPTime((ntp>>32)&0xffffffffu, ntp&0xffffffffu));
377 pkt_time_ext = tconv.GetSeconds() * 1000000LL + tconv.GetMicroSeconds();
378 static uint32_t testvar = 0;
379 if(testvar < 20){
380 LOG_INFO(logger, "NTP ts %" PRIu64 " ", ntp);
381 LOG_INFO(logger, "Using RTP 64 bit timestamp extension. (ext ts %" PRIu64 " old ts %" PRIu64 ")",
382 pkt_time_ext, pkt_time);
383 testvar++;
384 }
385 pkt_time = pkt_time_ext;
386 }
387 else{
388 LOG_WARNING(logger, "RTP has unused OBH extension id %u size %zu", ext_ext_id, ext_ext_size);
389 }
390 idx += ext_ext_size;
391 }
392 }
393
394 // Two byte header extension rfc5285
395 else if(rtp_two_byte_header_extension){
396 LOG_WARNING(logger, "RTP two byte header extension not supported."); // TODO add support for this?
397 }
398
399 // Unused extension
400 else{
401 LOG_WARNING(logger, "RTP has unused extension id %u size %zu.", ext_id, ext_size);
402 }
403 }
404 if (pkt_time == 0xF000000000000000ULL) {
405 jrtplib::RTPSourceData* source_data = session->GetCurrentSourceInfo();
406
407 if (source_data->SR_HasInfo()) {
408 uint32_t ref_ts = source_data->SR_GetRTPTimestamp();
409 double ts_unit = source_data->GetTimestampUnit();
410 uint32_t pkt_ts = packet->GetTimestamp();
411 int64_t ts_diff = timestamp_diff(pkt_ts, ref_ts);
412 int64_t ts_diff_us = (int64_t)((double)ts_diff * (ts_unit * 1000000.0));
413
414 // Flush packets if older then 5 seconds from senders report
415 if(ts_diff_us >= -1000000.0*5.0){
416 jrtplib::RTPTime ref_ntp_rtp = jrtplib::RTPTime(source_data->SR_GetNTPTimestamp());
417 uint64_t ref_time = ref_ntp_rtp.GetSeconds() * 1000000LL + ref_ntp_rtp.GetMicroSeconds();
418
419 pkt_time = ref_time + ts_diff_us;
420 }
421 else {
422 source_data->FlushPackets();
423 LOG_INFO(logger,"RTPStream %p: Flushed Packets SSRC 0x%x"
424 "(pkg age %" PRId64 " us pkt_ts %d ref_ts %d ts_unit %f)",
425 this, ssrc, ts_diff_us, pkt_ts, ref_ts, ts_unit);
426 }
427 }
428 else {
429 //LOG_ERROR(logger, "RTPStream %p has not received any SenderInfo.", this);
430 // No sender report, just use packet timestamp
431 pkt_time = packet->GetTimestamp();
432 }
433 }
434
435 if (pkt_time != 0xF000000000000000ULL) {
436 auto iter = dest.find(ssrc);
437 if(iter == dest.end()){
438 LOG_INFO(logger, "Adding SSRC 0x%x to RTPStream %p", ssrc, this);
439 Dest d = {};
440 d.first = g_first;
441 g_first = false;
442 d.textureHandle = Cogs::Core::TextureHandle::NoHandle;
443 d.codec = codec;
444 // if(false){
445 // std::string file = "test_stream_" + std::to_string(stream_number++) + ".h264";
446 // LOG_INFO(logger, "Writing file %s", file.c_str());
447 // d.fp = fopen(file.c_str(), "wb");
448 // }
449 d.buffer_size = INITIAL_BUFFER;
450 d.buffer = new char[d.buffer_size];
451 iter = dest.insert({ssrc, std::move(d)}).first;
452 }
453 Dest &d = iter->second;
454
455 if(d.first){
456 // uint32_t type = packet->GetPayloadType();
457 // printf("Got packet extended seq %d SSRC %x PayloadType %d MarkerBit %s Extension %s\n",
458 // eseq, ssrc, type, packet->HasMarker()?"true":"false", packet->HasExtension()?"true":"false");
459 }
460 if (d.pps.size() && pkt_time != d.timestamp) {
461 // int64_t ct = Cogs::Timer::currentTimeMicroseconds();
462 // int64_t diff = ct - pkt_time;
463 // int64_t rcv_diff = ct - pkt_rtime;
464 // printf("diff %f\tts_diff_us %f\tts_diff %lld\trcv_diff %f\n", diff/1000000.0, ts_diff_us/1000000.0, ts_diff, rcv_diff/1000000.0);
465 }
466
467 d.prev_receive_time = d.receive_time;
468 d.receive_time = pkt_rtime;
469 d.prev_timestamp = d.timestamp;
470 d.timestamp = pkt_time;
471 d.size = size;
472 d.prev_eseq = d.eseq;
473 d.eseq = eseq;
474
475 // for(size_t i=0; i<size && i<=1500; i++){
476 // printf("%2X ", data[i]);
477 // if(i%16 == 15) printf("\n");
478 // }
479
480 d.size_akk += (uint32_t)size;
481 if(d.receive_time-d.rate_time > 1ull<<27){
482 double rt_diff = (d.receive_time-d.rate_time)*(1.0/(1ull<<32));
483 double rate = (d.size_akk*8)/rt_diff;
484 double a = 1.0/8;
485 d.rate = (1.0-a)*d.rate + a*rate;
486 d.rate_time = d.receive_time;
487 d.size_akk = 0;
488 }
489
490
491 if (d.codec == Cogs::Core::Codec::None) {
492 }
493 else if (d.codec == Cogs::Core::Codec::HEVC) {
494 if((data[0] & 0x80) != 0){
495 LOG_ERROR(logger, "Corrupt RTP HEVC packet (f != 0). (RTPStream %p). Possible MPEG-TS stream not supported.", this);
496 session->DeletePacket(packet);
497 continue;
498 }
499 uint32_t type = (data[0]>>1)&0x3f;
500 // uint32_t layerId = (data[0]&0x1)<<8 | data[1]&0xf8;
501 uint32_t TID = data[1]&0x7;
502 if(TID == 0){
503 // A TID value of 0 is illegal to ensure that there is at least one bit in the NAL unit
504 // header equal to 1, so to enable independent considerations of start code emulations
505 // in the NAL unit header and in the NAL unit payload data.
506 LOG_ERROR(logger, "Corrupt RTP H265 packet (TID == 0).");
507 session->DeletePacket(packet);
508 continue;
509 }
510 // TODO DONL if(sprop-max-don-diff > 0)
511
512 if (type <= 40) {
513 processH265NALU(d, type, data, size);
514 }
515 else if (type == 48) { // Agregation Packets AP
516 LOG_ERROR(logger, "H265 aggregation packets are not supported.");
517 assert(false); // TODO
518 }
519 else if (type == 49) { // Fragment Unit FUs
520 uint32_t s = (data[2] >> 7) & 0x1; // Start of fragmented NAL unit
521 uint32_t e = (data[2] >> 6) & 0x1; // End of fragmented NAL unit
522 uint32_t fu_type = data[2] & 0x3f;
523
524 assert(fu_type >= 0 && fu_type <= 40);
525
526 // TODO DONL if(sprop-max-don-diff > 0)
527 if (s) {
528 d.nalu.write(naluStartCode, sizeof(naluStartCode));
529 d.nalu.write8(static_cast<uint8_t>(fu_type << 1));
530 d.nalu.write8(1);
531 }
532 size_t beg = 3; // TODO 5 DONL
533 size_t siz = size-beg;
534 d.nalu.write(&data[beg], siz);
535 if (e) {
536 WriteData(d);
537 }
538 }
539 else{
540 LOG_ERROR(logger, "Unsupported H265 packet of type %d encountered.", type);
541 //assert(false);
542 }
543 }
544 else if (d.codec == Cogs::Core::Codec::H264) {
545 if((data[0] & 0x80 ) != 0){
546 LOG_ERROR(logger, "Corrupt RTP H264 packet (f != 0). (RTPStream %p). Possible MPEG-TS stream not supported.", this);
547 session->DeletePacket(packet);
548 continue;
549 }
550 uint32_t nri = (data[0] >> 5) & 0x3f;
551 uint32_t rtp_type = data[0] & 0x1f;
552
553 if (rtp_type <= 23){ // Single NAL Unit Packet (NAL unit)
554 processH264NALU(d, rtp_type, data, size);
555 }
556 else if(rtp_type == 24) { // Single-time aggregation packet (STAP-A)
557 uint8_t* read = data + 1;
558
559 while (read < (data + size)) {
560 uint16_t naluSize = ntohs(*reinterpret_cast<uint16_t*>(read));
561 read += 2;
562
563 if ((read[0] & 0x80) != 0) {
564 LOG_ERROR(logger, "Corrupt RTP H264 packet: Non-zero 'f' field in STAP-A NALU. (RTPStream %p). Possible MPEG-TS stream not supported.", this);
565 }
566 else {
567 processH264NALU(d, read[0] & 0x1f, read, naluSize);
568 }
569 read += naluSize;
570 }
571 }
572 else if (rtp_type == 25) { // Single-time aggregation packet (STAP_B)
573 LOG_ERROR(logger, "STAP-B packets are not supported.");
574 assert(false);
575 }
576 else if ((rtp_type == 26) || (rtp_type == 27)) { // Multi-time aggregation packet (MTAP16, MTAP24)
577 LOG_ERROR(logger, "MTAP packets are not supported.");
578 assert(false);
579 }
580 else if ((rtp_type == 28) || (rtp_type == 29)) { // Fragmentation unit (FU-A, FU-B)
581 assert(rtp_type == 28);
582 uint32_t s = (data[1]>>7)&0x1; // Start of fragmented NAL unit
583 uint32_t e = (data[1]>>6)&0x1; // End of fragmented NAL unit
584 uint32_t r = (data[1]>>5)&0x1;
585 assert(r == 0);
586 uint32_t type = (data[1]>>0)&0x1f;
587 if(rtp_type == 29){
588 //uint16_t DON = (data[2]<<8) | (data[3]<<0);
589 }
590 if(type == 1){ // Coded slice of a non-IDR picture
591 if(s){
592 d.nalu.write(naluStartCode, sizeof(naluStartCode));
593 d.nalu.write8(static_cast<uint8_t>((nri<<5) | type));
594 }
595 size_t beg = (rtp_type == 29) ? 4 : 2;
596
597 d.nalu.write(&data[beg], size - beg);
598 if (e) {
599 WriteData(d);
600 }
601 }
602 else if(type == 5){ // Coded slice of an IDR picture
603 if(s){
604 d.nalu.write(naluStartCode, sizeof(naluStartCode));
605 d.nalu.write8(static_cast<uint8_t>((nri<<5) | type));
606 }
607 size_t beg = (rtp_type == 29) ? 4 : 2;
608
609 d.nalu.write(&data[beg], size - beg);
610 if (e) {
611 WriteData(d);
612 }
613 }
614 else if (type == 0) { // Unspecified type
615 if (s) {
616 d.nalu.write(naluStartCode, sizeof(naluStartCode));
617 d.nalu.write8(static_cast<uint8_t>((nri << 5) | type));
618 }
619 size_t beg = (rtp_type == 29) ? 4 : 2;
620
621 d.nalu.write(&data[beg], size - beg);
622 if (e) {
623 WriteData(d);
624 }
625 }
626 else{
627 assert(false);
628 }
629 }
630 else{
631 assert(false);
632 }
633 }
634 else{
635 assert(false); // Unsupported codec
636 }
637 }
638 session->DeletePacket(packet);
639 }
640 }
641 while(session->GotoNextSourceWithData());
642 }
643 if (use_pool_thread) {
644 session->EndDataAccess();
645 }
646}
647
648void Cogs::RTPStream::processH264NALU(Dest& dest, uint32_t rtpType, const void* data, size_t size) {
649 assert (rtpType <= 23);
650
651 if ((rtpType == 7) && !dest.sps.size()) {
652 dest.sps = std::string(static_cast<const char*>(data), size);
653 LOG_INFO(logger, "RTP received H264 SPS (RTPStream %p)", this);
654 }
655 else if ((rtpType == 8) && !dest.pps.size()){
656 dest.pps = std::string(static_cast<const char*>(data), size);
657 LOG_INFO(logger, "RTP received H264 PPS (RTPStream %p)", this);
658 }
659 dest.nalu.write(naluStartCode, sizeof(naluStartCode));
660 dest.nalu.write(data, size);
661 WriteData(dest);
662}
663
664void Cogs::RTPStream::processH265NALU(Dest& dest, uint32_t rtpType, const void* data, size_t size) {
665 assert (rtpType <= 40);
666
667 if ((rtpType == 32) && !dest.vps.size()) {
668 dest.vps = std::string(static_cast<const char*>(data), size);
669 LOG_INFO(logger, "RTP receive H265 VPS (RTPStream %p)", this);
670 }
671 else if((rtpType == 33) && !dest.sps.size()) {
672 dest.sps = std::string(static_cast<const char*>(data), size);
673 LOG_INFO(logger, "RTP receive H265 SPS (RTPStream %p)", this);
674 }
675 else if((rtpType == 34) && !dest.pps.size()) {
676 dest.pps = std::string(static_cast<const char*>(data), size);
677 LOG_INFO(logger, "RTP receive H265 PPS (RTPStream %p)", this);
678 }
679 dest.nalu.write(naluStartCode, sizeof(naluStartCode));
680 dest.nalu.write(data, size);
681 WriteData(dest);
682}
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Definition: Context.h:83
Log implementation class.
Definition: LogManager.h:139
Contains the Engine, Renderer, resource managers and other systems needed to run Cogs....
bool HandleIsValid(const ResourceHandle_t< T > &handle)
Check if the given resource is valid, that is not equal to NoHandle or InvalidHandle.
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Definition: LogManager.h:180
static const ResourceHandle_t NoHandle
Handle representing a default (or none if default not present) resource.