Cogs.Core
DataFetcherManager.cpp
1#include <cassert>
2#include "zstd.h"
3#include <bit>
4#include <brotli/decode.h>
5
6#include "Foundation/Logging/Logger.h"
7#include "Foundation/Platform/MemoryBufferBackedFileContents.h"
8#include "Context.h"
9#include "Services/TaskManager.h"
10
11#include "DataFetcherManager.h"
12
13#include "DataFetcherBase.h"
14
15namespace {
16 struct Item {
17 Cogs::Core::DataFetcherBase* fetcher = nullptr;
18 int priority = 0;
19 uint32_t id = 0;
20 };
21 std::vector<Item> fetchers;
22 Cogs::Logging::Log logger = Cogs::Logging::getLogger("DataFetcherManager");
23
24 uint32_t currentFetcherId = 0;
25 uint32_t currentRequestId = 0;
26 constexpr uint32_t fetcherIdBits = 7;
27 constexpr uint32_t fetcherIdMask = (1u << fetcherIdBits) - 1;
28}
29
30void Cogs::Core::DataFetcherManager::addDataFetcher(DataFetcherBase* dataFetcher, int priority)
31{
32 assert((fetchers.size() < 100) && "Manage the fetchers smarter");
33 for (const auto& fetcher : fetchers) {
34 if (fetcher.fetcher == dataFetcher) {
35 LOG_ERROR(logger, "addDataFetcher - ignoring duplicate DataFetcher");
36 return;
37 }
38 }
39
40 fetchers.push_back({});
41 // Insertion sort, no point in full blown introsort for just a few elements.
42 size_t i = fetchers.size() - 1;
43 while (0 < i && fetchers[i - 1].priority < priority) {
44 fetchers[i] = fetchers[i - 1];
45 i--;
46 }
47 fetchers[i].fetcher = dataFetcher;
48 fetchers[i].priority = priority;
49
50 if (currentFetcherId == (NoFetchId & fetcherIdMask)) { currentFetcherId = (currentFetcherId + 1) & fetcherIdMask; }
51 fetchers[i].id = currentFetcherId;
52 currentFetcherId = (currentFetcherId + 1) & fetcherIdMask;
53}
54
55void Cogs::Core::DataFetcherManager::removeDataFetcher(DataFetcherBase* dataFetcher) {
56 for (auto i = fetchers.begin(), e = fetchers.end(); i != e; ++i) {
57 if (i->fetcher == dataFetcher) {
58 fetchers.erase(i);
59 return;
60 }
61 }
62
63 LOG_ERROR(logger, "removeDataFetcher - unknown DataFetcher");
64}
65
66namespace {
67 using namespace Cogs::Core;
68
69 std::unique_ptr<Cogs::FileContents> brotliDecompress(std::unique_ptr<Cogs::FileContents>&& compressed)
70 {
71 BrotliDecoderState* decoder = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr); // No custom memory allocator
72 assert(decoder);
73
74 const uint8_t* nextIn = compressed->ptr;
75 size_t availableIn = compressed->size;
76 Cogs::Memory::MemoryBuffer decompressed;
77 while (!BrotliDecoderIsFinished(decoder)) {
78 size_t availableOut = 0;
79 BrotliDecoderResult rv = BrotliDecoderDecompressStream(decoder,
80 &availableIn,
81 &nextIn,
82 &availableOut,
83 nullptr, // next output buffer
84 nullptr); // output so far
85
86 if (rv == BROTLI_DECODER_RESULT_ERROR) {
87 LOG_ERROR(logger, "Parsing bin: BrotliDecoderDecompressStream failed: %s", BrotliDecoderErrorString(static_cast<BrotliDecoderErrorCode>(rv)));
88 BrotliDecoderDestroyInstance(decoder);
89 return std::unique_ptr<Cogs::FileContents>();
90 }
91
92 size_t outputSize = 0;
93 const uint8_t* outputBuffer = BrotliDecoderTakeOutput(decoder, &outputSize);
94 bool success = decompressed.write(outputBuffer, outputSize);
95 assert(success);
96 }
97 BrotliDecoderDestroyInstance(decoder);
98
99 LOG_TRACE(logger, "Brotli-decompressed %zu -> %zu bytes in transit", compressed->size, decompressed.size());
100
101 return std::make_unique<Cogs::MemoryBufferBackedFileContents>(std::move(decompressed),
102 compressed->origin().to_string(),
103 compressed->hints & ~Cogs::FileContentsHints::BrotliDecompress);
104 }
105
106 std::unique_ptr<Cogs::FileContents> zstdDecompress(std::unique_ptr<Cogs::FileContents>&& compressed)
107 {
108 ZSTD_DCtx* zstdctx = ZSTD_createDCtx();
109
110 unsigned long long outSize = ZSTD_getFrameContentSize(compressed->ptr, compressed->size);
111
112 if (outSize == ZSTD_CONTENTSIZE_UNKNOWN) {
113 LOG_ERROR(logger, "Parsing bin: ZSTD_GetFrameContentSize returned ZSTD_CONTENTSIZE_UNKNOWN");
114 ZSTD_freeDCtx(zstdctx);
115 return std::unique_ptr<Cogs::FileContents>();
116 }
117 else if (outSize == ZSTD_CONTENTSIZE_ERROR) {
118 LOG_ERROR(logger, "Parsing bin: ZSTD_GetFrameContentSize returned ZSTD_CONTENTSIZE_ERROR");
119 ZSTD_freeDCtx(zstdctx);
120 return std::unique_ptr<Cogs::FileContents>();
121 }
122
123 Cogs::Memory::MemoryBuffer decompressed;
124 decompressed.resize(outSize);
125 size_t status = ZSTD_decompressDCtx(zstdctx,
126 decompressed.data(), decompressed.size(),
127 compressed->ptr, compressed->size);
128 ZSTD_freeDCtx(zstdctx);
129
130 if (ZSTD_isError(status)) {
131 LOG_ERROR(logger, "Parsing bin: ZSTD_decompressDCtx failed: %s", ZSTD_getErrorName(status));
132 return std::unique_ptr<Cogs::FileContents>();
133 }
134
135 LOG_TRACE(logger, "ZStd-decompressed %zu -> %zu bytes in transit", compressed->size, decompressed.size());
136
137 return std::make_unique<Cogs::MemoryBufferBackedFileContents>(std::move(decompressed),
138 compressed->origin().to_string(),
139 compressed->hints & ~Cogs::FileContentsHints::ZStdDecompress);
140 }
141
142 void maybeDecompressAndInvokeHandler(const Cogs::FileContents::Callback& callback, std::unique_ptr<Cogs::FileContents> && data)
143 {
144 if (!data) {
145 // No data was loaded. Just pass this along.
146 callback(std::move(data));
147 return;
148 }
149
150 // Check if the data should be decompressed in transit
152 callback(brotliDecompress(std::move(data)));
153 return;
154 }
155
157
158 // Since ZStd compressed files can be identified via a magic number at the beginning, we check
159 // for that before decompressing it. Hence we can add the FileContentsHints::ZStdDecompress
160 // speculatively and transparently just compress if it is actually compressed, something that
161 // is used for compressed asset hierarchies (see notes in Documentation/AssetSystem.md)
162 static_assert(std::endian::native == std::endian::little); // TODO: Handle case below in the unlikely event that cogs is compiled on a non little-endian architecture.
163 if (4 <= data->size && *reinterpret_cast<const uint32_t*>(data->ptr) == 0xFD2FB528) {
164 callback(zstdDecompress(std::move(data)));
165 return;
166 }
167 }
168
169 callback(std::move(data));
170 }
171
172}
173
174Cogs::Core::DataFetcherManager::FetchId
175Cogs::Core::DataFetcherManager::fetchAsync(Context* context, const std::string& fileName, const FileContents::Callback& callback, uint64_t offset, uint64_t size, bool cancellable, FileContentsHints hints)
176{
177 assert(context);
178
179 // Wrap the callback in our own callback that marshalls to the resource queue if we are on
180 // the main thread and tries to decompress the data if it is known to be compressed.
181 FileContents::Callback moveToResourceQueueHandler =
182 [context, callback](std::unique_ptr<Cogs::FileContents>&& data)
183 {
184 TaskManager& tm = *context->taskManager;
187 [callback, dataPtr = data.release()]()
188 {
189 maybeDecompressAndInvokeHandler(callback, std::unique_ptr<Cogs::FileContents>(dataPtr));
190 });
191 }
192 else {
193 maybeDecompressAndInvokeHandler(callback, std::move(data));
194 }
195 };
196
197
198 // Try to pass this on to the fetchers
199 for (Item& item : fetchers) {
200
201 FetchId cancellationId = NoFetchId;
202 if (cancellable) {
203 cancellationId = (currentRequestId << fetcherIdBits) | item.id;
204 assert(cancellationId != NoFetchId); // id never matches the lower NoFetchId bit pattern
205 }
206
207 if (item.fetcher->fetchAsync(fileName, moveToResourceQueueHandler, offset, size, cancellationId, hints)) {
208 if (cancellable) {
209 currentRequestId++;
210 }
211 return cancellationId;
212 }
213 }
214
215#ifdef EMSCRIPTEN
216 IO::readFileAsync(fileName, moveToResourceQueueHandler, offset, size, hints);
217#else
218 // On desktop, we do syncronous file reads in a resource queue worker.
219 // Note that we pass the original callback since we already are in a resource worker
220 context->taskManager->enqueue(TaskManager::ResourceQueue,
221 [fileName, callback, hints, offset, size]()// mutable
222 {
223 maybeDecompressAndInvokeHandler(callback, IO::readFileSync(fileName, offset, size, hints));
224 });
225
226#endif
227 return NoFetchId;
228}
229
230Cogs::FileContents::Ptr Cogs::Core::DataFetcherManager::fetchSync(const std::string& fileName, uint64_t offset, uint64_t size, FileContentsHints hints)
231{
232 for (Item& item : fetchers) {
233 FileContents::Ptr contents = item.fetcher->fetchSync(fileName, offset, size, hints);
234
235 if (contents) {
236 return contents;
237 }
238 }
239 return IO::readFileSync(fileName, offset, size, hints);
240}
241
242void Cogs::Core::DataFetcherManager::cancelAsyncFetch(Context* /*context*/, FetchId id)
243{
244 if (id == NoFetchId) return;
245
246 for (Item& item : fetchers) {
247 if (item.id == (id & fetcherIdMask)) {
248 item.fetcher->cancelAsyncFetch(id);
249 return;
250 }
251 }
252}
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Definition: Context.h:83
std::unique_ptr< class TaskManager > taskManager
TaskManager service instance.
Definition: Context.h:186
Manages Task queuing and execution.
Definition: TaskManager.h:61
size_t getQueueConcurrency(TaskQueueId queue)
Returns the number of worker threads in a task queue.
void enqueue(const TaskId &taskId)
Enqueues a previously created task.
static constexpr TaskQueueId ResourceQueue
Resource task queue.
Definition: TaskManager.h:232
bool onMainThread() const
Returns true if called on the main thread.
Log implementation class.
Definition: LogManager.h:139
Contains the Engine, Renderer, resource managers and other systems needed to run Cogs....
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Definition: LogManager.h:180
FileContentsHints
Definition: FileContents.h:11
@ BrotliDecompress
A hint that the contents are Brotli (Google) compressed and is allowed to be decompressed during tran...
@ ZStdDecompress
A hint that the contents are Zstandard (Facebook) compressed and is allowed to be decompressed during...