4#include "Resources/TextureManager.h"
5#include "Renderer/RenderTexture.h"
7#include "Foundation/Logging/Logger.h"
8#include "Foundation/Platform/Timer.h"
11#include "jrtp/rtpsession.h"
12#include "jrtp/rtpsessionparams.h"
13#include "jrtp/rtpudpv4transmitter.h"
14#include "jrtp/rtptcptransmitter.h"
15#include "jrtp/rtppacket.h"
16#include "jrtp/rtpsourcedata.h"
17#include "jrtp/rtptcpaddress.h"
// Initial capacity, in bytes, of a per-SSRC receive buffer (64*3*5*7 = 6720).
// The replacement list is parenthesized so the macro keeps its value when it
// is expanded next to operators of equal or higher precedence
// (e.g. `x / INITIAL_BUFFER`).
#define INITIAL_BUFFER (64*3*5*7)
25using namespace Cogs::Network;
// Four-byte start code (00 00 00 01) that this file writes in front of NAL
// unit data appended to a Dest's nalu buffer.
28 constexpr const uint8_t naluStartCode[] = { 0x00, 0x00, 0x00, 0x01 };
// jrtplib memory manager: buffers are obtained with malloc() and every
// allocation and release is reported to OsoMP under the COGSMEM_LibRTP tag.
// NOTE(review): the lines that return from these methods and that actually
// release the memory are not visible in this listing — confirm against the
// full file.
30 class CogsRTPMemoryManager :
public jrtplib::RTPMemoryManager {
// Accessor for the shared manager, which is held as a function-local static.
32 static CogsRTPMemoryManager* instance() {
33 static CogsRTPMemoryManager i;
// Allocates numbytes bytes with malloc(); the second int argument is unnamed
// and not used by the visible code.
38 void* AllocateBuffer(
size_t numbytes,
int ) {
39 void* mem = malloc(numbytes);
42 OsoMP_Allocate(COGSMEM_LibRTP, mem, numbytes,
nullptr, 0);
// Reports the release of ptr to OsoMP.
47 void FreeBuffer(
void* ptr) {
49 OsoMP_Free(COGSMEM_LibRTP, ptr,
nullptr, 0);
// Signed distance t1 - t0 between two 32-bit timestamps.
// The 64-bit difference is narrowed to int32_t, so values on either side of
// the 32-bit wrap compare as neighbours (e.g. (0, 0xffffffff) -> 1, as
// asserted in timestamp_diff_test()).
// NOTE(review): the return statement is not visible in this listing.
55int64_t timestamp_diff(uint32_t t1, uint32_t t0)
88 int32_t diff = (int64_t)t1 - (int64_t)t0;
// Assert-based self-check of timestamp_diff(); the checks compile away when
// NDEBUG is defined.
91void timestamp_diff_test()
// Equal inputs and small non-wrapping differences.
94 assert(timestamp_diff(0, 0) == 0);
95 assert(timestamp_diff(0xffffffffu, 0xffffffffu) == 0);
96 assert(timestamp_diff(1, 1) == 0);
97 assert(timestamp_diff(2, 1) == 1);
98 assert(timestamp_diff(1, 2) == -1);
// t1 has wrapped past zero while t0 has not: result is a small positive step.
101 assert(timestamp_diff(0, 0xffffffffu) == 1);
102 assert(timestamp_diff(1, 0xffffffffu) == 2);
// t0 has wrapped past zero while t1 has not: result is a small negative step.
105 assert(timestamp_diff(0xffffffffu, 0) == -1);
106 assert(timestamp_diff(0xffffffffu, 1) == -2);
// RTPStream constructor (the signature line and parts of the initializer list
// are not visible in this listing). It creates the jrtplib session and
// configures either a UDPv4 or a TCP transmitter.
// NOTE(review): the condition that selects between the UDP and TCP paths is
// not visible here.
115 clock_rate(clock_rate),
// Runs the timestamp_diff() self-check on every construction.
118 timestamp_diff_test();
119 LOG_INFO(logger,
"jrtplib Version: %s", jrtplib::RTPLibraryVersion::GetVersion().GetVersionString().c_str());
121 tcpConnection.setDoNotProcess();
// The session allocates through the shared CogsRTPMemoryManager.
123 session =
new jrtplib::RTPSession(
nullptr, CogsRTPMemoryManager::instance());
125 jrtplib::RTPSessionParams sessionparams;
// One timestamp unit is 1/clock_rate seconds.
126 sessionparams.SetOwnTimestampUnit(1.0/clock_rate);
128 assert(!sessionparams.IsUsingPollThread());
// UDPv4 transmitter: port base plus RTP and RTCP receive buffer sizes.
131 jrtplib::RTPUDPv4TransmissionParams transparams;
133 transparams.SetPortbase(port);
134 transparams.SetRTPReceiveBuffer(1024*1024);
135 transparams.SetRTCPReceiveBuffer(1500*64);
// An all-zero mc_adapter means no multicast interface is set explicitly.
136 if (!(mc_adapter[0] == 0 && mc_adapter[1] == 0 && mc_adapter[2] == 0 && mc_adapter[3] == 0)){
137 LOG_INFO(logger,
"Multicast adapter: %s", mc_adapter.string().c_str());
138 uint8_t ip_arr[4] = {mc_adapter[0], mc_adapter[1], mc_adapter[2], mc_adapter[3]};
139 jrtplib::RTPIPv4Address rtp_ip;
140 rtp_ip.SetIP(ip_arr);
141 transparams.SetMulticastInterfaceIP(rtp_ip.GetIP());
144 int status = session->Create(sessionparams, &transparams, transparams.GetTransmissionProtocol());
146 LOG_ERROR(logger,
"session->Create Failed: %s", jrtplib::RTPGetErrorString(status).c_str());
// Read back the RTP/RTCP ports reported by the transmitter.
149 jrtplib::RTPUDPv4TransmissionInfo *info =
reinterpret_cast<jrtplib::RTPUDPv4TransmissionInfo*
>(session->GetTransmissionInfo());
150 rtp_port = info->GetRTPPort();
151 rtcp_port = info->GetRTCPPort();
// TCP transmitter path.
154 jrtplib::RTPTCPTransmissionParams transparams;
155 int status = session->Create(sessionparams, &transparams, transparams.GetTransmissionProtocol());
158 LOG_ERROR(logger,
"session->Create Failed: %s", jrtplib::RTPGetErrorString(status).c_str());
162 rtcp_port = port + 1;
// Shuts the stream down: sends an RTCP BYE through BYEDestroy() with a
// 10 second delay argument, then closes the file and frees the buffer held by
// entries of dest.
// The BYE reason length must match the literal: "BYE" is 3 characters, so 3 is
// passed. The previous value 9 exceeded the 4 bytes of the literal and made
// the library read past its end.
// NOTE(review): any guards around fclose()/delete[] are not visible in this
// listing.
170Cogs::RTPStream::~RTPStream()
172 LOG_INFO(logger,
"Shutdown");
173 jrtplib::RTPTime delay(10.0);
174 session->BYEDestroy(delay,
"BYE", 3);
175 for(
auto &tmp : dest) {
177 fclose(tmp.second.fp);
179 delete [] tmp.second.buffer;
// Registers a remote endpoint with the session.
// in_ip is a 4-byte IPv4 address and in_server_port the remote port; the log
// messages report the pair server_port / server_port+1.
// Two paths are visible: adding an RTPIPv4Address directly, or connecting
// tcpConnection first and adding an RTPTCPAddress for its socket.
// NOTE(review): the condition choosing between the two paths, the body of the
// copy loop and the status checks are not visible in this listing.
183void Cogs::RTPStream::AddDestination(
const uint8_t in_ip[4], uint16_t in_server_port)
185 for (uint32_t i = 0; i < 4; i++) {
188 server_port = in_server_port;
190 LOG_INFO(logger,
"Add destination %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
195 status = session->AddDestination(jrtplib::RTPIPv4Address(ip, server_port));
// TCP path: the destination is only added once the connection succeeded.
200 if (tcpConnection.connect(addr)) {
201 LOG_INFO(logger,
"RTPStream::AddDestination: Connected to %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
203 tcpConnection.enableAutoReconnect();
204 status = session->AddDestination(jrtplib::RTPTCPAddress(tcpConnection.getSocket()));
207 LOG_ERROR(logger,
"RTPStream::AddDestination failed to connect to : %d.%d.%d.%d:%d", ip[0], ip[1], ip[2], ip[3], server_port);
211 LOG_ERROR(logger,
"RTPStream::AddDestination failed: %s", jrtplib::RTPGetErrorString(status).c_str());
// Stores the given IPv4 address and port in ip / server_port and asks the
// session to join that multicast group. Errors are reported through
// LOG_ERROR; the function returns void.
// NOTE(review): whether the function returns early when multicasting is
// unsupported is not visible in this listing.
214void Cogs::RTPStream::JoinMulticastGroup(
const uint8_t in_ip[4], uint16_t in_server_port)
216 server_port = in_server_port;
217 for(uint32_t i=0; i<4; i++) ip[i] = in_ip[i];
218 LOG_INFO(logger,
"Join multicast group %d.%d.%d.%d:%d-%d", ip[0], ip[1], ip[2], ip[3], server_port, server_port+1);
219 jrtplib::RTPIPv4Address addr(ip, server_port);
220 if(!session->SupportsMulticasting()){
221 LOG_ERROR(logger,
"session->SupportsMulticasting failed. (Possibly duplicate connection)");
224 int status = session->JoinMulticastGroup(addr);
226 LOG_ERROR(logger,
"session->JoinMulticastGroup failed: %s",
227 jrtplib::RTPGetErrorString(status).c_str());
// Sends size bytes from buffer through the session. A failure is only logged;
// nothing is returned to the caller.
230void Cogs::RTPStream::SendPacket(
const uint8_t *buffer,
size_t size)
232 int status = session->SendPacket(buffer, size);
234 LOG_ERROR(logger,
"session->SendPacket failed: %s",
235 jrtplib::RTPGetErrorString(status).c_str());
// File-scope flag, initially true.
// NOTE(review): no read or write of g_first is visible in this listing —
// confirm it is still needed.
239static bool g_first =
true;
// Registers a Dest entry for the given SSRC: a buffer of INITIAL_BUFFER bytes
// is allocated and the entry is stored in dest under ssrc.
// NOTE(review): how codec_in and textureId are stored, and any locking, are
// not visible in this listing.
240void Cogs::RTPStream::AddSSRC(uint32_t ssrc, Cogs::Core::Codec codec_in, ResourceId textureId)
252 d.buffer_size = INITIAL_BUFFER;
253 d.buffer =
new char[d.buffer_size];
255 LOG_INFO(logger,
"Adding SSRC 0x%x to RTPStream %p", ssrc,
this);
// Assigning through operator[] replaces any entry already stored for ssrc.
257 dest[ssrc] = std::move(d);
// Hands the NAL unit data accumulated in dest_w.nalu to its consumers: it is
// written to dest_w.fp and, when a decoderContext exists, streamed to the
// decoder together with dest_w.timestamp.
// NOTE(review): the guard around fwrite() and any reset of the nalu buffer
// are not visible in this listing.
259void Cogs::RTPStream::WriteData(Dest &dest_w)
263 fwrite(dest_w.nalu.data(), 1, dest_w.nalu.size(), dest_w.fp);
270 if (decoderContext) {
273 payload.size = dest_w.nalu.size();
274 payload.data = dest_w.nalu.data();
275 payload.timestamp = dest_w.timestamp;
276 payload.use_timestamp =
true;
278 decoderContext->streamVideoData(dest_w.textureHandle, payload);
287 for(
auto &tmp : dest){
288 if(tmp.second.receive_time >= t){
289 t = tmp.second.receive_time;
// Drains the RTP session once.
//  1. Walks all sources; a source that has sent BYE and has an entry in dest
//     is logged and its file/buffer are released.
//  2. Walks all sources with data and, for every packet, derives a packet
//     time (header extension timestamp, else RTCP sender-report reference,
//     else the raw RTP timestamp), looks up or creates the Dest for the SSRC,
//     updates its receive statistics and depacketizes the H.264/HEVC payload
//     into d.nalu.
// 0xF000000000000000 is used as the "no packet time yet" sentinel.
// Fix in this revision: the one-byte header-extension element length is
// computed as 1 + (header & 0xf); the former `1+header&0xf` parsed as
// (1+header)&0xf and yielded 0 instead of 16 for a length field of 15.
// NOTE(review): several brace, condition and declaration lines are not
// visible in this listing, so the exact nesting of the branches below should
// be confirmed against the full file.
295void Cogs::RTPStream::Update()
297 if(use_pool_thread) {
298 session->BeginDataAccess();
// Pass 1: handle sources that have announced BYE.
305 if(session->GotoFirstSource()){
307 jrtplib::RTPSourceData *source = session->GetCurrentSourceInfo();
308 source->SetTimestampUnit(1.0/clock_rate);
309 if(source->ReceivedBYE()){
310 uint32_t ssrc = source->GetSSRC();
311 auto iter = dest.find(ssrc);
312 if(iter != dest.end()){
314 uint8_t *bye_reason = source->GetBYEReason(&bye_len);
315 jrtplib::RTPTime rtp_time = source->GetBYETime();
316 uint64_t bye_time = rtp_time.GetSeconds() * 1000000LL + rtp_time.GetMicroSeconds();
317 LOG_INFO(logger,
"Removing SSRC 0x%x from RTPStream %p. BYE: %s (t=%" PRIu64
")",
318 ssrc,
this, bye_reason ? std::string((
char*)bye_reason, bye_len).c_str() :
"", bye_time);
320 fclose(iter->second.fp);
321 delete [] iter->second.buffer;
325 }
while(session->GotoNextSource());
328 LOG_ERROR(logger,
"GotoFirstSource() failed for RTPStream %p.",
this);
// Pass 2: consume queued packets; dest is accessed under dest_mu.
332 if(session->GotoFirstSourceWithData()){
333 std::unique_lock lock(dest_mu);
335 jrtplib::RTPPacket *packet;
337 while ((packet = session->GetNextPacket()) != 0) {
338 uint32_t ssrc = packet->GetSSRC();
339 uint32_t eseq = packet->GetExtendedSequenceNumber();
340 uint8_t* data = packet->GetPayloadData();
341 size_t size = packet->GetPayloadLength();
342 jrtplib::RTPTime rtime = packet->GetReceiveTime();
// Receive time in microseconds.
343 uint64_t pkt_rtime = rtime.GetSeconds() * 1000000LL + rtime.GetMicroSeconds();
344 uint64_t pkt_time = 0xF000000000000000ULL;
346 if (packet->HasExtension()) {
347 uint16_t ext_id = packet->GetExtensionID();
348 size_t ext_size = packet->GetExtensionLength();
349 uint8_t *ext_data = packet->GetExtensionData();
353 if(rtp_one_byte_header_extension){
// One-byte header elements: high nibble is the id, low nibble is length-1.
// A zero byte is skipped and id 15 ends parsing.
356 while(idx < ext_size) {
357 uint8_t ext_ext_header = ext_data[idx++];
358 if(ext_ext_header == 0)
continue;
359 if(idx == ext_size)
break;
360 uint16_t ext_ext_id = (ext_ext_header&0xf0)>>4;
// Mask first, then add one: `+` binds tighter than `&`.
361 size_t ext_ext_size = 1+(ext_ext_header&0xf);
362 if(ext_ext_id == 15)
break;
363 uint8_t *ext_ext_data = &ext_data[idx];
365 if(rfc6051_ext_id != 0 && ext_ext_id == rfc6051_ext_id){
366 uint64_t pkt_time_ext = 0;
// Big-endian 64-bit value assembled from the eight element bytes.
// NOTE(review): no visible check that eight bytes remain in ext_data.
368 ((uint64_t)ext_ext_data[7]<<0) |
369 ((uint64_t)ext_ext_data[6]<<8) |
370 ((uint64_t)ext_ext_data[5]<<16) |
371 ((uint64_t)ext_ext_data[4]<<24) |
372 ((uint64_t)ext_ext_data[3]<<32) |
373 ((uint64_t)ext_ext_data[2]<<40) |
374 ((uint64_t)ext_ext_data[1]<<48) |
375 ((uint64_t)ext_ext_data[0]<<56);
376 auto tconv = jrtplib::RTPTime(jrtplib::RTPNTPTime((ntp>>32)&0xffffffffu, ntp&0xffffffffu));
377 pkt_time_ext = tconv.GetSeconds() * 1000000LL + tconv.GetMicroSeconds();
378 static uint32_t testvar = 0;
380 LOG_INFO(logger,
"NTP ts %" PRIu64
" ", ntp);
381 LOG_INFO(logger,
"Using RTP 64 bit timestamp extension. (ext ts %" PRIu64
" old ts %" PRIu64
")",
382 pkt_time_ext, pkt_time);
385 pkt_time = pkt_time_ext;
388 LOG_WARNING(logger,
"RTP has unused OBH extension id %u size %zu", ext_ext_id, ext_ext_size);
395 else if(rtp_two_byte_header_extension){
396 LOG_WARNING(logger,
"RTP two byte header extension not supported.");
401 LOG_WARNING(logger,
"RTP has unused extension id %u size %zu.", ext_id, ext_size);
// No extension timestamp: derive the time from the sender-report reference.
404 if (pkt_time == 0xF000000000000000ULL) {
405 jrtplib::RTPSourceData* source_data = session->GetCurrentSourceInfo();
407 if (source_data->SR_HasInfo()) {
408 uint32_t ref_ts = source_data->SR_GetRTPTimestamp();
409 double ts_unit = source_data->GetTimestampUnit();
410 uint32_t pkt_ts = packet->GetTimestamp();
411 int64_t ts_diff = timestamp_diff(pkt_ts, ref_ts);
412 int64_t ts_diff_us = (int64_t)((
double)ts_diff * (ts_unit * 1000000.0));
// Packets up to five seconds older than the reference are accepted.
415 if(ts_diff_us >= -1000000.0*5.0){
416 jrtplib::RTPTime ref_ntp_rtp = jrtplib::RTPTime(source_data->SR_GetNTPTimestamp());
417 uint64_t ref_time = ref_ntp_rtp.GetSeconds() * 1000000LL + ref_ntp_rtp.GetMicroSeconds();
419 pkt_time = ref_time + ts_diff_us;
422 source_data->FlushPackets();
423 LOG_INFO(logger,
"RTPStream %p: Flushed Packets SSRC 0x%x"
424 "(pkg age %" PRId64
" us pkt_ts %d ref_ts %d ts_unit %f)",
425 this, ssrc, ts_diff_us, pkt_ts, ref_ts, ts_unit);
// Fallback: the raw 32-bit RTP timestamp.
431 pkt_time = packet->GetTimestamp();
435 if (pkt_time != 0xF000000000000000ULL) {
436 auto iter = dest.find(ssrc);
// Unknown SSRC: create a Dest for it on the fly.
437 if(iter == dest.end()){
438 LOG_INFO(logger,
"Adding SSRC 0x%x to RTPStream %p", ssrc,
this);
449 d.buffer_size = INITIAL_BUFFER;
450 d.buffer =
new char[d.buffer_size];
451 iter = dest.insert({ssrc, std::move(d)}).first;
453 Dest &d = iter->second;
460 if (d.pps.size() && pkt_time != d.timestamp) {
467 d.prev_receive_time = d.receive_time;
468 d.receive_time = pkt_rtime;
469 d.prev_timestamp = d.timestamp;
470 d.timestamp = pkt_time;
472 d.prev_eseq = d.eseq;
// Receive-rate estimate, smoothed with factor a.
// NOTE(review): receive_time is assigned from pkt_rtime (microseconds) but the
// threshold and scale below use 2^27 / 2^32 — confirm the intended time unit.
480 d.size_akk += (uint32_t)size;
481 if(d.receive_time-d.rate_time > 1ull<<27){
482 double rt_diff = (d.receive_time-d.rate_time)*(1.0/(1ull<<32));
483 double rate = (d.size_akk*8)/rt_diff;
485 d.rate = (1.0-a)*d.rate + a*rate;
486 d.rate_time = d.receive_time;
491 if (d.codec == Cogs::Core::Codec::None) {
// HEVC payload: type is bits 1..6 of byte 0, TID the low three bits of byte 1.
493 else if (d.codec == Cogs::Core::Codec::HEVC) {
494 if((data[0] & 0x80) != 0){
495 LOG_ERROR(logger,
"Corrupt RTP HEVC packet (f != 0). (RTPStream %p). Possible MPEG-TS stream not supported.",
this);
496 session->DeletePacket(packet);
499 uint32_t type = (data[0]>>1)&0x3f;
501 uint32_t TID = data[1]&0x7;
506 LOG_ERROR(logger,
"Corrupt RTP H265 packet (TID == 0).");
507 session->DeletePacket(packet);
513 processH265NALU(d, type, data, size);
515 else if (type == 48) {
516 LOG_ERROR(logger,
"H265 aggregation packets are not supported.");
// Type 49: fragmentation unit; byte 2 carries the s/e flags and the NAL type.
519 else if (type == 49) {
520 uint32_t s = (data[2] >> 7) & 0x1;
521 uint32_t e = (data[2] >> 6) & 0x1;
522 uint32_t fu_type = data[2] & 0x3f;
524 assert(fu_type >= 0 && fu_type <= 40);
528 d.nalu.write(naluStartCode,
sizeof(naluStartCode));
529 d.nalu.write8(
static_cast<uint8_t
>(fu_type << 1));
533 size_t siz = size-beg;
534 d.nalu.write(&data[beg], siz);
540 LOG_ERROR(logger,
"Unsupported H265 packet of type %d encountered.", type);
// H.264 payload: the packet type is the low five bits of byte 0.
544 else if (d.codec == Cogs::Core::Codec::H264) {
545 if((data[0] & 0x80 ) != 0){
546 LOG_ERROR(logger,
"Corrupt RTP H264 packet (f != 0). (RTPStream %p). Possible MPEG-TS stream not supported.",
this);
547 session->DeletePacket(packet);
550 uint32_t nri = (data[0] >> 5) & 0x3f;
551 uint32_t rtp_type = data[0] & 0x1f;
554 processH264NALU(d, rtp_type, data, size);
// Type 24 (STAP-A per the log text): a sequence of 16-bit-size-prefixed NALUs.
// NOTE(review): the 16-bit size is read through a cast of a possibly
// unaligned pointer, and no visible check bounds naluSize by the payload end.
556 else if(rtp_type == 24) {
557 uint8_t* read = data + 1;
559 while (read < (data + size)) {
560 uint16_t naluSize = ntohs(*
reinterpret_cast<uint16_t*
>(read));
563 if ((read[0] & 0x80) != 0) {
564 LOG_ERROR(logger,
"Corrupt RTP H264 packet: Non-zero 'f' field in STAP-A NALU. (RTPStream %p). Possible MPEG-TS stream not supported.",
this);
567 processH264NALU(d, read[0] & 0x1f, read, naluSize);
572 else if (rtp_type == 25) {
573 LOG_ERROR(logger,
"STAP-B packets are not supported.");
576 else if ((rtp_type == 26) || (rtp_type == 27)) {
577 LOG_ERROR(logger,
"MTAP packets are not supported.");
// Types 28/29: fragmented NALU; byte 1 carries the s/e/r flags and NAL type.
// The NAL header byte is rebuilt from nri and type before the fragment data.
580 else if ((rtp_type == 28) || (rtp_type == 29)) {
581 assert(rtp_type == 28);
582 uint32_t s = (data[1]>>7)&0x1;
583 uint32_t e = (data[1]>>6)&0x1;
584 uint32_t r = (data[1]>>5)&0x1;
586 uint32_t type = (data[1]>>0)&0x1f;
592 d.nalu.write(naluStartCode,
sizeof(naluStartCode));
593 d.nalu.write8(
static_cast<uint8_t
>((nri<<5) | type));
595 size_t beg = (rtp_type == 29) ? 4 : 2;
597 d.nalu.write(&data[beg], size - beg);
604 d.nalu.write(naluStartCode,
sizeof(naluStartCode));
605 d.nalu.write8(
static_cast<uint8_t
>((nri<<5) | type));
607 size_t beg = (rtp_type == 29) ? 4 : 2;
609 d.nalu.write(&data[beg], size - beg);
614 else if (type == 0) {
616 d.nalu.write(naluStartCode,
sizeof(naluStartCode));
617 d.nalu.write8(
static_cast<uint8_t
>((nri << 5) | type));
619 size_t beg = (rtp_type == 29) ? 4 : 2;
621 d.nalu.write(&data[beg], size - beg);
// Every packet obtained from GetNextPacket() is handed back to the session.
638 session->DeletePacket(packet);
641 while(session->GotoNextSourceWithData());
643 if (use_pool_thread) {
644 session->EndDataAccess();
// Handles one complete H.264 NAL unit of type rtpType (asserted <= 23) held in
// data[0..size).
// The first SPS (type 7) and first PPS (type 8) seen are copied into
// dest.sps / dest.pps; later ones do not replace them. The start code and the
// NAL unit bytes are written to dest.nalu.
// NOTE(review): the braces around the branches are not visible in this
// listing, so whether the write to dest.nalu is unconditional should be
// confirmed.
648void Cogs::RTPStream::processH264NALU(Dest& dest, uint32_t rtpType,
const void* data,
size_t size) {
649 assert (rtpType <= 23);
651 if ((rtpType == 7) && !dest.sps.size()) {
652 dest.sps = std::string(
static_cast<const char*
>(data), size);
653 LOG_INFO(logger,
"RTP received H264 SPS (RTPStream %p)",
this);
655 else if ((rtpType == 8) && !dest.pps.size()){
656 dest.pps = std::string(
static_cast<const char*
>(data), size);
657 LOG_INFO(logger,
"RTP received H264 PPS (RTPStream %p)",
this);
659 dest.nalu.write(naluStartCode,
sizeof(naluStartCode));
660 dest.nalu.write(data, size);
// Handles one complete H.265 NAL unit of type rtpType (asserted <= 40) held in
// data[0..size).
// The first VPS (type 32), SPS (type 33) and PPS (type 34) seen are copied
// into dest.vps / dest.sps / dest.pps; later ones do not replace them. The
// start code and the NAL unit bytes are written to dest.nalu.
// NOTE(review): the braces around the branches are not visible in this
// listing, so whether the write to dest.nalu is unconditional should be
// confirmed.
664void Cogs::RTPStream::processH265NALU(Dest& dest, uint32_t rtpType,
const void* data,
size_t size) {
665 assert (rtpType <= 40);
667 if ((rtpType == 32) && !dest.vps.size()) {
668 dest.vps = std::string(
static_cast<const char*
>(data), size);
669 LOG_INFO(logger,
"RTP receive H265 VPS (RTPStream %p)",
this);
671 else if((rtpType == 33) && !dest.sps.size()) {
672 dest.sps = std::string(
static_cast<const char*
>(data), size);
673 LOG_INFO(logger,
"RTP receive H265 SPS (RTPStream %p)",
this);
675 else if((rtpType == 34) && !dest.pps.size()) {
676 dest.pps = std::string(
static_cast<const char*
>(data), size);
677 LOG_INFO(logger,
"RTP receive H265 PPS (RTPStream %p)",
this);
679 dest.nalu.write(naluStartCode,
sizeof(naluStartCode));
680 dest.nalu.write(data, size);
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Log implementation class.
Contains the Engine, Renderer, resource managers and other systems needed to run Cogs....
bool HandleIsValid(const ResourceHandle_t< T > &handle)
Check if the given resource is valid, i.e. not equal to NoHandle or InvalidHandle.
constexpr Log getLogger(const char(&name)[LEN]) noexcept
static const ResourceHandle_t NoHandle
Handle representing a default (or none if default not present) resource.