4#include "Resources/TextureManager.h"
5#include "Renderer/RenderTexture.h"
7#include "Foundation/Logging/Logger.h"
8#include "Foundation/Platform/Timer.h"
11#include "jrtp/rtpsession.h"
12#include "jrtp/rtpsessionparams.h"
13#include "jrtp/rtpudpv4transmitter.h"
14#include "jrtp/rtptcptransmitter.h"
15#include "jrtp/rtppacket.h"
16#include "jrtp/rtpsourcedata.h"
17#include "jrtp/rtptcpaddress.h"
22#define INITIAL_BUFFER 64*3*5*7
25using namespace Cogs::Network;
28 constexpr const uint8_t naluStartCode[] = { 0x00, 0x00, 0x00, 0x01 };
30 class CogsRTPMemoryManager :
public jrtplib::RTPMemoryManager {
32 static CogsRTPMemoryManager* instance() {
33 static CogsRTPMemoryManager i;
38 void* AllocateBuffer(
size_t numbytes,
int ) {
39 void* mem = malloc(numbytes);
42 OsoMP_Allocate(COGSMEM_LibRTP, mem, numbytes,
nullptr, 0);
47 void FreeBuffer(
void* ptr) {
49 OsoMP_Free(COGSMEM_LibRTP, ptr,
nullptr, 0);
55int64_t timestamp_diff(uint32_t t1, uint32_t t0)
88 int32_t diff = (int64_t)t1 - (int64_t)t0;
92void timestamp_diff_test()
95 assert(timestamp_diff(0, 0) == 0);
96 assert(timestamp_diff(0xffffffffu, 0xffffffffu) == 0);
97 assert(timestamp_diff(1, 1) == 0);
98 assert(timestamp_diff(2, 1) == 1);
99 assert(timestamp_diff(1, 2) == -1);
102 assert(timestamp_diff(0, 0xffffffffu) == 1);
103 assert(timestamp_diff(1, 0xffffffffu) == 2);
106 assert(timestamp_diff(0xffffffffu, 0) == -1);
107 assert(timestamp_diff(0xffffffffu, 1) == -2);
116 clock_rate(clock_rate),
119 timestamp_diff_test();
120 LOG_INFO(logger,
"jrtplib Version: %s", jrtplib::RTPLibraryVersion::GetVersion().GetVersionString().c_str());
122 tcpConnection.setDoNotProcess();
124 session =
new jrtplib::RTPSession(
nullptr, CogsRTPMemoryManager::instance());
126 jrtplib::RTPSessionParams sessionparams;
127 sessionparams.SetOwnTimestampUnit(1.0/clock_rate);
129 assert(!sessionparams.IsUsingPollThread());
132 jrtplib::RTPUDPv4TransmissionParams transparams;
134 transparams.SetPortbase(port);
135 transparams.SetRTPReceiveBuffer(1024*1024);
136 transparams.SetRTCPReceiveBuffer(1500*64);
137 if (!(mc_adapter[0] == 0 && mc_adapter[1] == 0 && mc_adapter[2] == 0 && mc_adapter[3] == 0)){
138 LOG_INFO(logger,
"Multicast adapter: %s", mc_adapter.string().c_str());
139 uint8_t ip_arr[4] = {mc_adapter[0], mc_adapter[1], mc_adapter[2], mc_adapter[3]};
140 jrtplib::RTPIPv4Address rtp_ip;
141 rtp_ip.SetIP(ip_arr);
142 transparams.SetMulticastInterfaceIP(rtp_ip.GetIP());
145 int status = session->Create(sessionparams, &transparams, transparams.GetTransmissionProtocol());
147 LOG_ERROR(logger,
"session->Create Failed: %s", jrtplib::RTPGetErrorString(status).c_str());
150 jrtplib::RTPUDPv4TransmissionInfo *info =
reinterpret_cast<jrtplib::RTPUDPv4TransmissionInfo*
>(session->GetTransmissionInfo());
151 rtp_port = info->GetRTPPort();
152 rtcp_port = info->GetRTCPPort();
155 jrtplib::RTPTCPTransmissionParams transparams;
156 int status = session->Create(sessionparams, &transparams, transparams.GetTransmissionProtocol());
159 LOG_ERROR(logger,
"session->Create Failed: %s", jrtplib::RTPGetErrorString(status).c_str());
163 rtcp_port = port + 1;
171Cogs::RTPStream::~RTPStream()
173 LOG_INFO(logger,
"Shutdown");
174 jrtplib::RTPTime delay(10.0);
175 session->BYEDestroy(delay,
"BYE", 9);
176 for(
auto &tmp : dest) {
178 fclose(tmp.second.fp);
180 delete [] tmp.second.buffer;
185void Cogs::RTPStream::AddDestination(
const uint8_t in_ip[4], uint16_t in_server_port)
187 for (uint32_t i = 0; i < 4; i++) {
190 server_port = in_server_port;
192 LOG_INFO(logger,
"Add destination %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
197 status = session->AddDestination(jrtplib::RTPIPv4Address(ip, server_port));
202 if (tcpConnection.connect(addr)) {
203 LOG_INFO(logger,
"RTPStream::AddDestination: Connected to %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
205 tcpConnection.enableAutoReconnect();
206 status = session->AddDestination(jrtplib::RTPTCPAddress(tcpConnection.getSocket()));
209 LOG_ERROR(logger,
"RTPStream::AddDestination failed to connect to : %d.%d.%d.%d:%d", ip[0], ip[1], ip[2], ip[3], server_port);
213 LOG_ERROR(logger,
"RTPStream::AddDestination failed: %s", jrtplib::RTPGetErrorString(status).c_str());
217void Cogs::RTPStream::JoinMulticastGroup(
const uint8_t in_ip[4], uint16_t in_server_port)
219 server_port = in_server_port;
220 for(uint32_t i=0; i<4; i++) ip[i] = in_ip[i];
221 LOG_INFO(logger,
"Join multicast group %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
222 jrtplib::RTPIPv4Address addr(ip, server_port);
223 if(!session->SupportsMulticasting()){
224 LOG_ERROR(logger,
"session->SupportsMulticasting failed. (Possibly duplicate connection)");
227 int status = session->JoinMulticastGroup(addr);
229 LOG_ERROR(logger,
"session->JoinMulticastGroup failed: %s",
230 jrtplib::RTPGetErrorString(status).c_str());
234void Cogs::RTPStream::SendPacket(
const uint8_t *buffer,
size_t size)
236 int status = session->SendPacket(buffer, size);
238 LOG_ERROR(logger,
"session->SendPacket failed: %s",
239 jrtplib::RTPGetErrorString(status).c_str());
244static bool g_first =
true;
246void Cogs::RTPStream::AddSSRC(uint32_t ssrc, Cogs::Core::Codec codec_in, ResourceId textureId)
258 d.buffer_size = INITIAL_BUFFER;
259 d.buffer =
new char[d.buffer_size];
261 LOG_INFO(logger,
"Adding SSRC 0x%x to RTPStream %p", ssrc,
this);
263 dest[ssrc] = std::move(d);
266void Cogs::RTPStream::WriteData(Dest &dest_w)
270 fwrite(dest_w.nalu.data(), 1, dest_w.nalu.size(), dest_w.fp);
277 if (decoderContext) {
280 payload.size = dest_w.nalu.size();
281 payload.data = dest_w.nalu.data();
282 payload.timestamp = dest_w.timestamp;
283 payload.use_timestamp =
true;
285 decoderContext->streamVideoData(dest_w.textureHandle, payload);
295 for(
auto &tmp : dest){
296 if(tmp.second.receive_time >= t){
297 t = tmp.second.receive_time;
304void Cogs::RTPStream::Update()
306 if(use_pool_thread) {
307 session->BeginDataAccess();
314 if(session->GotoFirstSource()){
316 jrtplib::RTPSourceData *source = session->GetCurrentSourceInfo();
317 source->SetTimestampUnit(1.0/clock_rate);
318 if(source->ReceivedBYE()){
319 uint32_t ssrc = source->GetSSRC();
320 auto iter = dest.find(ssrc);
321 if(iter != dest.end()){
323 uint8_t *bye_reason = source->GetBYEReason(&bye_len);
324 jrtplib::RTPTime rtp_time = source->GetBYETime();
325 uint64_t bye_time = rtp_time.GetSeconds() * 1000000LL + rtp_time.GetMicroSeconds();
326 LOG_INFO(logger,
"Removing SSRC 0x%x from RTPStream %p. BYE: %s (t=%" PRIu64
")",
327 ssrc,
this, bye_reason ? std::string((
char*)bye_reason, bye_len).c_str() :
"", bye_time);
329 fclose(iter->second.fp);
330 delete [] iter->second.buffer;
334 }
while(session->GotoNextSource());
337 LOG_ERROR(logger,
"GotoFirstSource() failed for RTPStream %p.",
this);
341 if(session->GotoFirstSourceWithData()){
342 std::unique_lock lock(dest_mu);
344 jrtplib::RTPPacket *packet;
346 while ((packet = session->GetNextPacket()) != 0) {
347 uint32_t ssrc = packet->GetSSRC();
348 uint32_t eseq = packet->GetExtendedSequenceNumber();
349 uint8_t* data = packet->GetPayloadData();
350 size_t size = packet->GetPayloadLength();
351 jrtplib::RTPTime rtime = packet->GetReceiveTime();
352 uint64_t pkt_rtime = rtime.GetSeconds() * 1000000LL + rtime.GetMicroSeconds();
353 uint64_t pkt_time = 0xF000000000000000ULL;
355 if (packet->HasExtension()) {
356 uint16_t ext_id = packet->GetExtensionID();
357 size_t ext_size = packet->GetExtensionLength();
358 uint8_t *ext_data = packet->GetExtensionData();
362 if(rtp_one_byte_header_extension){
365 while(idx < ext_size) {
366 uint8_t ext_ext_header = ext_data[idx++];
367 if(ext_ext_header == 0)
continue;
368 if(idx == ext_size)
break;
369 uint16_t ext_ext_id = (ext_ext_header&0xf0)>>4;
370 size_t ext_ext_size = 1+ext_ext_header&0xf;
371 if(ext_ext_id == 15)
break;
372 uint8_t *ext_ext_data = &ext_data[idx];
374 if(rfc6051_ext_id != 0 && ext_ext_id == rfc6051_ext_id){
375 uint64_t pkt_time_ext = 0;
377 ((uint64_t)ext_ext_data[7]<<0) |
378 ((uint64_t)ext_ext_data[6]<<8) |
379 ((uint64_t)ext_ext_data[5]<<16) |
380 ((uint64_t)ext_ext_data[4]<<24) |
381 ((uint64_t)ext_ext_data[3]<<32) |
382 ((uint64_t)ext_ext_data[2]<<40) |
383 ((uint64_t)ext_ext_data[1]<<48) |
384 ((uint64_t)ext_ext_data[0]<<56);
385 auto tconv = jrtplib::RTPTime(jrtplib::RTPNTPTime((ntp>>32)&0xffffffffu, ntp&0xffffffffu));
386 pkt_time_ext = tconv.GetSeconds() * 1000000LL + tconv.GetMicroSeconds();
387 static uint32_t testvar = 0;
389 LOG_INFO(logger,
"NTP ts %" PRIu64
" ", ntp);
390 LOG_INFO(logger,
"Using RTP 64 bit timestamp extension. (ext ts %" PRIu64
" old ts %" PRIu64
")",
391 pkt_time_ext, pkt_time);
394 pkt_time = pkt_time_ext;
397 LOG_WARNING(logger,
"RTP has unused OBH extension id %u size %zu", ext_ext_id, ext_ext_size);
404 else if(rtp_two_byte_header_extension){
405 LOG_WARNING(logger,
"RTP two byte header extension not supported.");
410 LOG_WARNING(logger,
"RTP has unused extension id %u size %zu.", ext_id, ext_size);
413 if (pkt_time == 0xF000000000000000ULL) {
414 jrtplib::RTPSourceData* source_data = session->GetCurrentSourceInfo();
416 if (source_data->SR_HasInfo()) {
417 uint32_t ref_ts = source_data->SR_GetRTPTimestamp();
418 double ts_unit = source_data->GetTimestampUnit();
419 uint32_t pkt_ts = packet->GetTimestamp();
420 int64_t ts_diff = timestamp_diff(pkt_ts, ref_ts);
421 int64_t ts_diff_us = (int64_t)((
double)ts_diff * (ts_unit * 1000000.0));
424 if(ts_diff_us >= -1000000.0*5.0){
425 jrtplib::RTPTime ref_ntp_rtp = jrtplib::RTPTime(source_data->SR_GetNTPTimestamp());
426 uint64_t ref_time = ref_ntp_rtp.GetSeconds() * 1000000LL + ref_ntp_rtp.GetMicroSeconds();
428 pkt_time = ref_time + ts_diff_us;
431 source_data->FlushPackets();
432 LOG_INFO(logger,
"RTPStream %p: Flushed Packets SSRC 0x%x"
433 "(pkg age %" PRId64
" us pkt_ts %d ref_ts %d ts_unit %f)",
434 this, ssrc, ts_diff_us, pkt_ts, ref_ts, ts_unit);
440 pkt_time = packet->GetTimestamp();
444 if (pkt_time != 0xF000000000000000ULL) {
445 auto iter = dest.find(ssrc);
446 if(iter == dest.end()){
447 LOG_INFO(logger,
"Adding SSRC 0x%x to RTPStream %p", ssrc,
this);
458 d.buffer_size = INITIAL_BUFFER;
459 d.buffer =
new char[d.buffer_size];
460 iter = dest.insert({ssrc, std::move(d)}).first;
462 Dest &d = iter->second;
469 if (d.pps.size() && pkt_time != d.timestamp) {
476 d.prev_receive_time = d.receive_time;
477 d.receive_time = pkt_rtime;
478 d.prev_timestamp = d.timestamp;
479 d.timestamp = pkt_time;
481 d.prev_eseq = d.eseq;
489 d.size_akk += (uint32_t)size;
490 if(d.receive_time-d.rate_time > 1ull<<27){
491 double rt_diff = (d.receive_time-d.rate_time)*(1.0/(1ull<<32));
492 double rate = (d.size_akk*8)/rt_diff;
494 d.rate = (1.0-a)*d.rate + a*rate;
495 d.rate_time = d.receive_time;
500 if (d.codec == Cogs::Core::Codec::None) {
502 else if (d.codec == Cogs::Core::Codec::HEVC) {
503 if((data[0] & 0x80) != 0){
504 LOG_ERROR(logger,
"Corrupt RTP HEVC packet (f != 0). (RTPStream %p). Possible MPEG-TS stream not supported.",
this);
505 session->DeletePacket(packet);
508 uint32_t type = (data[0]>>1)&0x3f;
510 uint32_t TID = data[1]&0x7;
515 LOG_ERROR(logger,
"Corrupt RTP H265 packet (TID == 0).");
516 session->DeletePacket(packet);
522 processH265NALU(d, type, data, size);
524 else if (type == 48) {
525 LOG_ERROR(logger,
"H265 aggregation packets are not supported.");
528 else if (type == 49) {
529 uint32_t s = (data[2] >> 7) & 0x1;
530 uint32_t e = (data[2] >> 6) & 0x1;
531 uint32_t fu_type = data[2] & 0x3f;
533 assert(fu_type >= 0 && fu_type <= 40);
537 d.nalu.write(naluStartCode,
sizeof(naluStartCode));
538 d.nalu.write8(
static_cast<uint8_t
>(fu_type << 1));
542 size_t siz = size-beg;
543 d.nalu.write(&data[beg], siz);
549 LOG_ERROR(logger,
"Unsupported H265 packet of type %d encountered.", type);
553 else if (d.codec == Cogs::Core::Codec::H264) {
554 if((data[0] & 0x80 ) != 0){
555 LOG_ERROR(logger,
"Corrupt RTP H264 packet (f != 0). (RTPStream %p). Possible MPEG-TS stream not supported.",
this);
556 session->DeletePacket(packet);
559 uint32_t nri = (data[0] >> 5) & 0x3f;
560 uint32_t rtp_type = data[0] & 0x1f;
563 processH264NALU(d, rtp_type, data, size);
565 else if(rtp_type == 24) {
566 uint8_t* read = data + 1;
568 while (read < (data + size)) {
569 uint16_t naluSize = ntohs(*
reinterpret_cast<uint16_t*
>(read));
572 if ((read[0] & 0x80) != 0) {
573 LOG_ERROR(logger,
"Corrupt RTP H264 packet: Non-zero 'f' field in STAP-A NALU. (RTPStream %p). Possible MPEG-TS stream not supported.",
this);
576 processH264NALU(d, read[0] & 0x1f, read, naluSize);
581 else if (rtp_type == 25) {
582 LOG_ERROR(logger,
"STAP-B packets are not supported.");
585 else if ((rtp_type == 26) || (rtp_type == 27)) {
586 LOG_ERROR(logger,
"MTAP packets are not supported.");
589 else if ((rtp_type == 28) || (rtp_type == 29)) {
590 assert(rtp_type == 28);
591 uint32_t s = (data[1]>>7)&0x1;
592 uint32_t e = (data[1]>>6)&0x1;
593 uint32_t r = (data[1]>>5)&0x1;
595 uint32_t type = (data[1]>>0)&0x1f;
601 d.nalu.write(naluStartCode,
sizeof(naluStartCode));
602 d.nalu.write8(
static_cast<uint8_t
>((nri<<5) | type));
604 size_t beg = (rtp_type == 29) ? 4 : 2;
606 d.nalu.write(&data[beg], size - beg);
613 d.nalu.write(naluStartCode,
sizeof(naluStartCode));
614 d.nalu.write8(
static_cast<uint8_t
>((nri<<5) | type));
616 size_t beg = (rtp_type == 29) ? 4 : 2;
618 d.nalu.write(&data[beg], size - beg);
623 else if (type == 0) {
625 d.nalu.write(naluStartCode,
sizeof(naluStartCode));
626 d.nalu.write8(
static_cast<uint8_t
>((nri << 5) | type));
628 size_t beg = (rtp_type == 29) ? 4 : 2;
630 d.nalu.write(&data[beg], size - beg);
647 session->DeletePacket(packet);
650 while(session->GotoNextSourceWithData());
652 if (use_pool_thread) {
653 session->EndDataAccess();
657void Cogs::RTPStream::processH264NALU(Dest& dest, uint32_t rtpType,
const void* data,
size_t size) {
658 assert (rtpType <= 23);
660 if ((rtpType == 7) && !dest.sps.size()) {
661 dest.sps = std::string(
static_cast<const char*
>(data), size);
662 LOG_INFO(logger,
"RTP received H264 SPS (RTPStream %p)",
this);
664 else if ((rtpType == 8) && !dest.pps.size()){
665 dest.pps = std::string(
static_cast<const char*
>(data), size);
666 LOG_INFO(logger,
"RTP received H264 PPS (RTPStream %p)",
this);
668 dest.nalu.write(naluStartCode,
sizeof(naluStartCode));
669 dest.nalu.write(data, size);
673void Cogs::RTPStream::processH265NALU(Dest& dest, uint32_t rtpType,
const void* data,
size_t size) {
674 assert (rtpType <= 40);
676 if ((rtpType == 32) && !dest.vps.size()) {
677 dest.vps = std::string(
static_cast<const char*
>(data), size);
678 LOG_INFO(logger,
"RTP receive H265 VPS (RTPStream %p)",
this);
680 else if((rtpType == 33) && !dest.sps.size()) {
681 dest.sps = std::string(
static_cast<const char*
>(data), size);
682 LOG_INFO(logger,
"RTP receive H265 SPS (RTPStream %p)",
this);
684 else if((rtpType == 34) && !dest.pps.size()) {
685 dest.pps = std::string(
static_cast<const char*
>(data), size);
686 LOG_INFO(logger,
"RTP receive H265 PPS (RTPStream %p)",
this);
688 dest.nalu.write(naluStartCode,
sizeof(naluStartCode));
689 dest.nalu.write(data, size);
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Log implementation class.
Contains the Engine, Renderer, resource managers and other systems needed to run Cogs....
bool HandleIsValid(const ResourceHandle_t< T > &handle)
Check if the given resource is valid, that is not equal to NoHandle or InvalidHandle.
constexpr Log getLogger(const char(&name)[LEN]) noexcept
static const ResourceHandle_t NoHandle
Handle representing a default (or none if default not present) resource.