Cogs.Core
DataFetcherManager.cpp
1#include <cassert>
2#include "zstd.h"
3#include <bit>
4#include <shared_mutex>
5#include <brotli/decode.h>
6
7#include "Foundation/Logging/Logger.h"
8#include "Foundation/Platform/MemoryBufferBackedFileContents.h"
9#include "Context.h"
10#include "Services/TaskManager.h"
11
12#include "DataFetcherManager.h"
13
14#include "DataFetcherBase.h"
15
16namespace {
17 struct Item {
18 Cogs::Core::DataFetcherBase* fetcher = nullptr;
19 int priority = 0;
20 uint32_t id = 0;
21 };
22
23 std::shared_mutex fetcherLock;
24 std::vector<Item> fetchers;
25 Cogs::Logging::Log logger = Cogs::Logging::getLogger("DataFetcherManager");
26
27 uint32_t currentFetcherId = 0;
28 uint32_t currentRequestId = 0;
29 constexpr uint32_t fetcherIdBits = 7;
30 constexpr uint32_t fetcherIdMask = (1u << fetcherIdBits) - 1;
31}
32
33void Cogs::Core::DataFetcherManager::addDataFetcher(DataFetcherBase* dataFetcher, int priority)
34{
35 std::unique_lock guard(fetcherLock); // Needs exclusive access as it modifies the list
36 assert((fetchers.size() < 100) && "Manage the fetchers smarter");
37 for (const auto& fetcher : fetchers) {
38 if (fetcher.fetcher == dataFetcher) {
39 LOG_ERROR(logger, "addDataFetcher - ignoring duplicate DataFetcher");
40 return;
41 }
42 }
43
44 fetchers.push_back({});
45 // Insertion sort, no point in full blown introsort for just a few elements.
46 size_t i = fetchers.size() - 1;
47 while (0 < i && fetchers[i - 1].priority < priority) {
48 fetchers[i] = fetchers[i - 1];
49 i--;
50 }
51 fetchers[i].fetcher = dataFetcher;
52 fetchers[i].priority = priority;
53
54 if (currentFetcherId == (NoFetchId & fetcherIdMask)) { currentFetcherId = (currentFetcherId + 1) & fetcherIdMask; }
55 fetchers[i].id = currentFetcherId;
56 currentFetcherId = (currentFetcherId + 1) & fetcherIdMask;
57}
58
59void Cogs::Core::DataFetcherManager::removeDataFetcher(DataFetcherBase* dataFetcher) {
60 std::unique_lock guard(fetcherLock); // Needs exclusive access as it modifies the list
61 for (auto i = fetchers.begin(), e = fetchers.end(); i != e; ++i) {
62 if (i->fetcher == dataFetcher) {
63 fetchers.erase(i);
64 return;
65 }
66 }
67
68 LOG_ERROR(logger, "removeDataFetcher - unknown DataFetcher");
69}
70
71namespace {
72 using namespace Cogs::Core;
73
74 std::unique_ptr<Cogs::FileContents> brotliDecompress(std::unique_ptr<Cogs::FileContents>&& compressed)
75 {
76 BrotliDecoderState* decoder = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr); // No custom memory allocator
77 assert(decoder);
78
79 const uint8_t* nextIn = compressed->ptr;
80 size_t availableIn = compressed->size;
81 Cogs::Memory::MemoryBuffer decompressed;
82 while (!BrotliDecoderIsFinished(decoder)) {
83 size_t availableOut = 0;
84 BrotliDecoderResult rv = BrotliDecoderDecompressStream(decoder,
85 &availableIn,
86 &nextIn,
87 &availableOut,
88 nullptr, // next output buffer
89 nullptr); // output so far
90
91 if (rv == BROTLI_DECODER_RESULT_ERROR) {
92 LOG_ERROR(logger, "Parsing bin: BrotliDecoderDecompressStream failed: %s", BrotliDecoderErrorString(static_cast<BrotliDecoderErrorCode>(rv)));
93 BrotliDecoderDestroyInstance(decoder);
94 return std::unique_ptr<Cogs::FileContents>();
95 }
96
97 size_t outputSize = 0;
98 const uint8_t* outputBuffer = BrotliDecoderTakeOutput(decoder, &outputSize);
99 bool success = decompressed.write(outputBuffer, outputSize);
100 assert(success);
101 }
102 BrotliDecoderDestroyInstance(decoder);
103
104 LOG_TRACE(logger, "Brotli-decompressed %zu -> %zu bytes in transit", compressed->size, decompressed.size());
105
106 return std::make_unique<Cogs::MemoryBufferBackedFileContents>(std::move(decompressed),
107 compressed->origin().to_string(),
108 compressed->hints & ~Cogs::FileContentsHints::BrotliDecompress);
109 }
110
111 std::unique_ptr<Cogs::FileContents> zstdDecompress(std::unique_ptr<Cogs::FileContents>&& compressed)
112 {
113 ZSTD_DCtx* zstdctx = ZSTD_createDCtx();
114
115 unsigned long long outSize = ZSTD_getFrameContentSize(compressed->ptr, compressed->size);
116
117 if (outSize == ZSTD_CONTENTSIZE_UNKNOWN) {
118 LOG_ERROR(logger, "Parsing bin: ZSTD_GetFrameContentSize returned ZSTD_CONTENTSIZE_UNKNOWN");
119 ZSTD_freeDCtx(zstdctx);
120 return std::unique_ptr<Cogs::FileContents>();
121 }
122 else if (outSize == ZSTD_CONTENTSIZE_ERROR) {
123 LOG_ERROR(logger, "Parsing bin: ZSTD_GetFrameContentSize returned ZSTD_CONTENTSIZE_ERROR");
124 ZSTD_freeDCtx(zstdctx);
125 return std::unique_ptr<Cogs::FileContents>();
126 }
127
128 Cogs::Memory::MemoryBuffer decompressed;
129 decompressed.resize(outSize);
130 size_t status = ZSTD_decompressDCtx(zstdctx,
131 decompressed.data(), decompressed.size(),
132 compressed->ptr, compressed->size);
133 ZSTD_freeDCtx(zstdctx);
134
135 if (ZSTD_isError(status)) {
136 LOG_ERROR(logger, "Parsing bin: ZSTD_decompressDCtx failed: %s", ZSTD_getErrorName(status));
137 return std::unique_ptr<Cogs::FileContents>();
138 }
139
140 LOG_TRACE(logger, "ZStd-decompressed %zu -> %zu bytes in transit", compressed->size, decompressed.size());
141
142 return std::make_unique<Cogs::MemoryBufferBackedFileContents>(std::move(decompressed),
143 compressed->origin().to_string(),
144 compressed->hints & ~Cogs::FileContentsHints::ZStdDecompress);
145 }
146
147 void maybeDecompressAndInvokeHandler(const Cogs::FileContents::Callback& callback, std::unique_ptr<Cogs::FileContents> && data)
148 {
149 if (!data) {
150 // No data was loaded. Just pass this along.
151 callback(std::move(data));
152 return;
153 }
154
155 // Check if the data should be decompressed in transit
157 callback(brotliDecompress(std::move(data)));
158 return;
159 }
160
162
163 // Since ZStd compressed files can be identified via a magic number at the beginning, we check
164 // for that before decompressing it. Hence we can add the FileContentsHints::ZStdDecompress
165 // speculatively and transparently just compress if it is actually compressed, something that
166 // is used for compressed asset hierarchies (see notes in Documentation/AssetSystem.md)
167 static_assert(std::endian::native == std::endian::little); // TODO: Handle case below in the unlikely event that cogs is compiled on a non little-endian architecture.
168 if (4 <= data->size && *reinterpret_cast<const uint32_t*>(data->ptr) == 0xFD2FB528) {
169 callback(zstdDecompress(std::move(data)));
170 return;
171 }
172 }
173
174 callback(std::move(data));
175 }
176
177}
178
179Cogs::Core::DataFetcherManager::FetchId
180Cogs::Core::DataFetcherManager::fetchAsync(Context* context, const std::string& fileName, const FileContents::Callback& callback, uint64_t offset, uint64_t size, bool cancellable, FileContentsHints hints)
181{
182 assert(context);
183
184 // Wrap the callback in our own callback that marshalls to the resource queue if we are on
185 // the main thread and tries to decompress the data if it is known to be compressed.
186 FileContents::Callback moveToResourceQueueHandler =
187 [context, callback](std::unique_ptr<Cogs::FileContents>&& data)
188 {
189 TaskManager& tm = *context->taskManager;
192 [callback, dataPtr = data.release()]()
193 {
194 maybeDecompressAndInvokeHandler(callback, std::unique_ptr<Cogs::FileContents>(dataPtr));
195 });
196 }
197 else {
198 maybeDecompressAndInvokeHandler(callback, std::move(data));
199 }
200 };
201
202
203 // Try to pass this on to the fetchers
204 {
205 std::shared_lock guard(fetcherLock); // Can use shared access since it only reads
206 for (Item& item : fetchers) {
207
208 FetchId cancellationId = NoFetchId;
209 if (cancellable) {
210 cancellationId = (currentRequestId << fetcherIdBits) | item.id;
211 assert(cancellationId != NoFetchId); // id never matches the lower NoFetchId bit pattern
212 }
213
214 if (item.fetcher->fetchAsync(fileName, moveToResourceQueueHandler, offset, size, cancellationId, hints)) {
215 if (cancellable) {
216 currentRequestId++;
217 }
218 return cancellationId;
219 }
220 }
221 }
222
223#ifdef EMSCRIPTEN
224 IO::readFileAsync(fileName, moveToResourceQueueHandler, offset, size, hints);
225#else
226 // On desktop, we do syncronous file reads in a resource queue worker.
227 // Note that we pass the original callback since we already are in a resource worker
228 context->taskManager->enqueue(TaskManager::ResourceQueue,
229 [fileName, callback, hints, offset, size]()// mutable
230 {
231 maybeDecompressAndInvokeHandler(callback, IO::readFileSync(fileName, offset, size, hints));
232 });
233
234#endif
235 return NoFetchId;
236}
237
238Cogs::FileContents::Ptr Cogs::Core::DataFetcherManager::fetchSync(const std::string& fileName, uint64_t offset, uint64_t size, FileContentsHints hints)
239{
240 for (Item& item : fetchers) {
241 FileContents::Ptr contents = item.fetcher->fetchSync(fileName, offset, size, hints);
242
243 if (contents) {
244 return contents;
245 }
246 }
247 return IO::readFileSync(fileName, offset, size, hints);
248}
249
250void Cogs::Core::DataFetcherManager::cancelAsyncFetch(Context* /*context*/, FetchId id)
251{
252 if (id == NoFetchId) return;
253
254 for (Item& item : fetchers) {
255 if (item.id == (id & fetcherIdMask)) {
256 item.fetcher->cancelAsyncFetch(id);
257 return;
258 }
259 }
260}
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Definition: Context.h:83
std::unique_ptr< class TaskManager > taskManager
TaskManager service instance.
Definition: Context.h:186
Manages Task queuing and execution.
Definition: TaskManager.h:61
size_t getQueueConcurrency(TaskQueueId queue)
Returns the number of worker threads in a task queue.
void enqueue(const TaskId &taskId)
Enqueues a previously created task.
static constexpr TaskQueueId ResourceQueue
Resource task queue.
Definition: TaskManager.h:232
bool onMainThread() const
Returns true if called on the main thread.
Log implementation class.
Definition: LogManager.h:140
Contains the Engine, Renderer, resource managers and other systems needed to run Cogs....
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Definition: LogManager.h:181
FileContentsHints
Definition: FileContents.h:11
@ BrotliDecompress
A hint that the contents are Brotli (Google) compressed and is allowed to be decompressed during tran...
@ ZStdDecompress
A hint that the contents are Zstandard (Facebook) compressed and is allowed to be decompressed during...