4#include <brotli/decode.h>
6#include "Foundation/Logging/Logger.h"
7#include "Foundation/Platform/MemoryBufferBackedFileContents.h"
9#include "Services/TaskManager.h"
11#include "DataFetcherManager.h"
13#include "DataFetcherBase.h"
// Registered fetchers. addDataFetcher places a new entry ahead of entries with lower priority.
21 std::vector<Item> fetchers;
// Candidate id for the next registered fetcher; advanced modulo (fetcherIdMask + 1).
24 uint32_t currentFetcherId = 0;
// Request counter that fetchAsync shifts into the bits above the fetcher id when building a cancellation id.
25 uint32_t currentRequestId = 0;
// Number of low bits of a fetch id that hold the fetcher id.
26 constexpr uint32_t fetcherIdBits = 7;
// Mask selecting those low bits (0x7F).
27 constexpr uint32_t fetcherIdMask = (1u << fetcherIdBits) - 1;
30void Cogs::Core::DataFetcherManager::addDataFetcher(
DataFetcherBase* dataFetcher,
int priority)
32 assert((fetchers.size() < 100) &&
"Manage the fetchers smarter");
33 for (
const auto& fetcher : fetchers) {
34 if (fetcher.fetcher == dataFetcher) {
35 LOG_ERROR(logger,
"addDataFetcher - ignoring duplicate DataFetcher");
40 fetchers.push_back({});
42 size_t i = fetchers.size() - 1;
43 while (0 < i && fetchers[i - 1].priority < priority) {
44 fetchers[i] = fetchers[i - 1];
47 fetchers[i].fetcher = dataFetcher;
48 fetchers[i].priority = priority;
50 if (currentFetcherId == (NoFetchId & fetcherIdMask)) { currentFetcherId = (currentFetcherId + 1) & fetcherIdMask; }
51 fetchers[i].id = currentFetcherId;
52 currentFetcherId = (currentFetcherId + 1) & fetcherIdMask;
// Unregisters dataFetcher. Scans the registered fetchers for the entry holding
// the same pointer; the handling of a match is not visible in this listing.
// NOTE(review): presumably the matching entry is erased and the function returns - confirm.
55void Cogs::Core::DataFetcherManager::removeDataFetcher(
DataFetcherBase* dataFetcher) {
56 for (
auto i = fetchers.begin(), e = fetchers.end(); i != e; ++i) {
57 if (i->fetcher == dataFetcher) {
// Error report for a fetcher that is not registered (per the message text).
63 LOG_ERROR(logger,
"removeDataFetcher - unknown DataFetcher");
// Decompresses Brotli-compressed file contents with the streaming decoder API and
// returns them wrapped in a MemoryBufferBackedFileContents that keeps the origin of
// the compressed input. Returns an empty pointer when the decoder reports an error.
69 std::unique_ptr<Cogs::FileContents> brotliDecompress(std::unique_ptr<Cogs::FileContents>&& compressed)
// NOTE(review): BrotliDecoderCreateInstance can return null on allocation failure; no check is visible here - confirm.
71 BrotliDecoderState* decoder = BrotliDecoderCreateInstance(
nullptr,
nullptr,
nullptr);
// The whole compressed buffer is offered to the decoder as input.
74 const uint8_t* nextIn = compressed->ptr;
75 size_t availableIn = compressed->size;
// NOTE(review): a truncated stream would leave the decoder asking for more input without
// ever finishing; no handling of that case is visible here - confirm the loop terminates.
77 while (!BrotliDecoderIsFinished(decoder)) {
// No caller-provided output space; decoded bytes are fetched with BrotliDecoderTakeOutput below.
78 size_t availableOut = 0;
79 BrotliDecoderResult rv = BrotliDecoderDecompressStream(decoder,
86 if (rv == BROTLI_DECODER_RESULT_ERROR) {
// NOTE(review): rv is a BrotliDecoderResult, not a BrotliDecoderErrorCode. BROTLI_DECODER_RESULT_ERROR
// is 0, so this cast always selects the "no error" string; BrotliDecoderGetErrorCode(decoder)
// returns the actual error code.
87 LOG_ERROR(logger,
"Parsing bin: BrotliDecoderDecompressStream failed: %s", BrotliDecoderErrorString(
static_cast<BrotliDecoderErrorCode
>(rv)));
88 BrotliDecoderDestroyInstance(decoder);
89 return std::unique_ptr<Cogs::FileContents>();
// Append whatever the decoder has produced so far to the output buffer.
92 size_t outputSize = 0;
93 const uint8_t* outputBuffer = BrotliDecoderTakeOutput(decoder, &outputSize);
94 bool success = decompressed.write(outputBuffer, outputSize);
97 BrotliDecoderDestroyInstance(decoder);
99 LOG_TRACE(logger,
"Brotli-decompressed %zu -> %zu bytes in transit", compressed->size, decompressed.size());
101 return std::make_unique<Cogs::MemoryBufferBackedFileContents>(std::move(decompressed),
102 compressed->origin().to_string(),
// Decompresses Zstandard-compressed file contents in a single call and returns them
// wrapped in a MemoryBufferBackedFileContents that keeps the origin of the compressed
// input. Returns an empty pointer when the frame content size is unknown or invalid,
// or when decompression fails.
106 std::unique_ptr<Cogs::FileContents> zstdDecompress(std::unique_ptr<Cogs::FileContents>&& compressed)
// NOTE(review): ZSTD_createDCtx can return null; no check is visible here - confirm.
108 ZSTD_DCtx* zstdctx = ZSTD_createDCtx();
// The decompressed size is read from the frame header so the output can be sized up front.
110 unsigned long long outSize = ZSTD_getFrameContentSize(compressed->ptr, compressed->size);
112 if (outSize == ZSTD_CONTENTSIZE_UNKNOWN) {
113 LOG_ERROR(logger,
"Parsing bin: ZSTD_GetFrameContentSize returned ZSTD_CONTENTSIZE_UNKNOWN");
114 ZSTD_freeDCtx(zstdctx);
115 return std::unique_ptr<Cogs::FileContents>();
117 else if (outSize == ZSTD_CONTENTSIZE_ERROR) {
118 LOG_ERROR(logger,
"Parsing bin: ZSTD_GetFrameContentSize returned ZSTD_CONTENTSIZE_ERROR");
119 ZSTD_freeDCtx(zstdctx);
120 return std::unique_ptr<Cogs::FileContents>();
// NOTE(review): outSize comes from the input's frame header and is used unbounded as the allocation size - confirm this is acceptable for the data sources used.
124 decompressed.resize(outSize);
125 size_t status = ZSTD_decompressDCtx(zstdctx,
126 decompressed.data(), decompressed.size(),
127 compressed->ptr, compressed->size);
// The context is released before the status is inspected, so both outcomes free it.
128 ZSTD_freeDCtx(zstdctx);
130 if (ZSTD_isError(status)) {
131 LOG_ERROR(logger,
"Parsing bin: ZSTD_decompressDCtx failed: %s", ZSTD_getErrorName(status));
132 return std::unique_ptr<Cogs::FileContents>();
135 LOG_TRACE(logger,
"ZStd-decompressed %zu -> %zu bytes in transit", compressed->size, decompressed.size());
137 return std::make_unique<Cogs::MemoryBufferBackedFileContents>(std::move(decompressed),
138 compressed->origin().to_string(),
// Invokes callback with data, first passing it through brotliDecompress or zstdDecompress
// on some paths. The conditions selecting the first two paths are not visible in this listing.
142 void maybeDecompressAndInvokeHandler(
const Cogs::FileContents::Callback& callback, std::unique_ptr<Cogs::FileContents> && data)
// Pass-through path: the data is handed to the callback unchanged.
146 callback(std::move(data));
// Brotli path. NOTE(review): presumably guarded by the BrotliDecompress hint - confirm.
152 callback(brotliDecompress(std::move(data)));
// The magic-number comparison below reads the first four bytes as a native uint32_t,
// which only matches the on-disk byte order on little-endian targets.
162 static_assert(std::endian::native == std::endian::little);
// 0xFD2FB528 is the Zstandard frame magic number; only data starting with it is decompressed.
// NOTE(review): the reinterpret_cast read assumes data->ptr is suitably aligned for uint32_t - confirm.
163 if (4 <= data->size && *
reinterpret_cast<const uint32_t*
>(data->ptr) == 0xFD2FB528) {
164 callback(zstdDecompress(std::move(data)));
// Data without the Zstandard magic number is handed to the callback unchanged.
169 callback(std::move(data));
// Requests fileName (with the given offset, size and hints) asynchronously and delivers the
// result to callback after it has passed through maybeDecompressAndInvokeHandler.
// Registered fetchers are asked in list order; the id returned for an accepting fetcher is
// the cancellation id built below. Several branches of this function are not visible in
// this listing.
174Cogs::Core::DataFetcherManager::FetchId
175Cogs::Core::DataFetcherManager::fetchAsync(
Context* context,
const std::string& fileName,
const FileContents::Callback& callback, uint64_t offset, uint64_t size,
bool cancellable,
FileContentsHints hints)
// Wrapper around the caller's callback that is handed to the fetchers and to IO::readFileAsync.
181 FileContents::Callback moveToResourceQueueHandler =
182 [context, callback](std::unique_ptr<Cogs::FileContents>&& data)
// Deferred variant: ownership is moved into the lambda as a raw pointer and re-wrapped when it runs.
// NOTE(review): if this lambda is destroyed without being run, dataPtr is never freed - confirm.
187 [callback, dataPtr = data.release()]()
189 maybeDecompressAndInvokeHandler(callback, std::unique_ptr<Cogs::FileContents>(dataPtr));
// Immediate variant: the data is processed directly in the wrapper.
193 maybeDecompressAndInvokeHandler(callback, std::move(data));
// Offer the request to each registered fetcher in list order.
199 for (
Item& item : fetchers) {
201 FetchId cancellationId = NoFetchId;
// Cancellation id layout: request counter above the low fetcherIdBits bits, fetcher id in those low bits.
// NOTE(review): presumably only assigned when cancellable is set - confirm.
203 cancellationId = (currentRequestId << fetcherIdBits) | item.id;
204 assert(cancellationId != NoFetchId);
// A true return value means this fetcher has taken the request.
207 if (item.fetcher->fetchAsync(fileName, moveToResourceQueueHandler, offset, size, cancellationId, hints)) {
211 return cancellationId;
// Plain file read through IO::readFileAsync, using the same wrapper callback.
216 IO::readFileAsync(fileName, moveToResourceQueueHandler, offset, size, hints);
// Variant that performs a synchronous read inside a lambda and processes the result there.
221 [fileName, callback, hints, offset, size]()
223 maybeDecompressAndInvokeHandler(callback, IO::readFileSync(fileName, offset, size, hints));
// Synchronously fetches fileName (with the given offset, size and hints). Each registered
// fetcher is asked in list order; the final statement reads the file with IO::readFileSync.
// NOTE(review): presumably the first non-empty fetcher result is returned - the handling of
// `contents` is not visible in this listing.
// NOTE(review): no decompression call is visible on this path, unlike fetchAsync - confirm this is intended.
230Cogs::FileContents::Ptr Cogs::Core::DataFetcherManager::fetchSync(
const std::string& fileName, uint64_t offset, uint64_t size,
FileContentsHints hints)
232 for (
Item& item : fetchers) {
233 FileContents::Ptr contents = item.fetcher->fetchSync(fileName, offset, size, hints);
// Plain file read through IO::readFileSync.
239 return IO::readFileSync(fileName, offset, size, hints);
// Forwards a cancellation request to the registered fetcher whose id matches the low
// fetcherIdBits bits of id. The Context parameter is unnamed and unused.
242void Cogs::Core::DataFetcherManager::cancelAsyncFetch(
Context* , FetchId
id)
// NoFetchId denotes "nothing to cancel".
244 if (
id == NoFetchId)
return;
246 for (
Item& item : fetchers) {
// The low bits of the fetch id select the fetcher; the full id is passed on to it.
247 if (item.id == (
id & fetcherIdMask)) {
248 item.fetcher->cancelAsyncFetch(
id);
A Context instance contains all the services, systems and runtime components needed to use Cogs.
std::unique_ptr< class TaskManager > taskManager
TaskManager service instance.
Manages Task queuing and execution.
size_t getQueueConcurrency(TaskQueueId queue)
Returns the number of worker threads in a task queue.
void enqueue(const TaskId &taskId)
Enqueues a previously created task.
static constexpr TaskQueueId ResourceQueue
Resource task queue.
bool onMainThread() const
Returns true if called on the main thread.
Log implementation class.
Contains the Engine, Renderer, resource managers and other systems needed to run Cogs....
constexpr Log getLogger(const char(&name)[LEN]) noexcept
@ BrotliDecompress
A hint that the contents are Brotli (Google) compressed and are allowed to be decompressed during tran...
@ ZStdDecompress
A hint that the contents are Zstandard (Facebook) compressed and are allowed to be decompressed during...