5#include <brotli/decode.h>
7#include "Foundation/Logging/Logger.h"
8#include "Foundation/Platform/MemoryBufferBackedFileContents.h"
10#include "Services/TaskManager.h"
12#include "DataFetcherManager.h"
14#include "DataFetcherBase.h"
// File-scope state used by the DataFetcherManager functions in this file.
// Guards `fetchers`: locked exclusively by addDataFetcher/removeDataFetcher and shared by fetchAsync.
23 std::shared_mutex fetcherLock;
// Registered fetchers. addDataFetcher inserts new entries in front of entries with lower priority.
24 std::vector<Item> fetchers;
// Id that the next registered fetcher receives; always kept within fetcherIdMask.
27 uint32_t currentFetcherId = 0;
// Request counter that fetchAsync shifts into the upper bits of a FetchId.
// Where it is advanced is not visible in this excerpt.
28 uint32_t currentRequestId = 0;
// Number of low FetchId bits that carry the id of the fetcher serving the request.
29 constexpr uint32_t fetcherIdBits = 7;
// Mask selecting those low bits (0x7F).
30 constexpr uint32_t fetcherIdMask = (1u << fetcherIdBits) - 1;
// Registers `dataFetcher` with the given `priority`.
// The new entry is placed in front of every already registered entry with a strictly lower
// priority, and it is given an id that fits in fetcherIdMask.
// A fetcher that is already registered (same pointer) is reported with LOG_ERROR.
33void Cogs::Core::DataFetcherManager::addDataFetcher(
DataFetcherBase* dataFetcher,
int priority)
// Exclusive lock: the fetcher list is modified below.
35 std::unique_lock guard(fetcherLock);
36 assert((fetchers.size() < 100) &&
"Manage the fetchers smarter");
// Duplicate check, comparing by pointer. What follows the log call is not part of this
// excerpt - presumably an early return, as the message says the duplicate is ignored.
37 for (
const auto& fetcher : fetchers) {
38 if (fetcher.fetcher == dataFetcher) {
39 LOG_ERROR(logger,
"addDataFetcher - ignoring duplicate DataFetcher");
// Append an empty slot, then shift lower-priority entries one slot towards the back until
// the insertion position is found. The decrement of `i` is not visible in this excerpt.
44 fetchers.push_back({});
46 size_t i = fetchers.size() - 1;
47 while (0 < i && fetchers[i - 1].priority < priority) {
48 fetchers[i] = fetchers[i - 1];
51 fetchers[i].fetcher = dataFetcher;
52 fetchers[i].priority = priority;
// Skip the id whose bits equal the low bits of NoFetchId - presumably so that a FetchId
// composed in fetchAsync can never equal NoFetchId (fetchAsync asserts exactly that).
54 if (currentFetcherId == (NoFetchId & fetcherIdMask)) { currentFetcherId = (currentFetcherId + 1) & fetcherIdMask; }
55 fetchers[i].id = currentFetcherId;
// NOTE(review): the counter wraps within 7 bits; nothing visible here checks that the next id
// is not still held by a registered fetcher after many add/remove cycles - confirm.
56 currentFetcherId = (currentFetcherId + 1) & fetcherIdMask;
// Unregisters `dataFetcher`, which is looked up by pointer in the fetcher list.
// An error is logged for a fetcher that is not in the list.
59void Cogs::Core::DataFetcherManager::removeDataFetcher(
DataFetcherBase* dataFetcher) {
// Exclusive lock: the fetcher list may be modified below.
60 std::unique_lock guard(fetcherLock);
61 for (
auto i = fetchers.begin(), e = fetchers.end(); i != e; ++i) {
62 if (i->fetcher == dataFetcher) {
// The handling of a match is not part of this excerpt - presumably erase the entry and return,
// so that the error below is only reached when no entry matched.
68 LOG_ERROR(logger,
"removeDataFetcher - unknown DataFetcher");
// Decompresses a Brotli-compressed buffer using the streaming decoder API.
// Returns a new memory-backed FileContents built from the decoded bytes and the origin of
// `compressed`, or an empty pointer if the decoder reports an error.
74 std::unique_ptr<Cogs::FileContents> brotliDecompress(std::unique_ptr<Cogs::FileContents>&& compressed)
// NOTE(review): no null check on `decoder` is visible in this excerpt - confirm.
76 BrotliDecoderState* decoder = BrotliDecoderCreateInstance(
nullptr,
nullptr,
nullptr);
// Input cursor and remaining byte count handed to the decoder.
79 const uint8_t* nextIn = compressed->ptr;
80 size_t availableIn = compressed->size;
// Run the decoder until it reports the stream as finished.
// NOTE(review): the only other exit visible here is the ERROR branch; for truncated input the
// decoder reports NEEDS_MORE_INPUT and never becomes finished - confirm this cannot spin.
82 while (!BrotliDecoderIsFinished(decoder)) {
// Zero output space: presumably no caller-owned output buffer is passed and the decoded bytes
// are fetched with BrotliDecoderTakeOutput below (the call arguments are not shown).
83 size_t availableOut = 0;
84 BrotliDecoderResult rv = BrotliDecoderDecompressStream(decoder,
91 if (rv == BROTLI_DECODER_RESULT_ERROR) {
// NOTE(review): `rv` is a BrotliDecoderResult, not a BrotliDecoderErrorCode, and
// BROTLI_DECODER_RESULT_ERROR is 0, so this cast does not yield the decoder's error.
// BrotliDecoderGetErrorCode(decoder) looks like the intended argument - confirm.
92 LOG_ERROR(logger,
"Parsing bin: BrotliDecoderDecompressStream failed: %s", BrotliDecoderErrorString(
static_cast<BrotliDecoderErrorCode
>(rv)));
// Release the decoder before bailing out with an empty result.
93 BrotliDecoderDestroyInstance(decoder);
94 return std::unique_ptr<Cogs::FileContents>();
// Take the bytes produced in this round and append them to `decompressed`, which is declared
// on a line outside this excerpt. How `success` is checked is not visible here.
97 size_t outputSize = 0;
98 const uint8_t* outputBuffer = BrotliDecoderTakeOutput(decoder, &outputSize);
99 bool success = decompressed.write(outputBuffer, outputSize);
102 BrotliDecoderDestroyInstance(decoder);
104 LOG_TRACE(logger,
"Brotli-decompressed %zu -> %zu bytes in transit", compressed->size, decompressed.size());
// Wrap the decoded buffer, keeping the origin of the compressed contents.
106 return std::make_unique<Cogs::MemoryBufferBackedFileContents>(std::move(decompressed),
107 compressed->origin().to_string(),
// Decompresses a single Zstandard frame in one shot.
// The output size is taken from the frame header, so frames without a stored content size are
// rejected. Returns a new memory-backed FileContents, or an empty pointer on any failure.
111 std::unique_ptr<Cogs::FileContents> zstdDecompress(std::unique_ptr<Cogs::FileContents>&& compressed)
// NOTE(review): no null check on `zstdctx` is visible in this excerpt - confirm.
113 ZSTD_DCtx* zstdctx = ZSTD_createDCtx();
// Decompressed size as declared by the frame header, or one of the two sentinel values below.
115 unsigned long long outSize = ZSTD_getFrameContentSize(compressed->ptr, compressed->size);
// Both sentinel cases log, free the context and return an empty pointer.
117 if (outSize == ZSTD_CONTENTSIZE_UNKNOWN) {
118 LOG_ERROR(logger,
"Parsing bin: ZSTD_GetFrameContentSize returned ZSTD_CONTENTSIZE_UNKNOWN");
119 ZSTD_freeDCtx(zstdctx);
120 return std::unique_ptr<Cogs::FileContents>();
122 else if (outSize == ZSTD_CONTENTSIZE_ERROR) {
123 LOG_ERROR(logger,
"Parsing bin: ZSTD_GetFrameContentSize returned ZSTD_CONTENTSIZE_ERROR");
124 ZSTD_freeDCtx(zstdctx);
125 return std::unique_ptr<Cogs::FileContents>();
// Size the output buffer (`decompressed` is declared on a line outside this excerpt) and decode.
// NOTE(review): `outSize` comes straight from the input data and is not bounded before the
// resize - confirm that inputs are trusted or that a size limit is applied elsewhere.
129 decompressed.resize(outSize);
130 size_t status = ZSTD_decompressDCtx(zstdctx,
131 decompressed.data(), decompressed.size(),
132 compressed->ptr, compressed->size);
// The context is no longer needed, whether or not decoding succeeded.
133 ZSTD_freeDCtx(zstdctx);
135 if (ZSTD_isError(status)) {
136 LOG_ERROR(logger,
"Parsing bin: ZSTD_decompressDCtx failed: %s", ZSTD_getErrorName(status));
137 return std::unique_ptr<Cogs::FileContents>();
140 LOG_TRACE(logger,
"ZStd-decompressed %zu -> %zu bytes in transit", compressed->size, decompressed.size());
// Wrap the decoded buffer, keeping the origin of the compressed contents.
142 return std::make_unique<Cogs::MemoryBufferBackedFileContents>(std::move(decompressed),
143 compressed->origin().to_string(),
// Hands `data` to `callback`, first running it through brotliDecompress or zstdDecompress on
// the corresponding paths. The conditions selecting each path are mostly outside this excerpt;
// presumably they depend on the BrotliDecompress / ZStdDecompress hints - TODO confirm.
147 void maybeDecompressAndInvokeHandler(
const Cogs::FileContents::Callback& callback, std::unique_ptr<Cogs::FileContents> && data)
// Pass-through path; its guarding condition is not shown.
151 callback(std::move(data));
// Brotli path; its guarding condition is not shown.
157 callback(brotliDecompress(std::move(data)));
// The magic-number test below compares the leading bytes as a native uint32_t, hence the
// compile-time requirement of a little-endian target.
167 static_assert(std::endian::native == std::endian::little);
// 0xFD2FB528 is the Zstandard frame magic number; only buffers of at least four bytes that
// start with it are passed to zstdDecompress.
// NOTE(review): the read goes through a uint32_t pointer and so assumes data->ptr is suitably
// aligned; a memcpy into a local uint32_t would avoid that assumption - confirm.
168 if (4 <= data->size && *
reinterpret_cast<const uint32_t*
>(data->ptr) == 0xFD2FB528) {
169 callback(zstdDecompress(std::move(data)));
// Fallback: forward the data unchanged.
174 callback(std::move(data));
// Starts an asynchronous fetch of `fileName` (byte range `offset`/`size`) and arranges for
// `callback` to receive the result via maybeDecompressAndInvokeHandler.
// Registered fetchers are asked in list order; the first one whose fetchAsync returns true
// serves the request and the id composed for it is returned. If no fetcher accepts, the
// IO::readFileAsync / IO::readFileSync fallbacks below are used (their conditions and the
// final return value are outside this excerpt).
179Cogs::Core::DataFetcherManager::FetchId
180Cogs::Core::DataFetcherManager::fetchAsync(
Context* context,
const std::string& fileName,
const FileContents::Callback& callback, uint64_t offset, uint64_t size,
bool cancellable,
FileContentsHints hints)
// Wrapper passed to the fetchers instead of `callback`. It has two paths that both end in
// maybeDecompressAndInvokeHandler: one via the nested lambda below, one direct.
186 FileContents::Callback moveToResourceQueueHandler =
187 [context, callback](std::unique_ptr<Cogs::FileContents>&& data)
// Nested lambda: ownership of the contents is carried as a released raw pointer and re-wrapped
// in a unique_ptr when the lambda runs. Presumably this lambda is enqueued as a task on the
// resource queue - TODO confirm against the missing lines.
// NOTE(review): if the lambda is never invoked, the released FileContents is not freed - confirm.
192 [callback, dataPtr = data.release()]()
194 maybeDecompressAndInvokeHandler(callback, std::unique_ptr<Cogs::FileContents>(dataPtr));
// Direct path: decompress and invoke on the calling thread.
198 maybeDecompressAndInvokeHandler(callback, std::move(data));
// Shared lock: the fetcher list is only read here.
205 std::shared_lock guard(fetcherLock);
206 for (
Item& item : fetchers) {
208 FetchId cancellationId = NoFetchId;
// Compose the id: request counter in the upper bits, fetcher id in the low fetcherIdBits bits.
// The condition guarding this (presumably `cancellable`) and the update of currentRequestId
// are not visible in this excerpt.
// NOTE(review): currentRequestId is a plain uint32_t used under a shared lock - confirm that
// concurrent fetchAsync calls cannot race on it.
210 cancellationId = (currentRequestId << fetcherIdBits) | item.id;
211 assert(cancellationId != NoFetchId);
// The first fetcher that accepts the request wins.
214 if (item.fetcher->fetchAsync(fileName, moveToResourceQueueHandler, offset, size, cancellationId, hints)) {
218 return cancellationId;
// No fetcher accepted the request: fall back to the IO layer.
224 IO::readFileAsync(fileName, moveToResourceQueueHandler, offset, size, hints);
// Alternative fallback: a lambda that reads the file synchronously and then invokes the
// handler. How it is scheduled is not visible in this excerpt.
229 [fileName, callback, hints, offset, size]()
231 maybeDecompressAndInvokeHandler(callback, IO::readFileSync(fileName, offset, size, hints));
// Synchronously fetches `fileName` (byte range `offset`/`size`).
// Registered fetchers are asked in list order; if the loop does not return a result, the
// request falls back to IO::readFileSync. The handling of a fetcher's result is outside this
// excerpt - presumably the first non-empty `contents` is returned.
238Cogs::FileContents::Ptr Cogs::Core::DataFetcherManager::fetchSync(
const std::string& fileName, uint64_t offset, uint64_t size,
FileContentsHints hints)
// NOTE(review): no acquisition of fetcherLock is visible here although `fetchers` is iterated,
// unlike fetchAsync which takes a shared lock - confirm whether that is intentional.
240 for (
Item& item : fetchers) {
241 FileContents::Ptr contents = item.fetcher->fetchSync(fileName, offset, size, hints);
// Fallback when the loop above did not return.
247 return IO::readFileSync(fileName, offset, size, hints);
// Forwards a cancellation request to the fetcher encoded in the low bits of `id`.
// The Context parameter is unnamed and unused. Passing NoFetchId is a no-op.
250void Cogs::Core::DataFetcherManager::cancelAsyncFetch(
Context* , FetchId
id)
252 if (
id == NoFetchId)
return;
// Find the fetcher whose id matches the low fetcherIdBits bits and pass it the full id.
// NOTE(review): no acquisition of fetcherLock is visible here although `fetchers` is iterated,
// unlike fetchAsync which takes a shared lock - confirm whether that is intentional.
254 for (
Item& item : fetchers) {
255 if (item.id == (
id & fetcherIdMask)) {
256 item.fetcher->cancelAsyncFetch(
id);
A Context instance contains all the services, systems and runtime components needed to use Cogs.
std::unique_ptr< class TaskManager > taskManager
TaskManager service instance.
Manages Task queuing and execution.
size_t getQueueConcurrency(TaskQueueId queue)
Returns the number of worker threads in a task queue.
void enqueue(const TaskId &taskId)
Enqueues a previously created task.
static constexpr TaskQueueId ResourceQueue
Resource task queue.
bool onMainThread() const
Returns true if called on the main thread.
Log implementation class.
Contains the Engine, Renderer, resource managers and other systems needed to run Cogs....
constexpr Log getLogger(const char(&name)[LEN]) noexcept
@ BrotliDecompress
A hint that the contents are Brotli (Google) compressed and are allowed to be decompressed during transit.
@ ZStdDecompress
A hint that the contents are Zstandard (Facebook) compressed and are allowed to be decompressed during transit.