Cogs.Core
OctProviderSystem.cpp
1#include <thread>
2#include "Context.h"
3#include "ExtensionRegistry.h"
4#include "Services/TaskManager.h"
5#include "Components/Geometry/MarkerPointSetComponent.h"
6#include "Components/Core/SceneComponent.h"
7
8#include "../Components/DataRefComponent.h"
9#include "../Components/BeamGroupComponent.h"
10#include "../Tasks/dBToLinearTask.h"
11#include "../Tasks/OctProviderBuildTileTask.h"
12
13#include "OctProviderSystem.h"
14#include "DataSetSystem.h"
15#include "../BeamUtils.h"
16#include "../../../Volumetric/Source/Components/OctComponent.h"
17
18#include "Foundation/Logging/Logger.h"
19
20namespace {
21 using namespace Cogs::Core;
22
23 Cogs::Logging::Log logger = Cogs::Logging::getLogger("EchoOctProvider");
24
25 struct OctDummyTask
26 {
27 Context* context;
28 std::shared_ptr<EchoSounder::OctProviderPersistent> persistent;
29
30 void operator()()
31 {
32 if (!persistent->needsLinearization.empty()) {
33 auto linearizeGroup = context->taskManager->createGroup(Cogs::Core::TaskManager::ResourceQueue);
34
35 for (auto * item : persistent->needsLinearization) {
36 assert(item->linear.size() == item->decibel.size());
38 task.overflowThreshold = std::numeric_limits<float>::quiet_NaN();
39 task.in = item->decibel.data();
40 task.out = item->linear.data();
41 task.N = item->linear.size();
42 context->taskManager->enqueueChild(linearizeGroup, task);
43 item->linearized = true;
44 }
45 context->taskManager->wait(linearizeGroup);
46 }
47
48 // Sanity checks.
49 for (auto & it : persistent->pingCache) {
50 assert(it.second->linearized);
51 }
52 persistent->needsLinearization.clear();
53
54 auto group = context->taskManager->createGroup(Cogs::Core::TaskManager::ResourceQueue);
55
56 for (size_t i = 0; i < persistent->responses.size(); i++) {
57
58 if (persistent->responses[i]->regionKeys.empty()) continue;
59
60 context->taskManager->enqueueChild(group, EchoSounder::OctProviderBuildTileTask{ persistent, i });
61 }
62 context->taskManager->wait(group);
63 persistent->tasksRunning = false;
64 }
65
66 };
67
68
69 void populatePingCache(EchoSounder::DataSetSystem* dataSystem, EchoSounder::OctProviderData& providerData)
70 {
71 auto * persistent = providerData.persistent.get();
72 persistent->needsLinearization.clear();
73
74 // If discarded, discard cached slices.
75 if (persistent->tainted) {
76 for (auto & it : persistent->pingCache) {
77 persistent->pingCacheUnused.push_back(std::move(it.second));
78 }
79 persistent->pingCache.clear();
80 }
81
82 if(persistent->responses.empty()) return;
83
84 // Update group cache which stores relevant group info while task is running.
85
86 persistent->groupCache.resize(providerData.sources.size());
87 for (size_t s = 0; s < providerData.sources.size(); s++) {
88 auto source = providerData.sources[s];
89 const auto * dataEntity = source.entity->getComponent<EchoSounder::DataRefComponent>()->data.get();
90 const auto * grpComp = source.entity->getComponent<EchoSounder::BeamGroupComponent>();
91 assert(dataEntity != nullptr);
92
93 persistent->groupCache[s].dataId = uint32_t(dataEntity->getId());
94 persistent->groupCache[s].majorCount = grpComp->majorCount;
95 persistent->groupCache[s].minorCount = grpComp->minorCount;
96 persistent->groupCache[s].directionX = grpComp->directionX;
97 persistent->groupCache[s].directionY = grpComp->directionY;
98 persistent->groupCache[s].minDirectionX = grpComp->minDirectionX;
99 persistent->groupCache[s].maxDirectionX = grpComp->maxDirectionX;
100 persistent->groupCache[s].minDirectionY = grpComp->minDirectionY;
101 persistent->groupCache[s].maxDirectionY = grpComp->maxDirectionY;
102 persistent->groupCache[s].maxBeamWidthX = grpComp->maxBeamWidthX;
103 persistent->groupCache[s].maxBeamWidthY = grpComp->maxBeamWidthY;
104 }
105
106
107 // Let slices age.
108 for (auto & it : persistent->pingCache) {
109 it.second->age++;
110 }
111
112 // Pull in data not present in the cache, reset age of stuff already present.
113 unsigned newSlices = 0;
114 unsigned recycledSlices = 0;
115
116
117 for (auto * resp : persistent->responses) {
118 for (auto & regionKey : resp->regionKeys) {
119 uint32_t sourceIndex, pingIndex;
120 EchoSounder::decodeKey2(sourceIndex, pingIndex, regionKey);
121 assert(sourceIndex < providerData.sources.size());
122
123 auto source = providerData.sources[sourceIndex];
124 auto * dataEntity = source.entity->getComponent<EchoSounder::DataRefComponent>()->data.get();
125
126 uint64_t cacheKey = EchoSounder::encodeKey2(persistent->groupCache[sourceIndex].dataId, pingIndex);
127 auto it = persistent->pingCache.find(cacheKey);
128 if (it != persistent->pingCache.end()) {
129 it->second->age = 0;
130 }
131 else {
132 std::unique_ptr<EchoSounder::CachedPing> entry;
133 if (!persistent->pingCacheUnused.empty()) {
134 entry = std::move(persistent->pingCacheUnused.back());
135 persistent->pingCacheUnused.pop_back();
136 recycledSlices++;
137 }
138 else {
139 entry = std::make_unique<EchoSounder::CachedPing>();
140 }
141
142 const auto & config = dataSystem->getData(dataEntity->getComponent<EchoSounder::DataSetComponent>()).persistent->config;
143 const auto & buffer = dataSystem->getData(dataEntity->getComponent<EchoSounder::DataSetComponent>()).persistent->buffer;
144 const auto & metaPing = buffer.metaPings[pingIndex];
145
146 entry->timestamp = buffer.timestamps[pingIndex];
147 entry->metaPing = metaPing;
148 entry->beamCount = config.beamCount;
149 entry->sampleCount = config.sampleCount;
150 entry->depthOffset = config.depthOffset;
151 entry->depthStep = config.depthStep;
152 entry->transducerAlpha = config.transducerAlpha;
153 entry->transducerOffset = config.transducerOffset;
154
155 entry->age = 0;
156 entry->linearized = false;
157 entry->decibel.resize(config.beamCount*config.sampleCount, false);
158 std::memcpy(entry->decibel.data(), buffer.values.data() + pingIndex * config.beamCount * config.sampleCount, sizeof(float)*config.beamCount*config.sampleCount);
159 entry->bottomDepths.resize(config.beamCount, false);
160 std::memcpy(entry->bottomDepths.data(), buffer.bottomDepths.data() + pingIndex * config.beamCount, sizeof(float)*config.beamCount);
161 entry->linear.resize(config.beamCount*config.sampleCount, false);
162 persistent->needsLinearization.push_back(entry.get());
163
164 persistent->pingCache[cacheKey] = std::move(entry);
165 newSlices++;
166 }
167 }
168 }
169
170 // Discard expired cache entries.
171 unsigned expiredSlices = 0;
172 for (auto it = persistent->pingCache.begin(); it != persistent->pingCache.end(); ) {
173 if constexpr (false && 10 < it->second->age) {
174 persistent->pingCacheUnused.push_back(std::move(it->second));
175 it = persistent->pingCache.erase(it); // Should be safe since C++-14.
176 expiredSlices++;
177 }
178 else {
179 ++it;
180 }
181 }
182
183 // Sanity checks.
184 for (auto & it : persistent->pingCache) {
185 if (!it.second->linearized) {
186 bool found = false;
187 for (auto jt : persistent->needsLinearization) {
188 if (jt == it.second.get()) { found = true; break; }
189 }
190 assert(found && "ping is neither already linearized nor is queued for linearization.");
191 }
192 }
193
194 if (newSlices || expiredSlices) {
195 //LOG_DEBUG(logger, "Updated slice cache, %d new slices (%d recycled) and %d expired.", newSlices, recycledSlices, expiredSlices);
196 }
197 }
198
199
200 void addAndRemoveRegions(Context* context, const EchoSounder::OctProviderComponent* providerComp, EchoSounder::OctProviderData& providerData)
201 {
202 assert(providerData.callbackCount == 0);
203
204 auto * dataSystem = ExtensionRegistry::getExtensionSystem<EchoSounder::DataSetSystem>(context);
205 auto * octComp = providerComp->getComponent<Volumetric::OctComponent>();
206
207 for (auto & src : providerData.sources) {
208 auto * grpComp = src.entity->getComponent<EchoSounder::BeamGroupComponent>();
209 if (!grpComp->sane) {
210 if (!src.sane) providerData.doFlush = true;
211 src.sane = false;
212 continue;
213 }
214 src.sane = true;
215
216 if (src.grpGen != grpComp->gen) {
217 src.grpGen = grpComp->gen;
218 providerData.doFlush = true;
219 }
220 }
221
222 if (providerData.doFlush) {
223 providerData.doFlush = false;
224 providerData.persistent->tainted = true;
225 for (auto & src : providerData.sources) {
226 auto * dataRefComp = src.entity->getComponent<EchoSounder::DataRefComponent>();
227 auto * dataComp = dataRefComp->data->getComponent<EchoSounder::DataSetComponent>();
228 auto & dataData = dataSystem->getData(dataComp);
229 src.dataGen = dataData.dataGen - 1;
230 src.sliceBegin = dataData.persistent->buffer.sliceBegin;
231 src.sliceCount = 0;
232 }
233
234 LOG_DEBUG(logger, "Performing flush");
235 octComp->clearAllRegions = true;
236 octComp->setChanged();
237 }
238
239 providerData.mostRecentTimestamp = 0;
240 for (size_t s = 0; s < providerData.sources.size(); s++) {
241 auto & src = providerData.sources[s];
242
243 if (!src.sane) continue;
244
245 auto * dataRefComp = src.entity->getComponent<EchoSounder::DataRefComponent>();
246 auto * dataComp = dataRefComp->data->getComponent<EchoSounder::DataSetComponent>();
247 auto & dataData = dataSystem->getData(dataComp);
248 if (src.dataGen == dataData.dataGen) continue;
249
250 const auto & config = dataData.persistent->config;
251 const auto & buffer = dataData.persistent->buffer;
252
253 if (0 < buffer.sliceCount) {
254 auto last = buffer.sliceBegin + buffer.sliceCount - 1;
255 last = std::min(last, last - config.capacity);
256 providerData.mostRecentTimestamp = std::max(providerData.mostRecentTimestamp,
257 uint64_t(buffer.timestamps[last]));
258 }
259
260 auto sliceCount = std::min(providerData.history, buffer.sliceCount);
261 auto sliceBegin = buffer.sliceBegin + buffer.sliceCount - sliceCount;
262 sliceBegin = std::min(sliceBegin, sliceBegin - config.capacity); // Wrap.
263
264 // Delete pings/regions
265 if (!providerData.doFlush) {
266 src.sliceBegin = std::min(src.sliceBegin, src.sliceBegin - config.capacity);
267 while (src.sliceBegin != sliceBegin) {
268 //LOG_DEBUG(logger, "%llx: Discarding slice at %d", (intptr_t)&src, src.sliceBegin);
269 octComp->regionsToRemove.push_back(EchoSounder::encodeKey2(uint32_t(s), src.sliceBegin));
270 octComp->setChanged();
271 src.sliceBegin = std::min(src.sliceBegin + 1, src.sliceBegin + 1 - config.capacity);
272 src.sliceCount = std::max(1u, src.sliceCount) - 1u;
273 }
274 }
275 src.sliceBegin = sliceBegin;
276
277 const auto * grpComp = src.entity->getComponent<EchoSounder::BeamGroupComponent>();
278
279 for (; src.sliceCount < sliceCount; src.sliceCount++) {
280 auto slice = src.sliceCount + src.sliceBegin;
281 slice = std::min(slice, slice - config.capacity);
282
283 const auto & metaPing = buffer.metaPings[slice];
284
285 glm::vec3 minCorner, maxCorner;
286 EchoSounder::getAxisAlignedBoundingBox(minCorner, maxCorner,
287 metaPing.arrayOrientationGlobal,
288 metaPing.arrayPositionGlobal,
289 0, // FIXME - Assume EK/ME/MS for now.
290 grpComp->minDirectionX, grpComp->maxDirectionX,
291 grpComp->minDirectionY, grpComp->maxDirectionY,
292 grpComp->minDistance, grpComp->maxDistance);
293 //LOG_DEBUG(logger, "%llx: Adding slice at %d", (intptr_t)&src, slice);
294 octComp->regionsToAdd.push_back(Volumetric::Region{ EchoSounder::encodeKey2(uint32_t(s), slice), minCorner, maxCorner });
295 octComp->setChanged();
296 }
297 }
298 }
299
300
301
302}
303
304
306{
307 //const unsigned L = 300;
308
309 // Update set of tiles before OctSystem:Update(@PostView) runs.
310 for (auto & providerComp : pool) {
311 auto & providerData = getData(&providerComp);
312 if (!providerData.persistent) {
313 providerData.persistent = std::make_shared<OctProviderPersistent>();
314 providerData.persistent->context = context;
315 }
316
317 if (providerData.history != providerComp.history) {
318 providerData.history = providerComp.history;
319 providerData.doFlush = true;
320 }
321
322 auto * octComp = providerComp.getComponent<Volumetric::OctComponent>();
323 octComp->valueMin = dBToLinear(providerComp.decibelMin);
324 octComp->valueMax = dBToLinear(providerComp.decibelMax);
325 octComp->source = Volumetric::OctSource::ValueAge;
326 octComp->alphaCallback.func = alphaCallback;
327 octComp->alphaCallback.data = &providerData;
328
329 const auto * sceneComp = providerComp.getComponent<SceneComponent>();
330 if (sceneComp->hasFieldChanged(&SceneComponent::children)) {
331 LOG_DEBUG(logger, "Repopulating list of providers");
332 providerData.sources.clear();
333 providerData.persistent->tainted = true;
334 for (auto child : sceneComp->children) {
335 if (child->getComponent<DataRefComponent>() == nullptr) continue;
336 if (child->getComponent<BeamGroupComponent>() == nullptr) continue;
337 providerData.sources.emplace_back(OctProviderSource{ child });
338 }
339 providerData.doFlush = true;
340 }
341
342 for (auto & source : providerData.sources) {
343 source.entity->getComponent<BeamGroupComponent>()->requestCallback([that = this, comp = &providerComp](Context* context) { that->callback(context, comp); });
344 }
345 providerData.callbackCount = static_cast<unsigned>(providerData.sources.size());
346 }
347}
348
349void Cogs::Core::EchoSounder::OctProviderSystem::callback(Context* context, OctProviderComponent * providerComp)
350{
351 auto & providerData = getData(providerComp);
352 assert(providerData.callbackCount != 0);
353 if (--providerData.callbackCount == 0) {
354 // Callback of last source, update regions.
355 addAndRemoveRegions(context, providerComp, providerData);
356 }
357}
358
359float Cogs::Core::EchoSounder::OctProviderSystem::alphaCallback(void* data, uint64_t clientData)
360{
361 const auto * providerData = reinterpret_cast<OctProviderData*>(data);
362 return float(10e-7*(providerData->mostRecentTimestamp - clientData));
363}
364
366{
367 auto * dataSystem = ExtensionRegistry::getExtensionSystem<DataSetSystem>(context);
368
369 for (auto & providerComp : pool) {
370 auto & providerData = getData(&providerComp);
371 auto * octComp = providerComp.getComponent<Volumetric::OctComponent>();
372
373 // If a tile populate task is running, we keep our hands off.
374 if(providerData.persistent->tasksRunning) continue;
375
376 auto * persistent = providerData.persistent.get();
377 auto * submitTileResponse = Reflection::TypeDatabase::getType("VolOctComponent").getMethod("submitTileResponse");
378
379 // No task running, forward results to OctSystem.
380 while (!providerData.persistent->responses.empty()) {
381 submitTileResponse->call(octComp, std::move(persistent->responses.back()), persistent->tainted == false);
382 providerData.persistent->responses.pop_back();
383 }
384
385 // OctSystem:Update(@PostView) has run, pull new requests and populate with data.
386 auto * initTileResponse = Reflection::TypeDatabase::getType("VolOctComponent").getMethod("initTileResponse");
387
388 assert(providerData.persistent->responses.empty());
389 providerData.persistent->taskCount.clear();
390
391 unsigned regionsPerTask = 1;
392 unsigned targetParallelism = 2*std::max(1u, std::thread::hardware_concurrency());
393 unsigned currentParallelism = 0;
394 for (const auto reqTileKey : octComp->tileRequests) {
395 if (targetParallelism < currentParallelism) break;
396
398 res = initTileResponse->callWithReturn(octComp, res, &reqTileKey);
399
400 if (!res) break;
401 if (res->regionKeys.empty()) {
402 LOG_WARNING(logger, "Tile request with no regions.");
403 }
404
405 unsigned maxTasks = std::max(1u, std::min(targetParallelism, res->N.z));
406
407 unsigned taskCount = std::max(1u, std::min(maxTasks,
408 ((unsigned)res->regionKeys.size() + regionsPerTask - 1u) / regionsPerTask));
409
410 providerData.persistent->responses.push_back(std::move(res));
411 providerData.persistent->taskCount.push_back(taskCount);
412 currentParallelism += taskCount;
413 }
414
415 populatePingCache(dataSystem, providerData);
416
417 persistent->tainted = false;
418 persistent->tasksRunning = true;
419 context->taskManager->enqueue(Cogs::Core::TaskManager::ResourceQueue, OctDummyTask{ context, providerData.persistent });
420 }
421}
ComponentType * getComponent() const
Definition: Component.h:159
Context * context
Pointer to the Context instance the system lives in.
void update()
Updates the system state to that of the current frame.
void preUpdate()
Run the pre-update method of the system.
ComponentPool< ComponentType > pool
Pool of components managed by the system.
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Definition: Context.h:83
std::unique_ptr< class TaskManager > taskManager
TaskManager service instance.
Definition: Context.h:186
Contains information on how the entity behaves in the scene.
std::vector< EntityPtr > children
Contains all child entities owned by this component.
static constexpr TaskQueueId ResourceQueue
Resource task queue.
Definition: TaskManager.h:232
Log implementation class.
Definition: LogManager.h:139
void call(Class *object, Arg... arg) const
Call the method named name on the given object, with the given arguments.
Definition: Method.h:144
static const Type & getType()
Get the Type of the given template argument.
Definition: TypeDatabase.h:168
const Method * getMethod(const Name &name) const
Get a pointer to the method with the given name.
Definition: Type.cpp:133
Contains the Engine, Renderer, resource managers and other systems needed to run Cogs....
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Definition: LogManager.h:180
glm::uvec3 N
Number of samples expected along each dimension (total=N^3).
Definition: OctComponent.h:48
std::set< RegionKey > regionKeys
Regions present in that tile.
Definition: OctComponent.h:49