Cogs.Core
DataSetSystem.cpp
1#include "DataSetSystem.h"
2
3#include "Context.h"
4#include "Services/Variables.h"
5#include "../BeamUtils.h"
6#include "Context.h"
7#include "Services/TaskManager.h"
8#include "Platform/Instrumentation.h"
9
10#include "Foundation/Logging/Logger.h"
11#include "Foundation/Platform/Timer.h"
12#include "Foundation/Platform/Threads.h"
13
14#include <glm/gtx/component_wise.hpp>
15
16#include <algorithm>
17
18using namespace Cogs::Core::EchoSounder;
19
20namespace
21{
22 Cogs::Logging::Log logger = Cogs::Logging::getLogger("MultibeamDataSetSystem");
23
24 bool equals(const std::vector<float>& A, const std::vector<float>& B, float epsilon)
25 {
26 if (A.size() != B.size()) return false;
27 for (size_t i = 0; i < A.size(); i++) {
28 if (epsilon < std::abs(A[i] - B[i])) return false;
29 }
30 return true;
31 }
32
33 struct ResampleBufferWorkerTask
34 {
35 std::shared_ptr<DataSetPersistent> persistent;
36 uint32_t A, B;
37
38 ResampleBufferWorkerTask(std::shared_ptr<DataSetPersistent> persistent, uint32_t A, uint32_t B)
39 : persistent(persistent), A(A), B(B)
40 {}
41
42 void operator()()
43 {
44 CpuInstrumentationScope(SCOPE_SYSTEMS, "ResampleBufferWorker");
45
46 const auto & configNew = persistent->configNew;
47 const auto & configCur = persistent->config;
48 auto & bufferNew = persistent->bufferNew;
49 auto & bufferCur = persistent->buffer;
50 const auto padding = configCur.useDecibel ? -std::numeric_limits<float>::infinity() : 0.0f;
51 const auto beamCount = configNew.beamCount;
52 const auto valuesStrideCur = configCur.sampleCount*beamCount;
53 const auto valuesStrideNew = configNew.sampleCount*beamCount;
54
55 for (uint32_t kNew = A; kNew < B; kNew++)
56 {
57 uint32_t kCur = (bufferCur.sliceBegin + kNew);
58 if (configCur.capacity <= kCur) kCur -= configCur.capacity;
59
60 size_t valuesOffCur = kCur*valuesStrideCur;
61 size_t valuesOffNew = kNew*valuesStrideNew;
62
63 for (uint32_t j = 0; j < beamCount; j++) {
64 for (uint32_t iNew = 0; iNew < configNew.sampleCount; iNew++) {
65 auto iCur = static_cast<uint32_t>((configNew.depthOffset - configCur.depthOffset + iNew*configNew.depthStep) / configCur.depthStep);
66 float value = padding;
67 if (iCur < configCur.sampleCount) {
68 value = bufferCur.values[valuesOffCur + iCur];
69 }
70 bufferNew.values[valuesOffNew + iNew] = value;
71 }
72
73 valuesOffCur += configCur.sampleCount;
74 valuesOffNew += configNew.sampleCount;
75 }
76 }
77
78 uint32_t ACur = bufferCur.sliceBegin + A;
79 if (configCur.capacity <= ACur) ACur -= configCur.capacity;
80
81
82
83 std::memcpy(bufferNew.bottomDepths.data() + A*beamCount,
84 bufferCur.bottomDepths.data() + ACur*beamCount,
85 sizeof(float)*(B - A)*beamCount);
86
87 std::memcpy(bufferNew.bottomReflectivities.data() + A*beamCount,
88 bufferCur.bottomReflectivities.data() + ACur*beamCount,
89 sizeof(float)*(B - A)*beamCount);
90
91 std::memcpy(bufferNew.timestamps.data() + A,
92 bufferCur.timestamps.data() + ACur,
93 sizeof(int64_t)*(B - A));
94
95 std::memcpy(bufferNew.metaPings.data() + A,
96 bufferCur.metaPings.data() + ACur,
97 sizeof(PingMetaData)*(B - A));
98
99 std::memcpy(bufferNew.distances.data() + A,
100 bufferCur.distances.data() + ACur,
101 sizeof(float)*(B - A));
102 }
103 };
104
105 struct ResampleBufferMainTask
106 {
107 Cogs::Core::Context* context;
108 std::shared_ptr<DataSetPersistent> persistent;
109
110 ResampleBufferMainTask(Cogs::Core::Context* context, std::shared_ptr<DataSetPersistent> persistent)
111 : context(context), persistent(persistent) {}
112
113 void operator()()
114 {
115 CpuInstrumentationScope(SCOPE_ECHOSOUNDER, "ResampleBufferMain");
116 auto timer = Cogs::Timer::startNew();
117 const auto & configNew = persistent->configNew;
118 const auto & configCur = persistent->config;
119 auto & bufferNew = persistent->bufferNew;
120 auto & bufferCur = persistent->buffer;
121
122
123 const auto Q = configNew.capacity;
124 const auto M = configNew.beamCount;
125 const auto N = configNew.sampleCount;
126 const auto keepSlices = std::min(configNew.capacity - 1, bufferCur.sliceCount);
127 const auto keepBegin = (bufferCur.sliceBegin + bufferCur.sliceCount - keepSlices) % configCur.capacity;
128 const auto concurrency = std::max(1u, (uint32_t)Cogs::Threads::hardwareConcurrency());
129 const auto slicesPerTask = std::max(10u, keepSlices / concurrency);
130
131 bufferNew.sliceBegin = 0;
132 bufferNew.sliceCount = keepSlices;
133 bufferNew.timestamps.resize(Q, false);
134 bufferNew.values.resize(Q * M * N, false);
135 bufferNew.bottomDepths.resize(Q * M, false);
136 bufferNew.bottomReflectivities.resize(Q * M, false);
137 bufferNew.metaPings.resize(Q, false);
138 bufferNew.distances.resize(Q, false);
139
140 auto tasks = 0u;
141 if (0 < keepSlices) {
142 auto group = context->taskManager->createGroup();
143 if (keepBegin + keepSlices <= configCur.capacity)
144 {
145 for (uint32_t s = 0; s < keepSlices; s += slicesPerTask)
146 {
147 context->taskManager->enqueueChild(group, ResampleBufferWorkerTask(persistent, s, std::min(keepSlices, s + slicesPerTask)));
148 tasks++;
149 }
150 }
151 else {
152 auto split = configCur.capacity - keepBegin;
153 assert(split <= keepSlices);
154 for (uint32_t s = 0; s < split; s += slicesPerTask)
155 {
156 context->taskManager->enqueueChild(group, ResampleBufferWorkerTask(persistent, s, std::min(split, s + slicesPerTask)));
157 tasks++;
158 }
159 for (uint32_t s = split; s < keepSlices; s += slicesPerTask)
160 {
161 context->taskManager->enqueueChild(group, ResampleBufferWorkerTask(persistent, s, std::min(keepSlices, s + slicesPerTask)));
162 tasks++;
163 }
164 }
165 context->taskManager->wait(group);
166 bufferNew.distances[keepSlices - 1] = 0;
167 }
168 LOG_DEBUG(logger, "ResampleBuffer keep=%d, stride=%d, tasks=%d, elapsed=%.2fms", keepSlices, slicesPerTask, tasks, 1000.0*timer.elapsedSeconds());
169
170 persistent->dataReady = true;
171 persistent->runningTasks--;
172 }
173
174 };
175
176}
177
178void Cogs::Core::EchoSounder::DataSetSystem::updateGridAngles(DataSetData& /*dataData*/)
179{
180 // Separate minor and major angles under the assumption that a "slice" of beams
181 // lies has constant corresponding angle.
182#if 0
183 const auto& config = dataData.persistent->config;
184 const auto minorCount = config.minorCount;
185 const auto majorCount = config.majorCount;
186
187 const auto & fullMinorAngles = config.directionY;
188 const auto & fullMajorAngles = config.directionX;
189
190 auto & separatedMinorAngles = dataData.directionY;
191 auto & separatedMajorAngles = dataData.directionX;
192
193 const auto iN = minorCount;
194 const auto jN = majorCount;
195
196 separatedMinorAngles.clear();
197 separatedMinorAngles.resize(minorCount);
198 for (uint32_t i = 0; i < minorCount; i++) {
199 float sumAngle = 0.f;
200 for (uint32_t j = 0; j < majorCount; j++) {
201 sumAngle += fullMinorAngles[minorCount*j + i];
202 }
203 separatedMinorAngles[i] = sumAngle / majorCount;
204 }
205
206 separatedMajorAngles.clear();
207 separatedMajorAngles.resize(majorCount);
208 for (uint32_t j = 0; j < majorCount; j++) {
209 float sumAngle = 0.f;
210 for (uint32_t i = 0; i < minorCount; i++) {
211 sumAngle += fullMajorAngles[minorCount*j + i];
212 }
213 separatedMajorAngles[j] = sumAngle / minorCount;
214 }
215 int a = 2;
216#endif
217}
218
219
220bool Cogs::Core::EchoSounder::DataSetSystem::handleConfigTransition(Context* context, DataSetData& dataData)
221{
222 dataData.persistent->configValid = false;
223 const auto & configNew = dataData.persistent->configNew;
224
225 if (configNew.capacity < 1) return false;
226 if (configNew.beamCount < 1) return false;
227 if (configNew.sampleCount < 2) return false;
228
229 const auto & configCurr = dataData.persistent->config;
230 if ((configCurr.beamCount != configNew.beamCount) ||
231 (configCurr.coordSys != configNew.coordSys) ||
232 (std::numeric_limits<float>::epsilon() < glm::compMax(glm::abs(configCurr.transducerAlpha - configNew.transducerAlpha))) ||
233 (std::numeric_limits<float>::epsilon() < glm::compMax(glm::abs(configCurr.transducerOffset - configNew.transducerOffset))) ||
234 (configCurr.useDecibel != configNew.useDecibel) ||
235 !equals(configCurr.directionX, configNew.directionX, std::numeric_limits<float>::epsilon()) ||
236 !equals(configCurr.directionY, configNew.directionY, std::numeric_limits<float>::epsilon()) ||
237 !equals(configCurr.beamWidthX, configNew.beamWidthX, std::numeric_limits<float>::epsilon()) ||
238 !equals(configCurr.beamWidthY, configNew.beamWidthY, std::numeric_limits<float>::epsilon()))
239 {
240 resizeBuffers(dataData);
241 return false;
242 }
243 if ((configCurr.capacity == configNew.capacity) &&
244 (configCurr.sampleCount == configNew.sampleCount) &&
245 (std::abs(configCurr.depthOffset - configNew.depthOffset) < std::numeric_limits<float>::epsilon()) &&
246 (std::abs(configCurr.depthStep - configNew.depthStep) < std::numeric_limits<float>::epsilon()))
247 {
248 dataData.persistent->configValid = true;
249 return false; // Basically identical configs, do nothing.
250 }
251 if (configNew.tryPreserve) {
252 dataData.persistent->runningTasks++;
253 dataData.persistent->configValid = true;
254 context->taskManager->enqueue(context->taskManager->GlobalQueue, ResampleBufferMainTask(context, dataData.persistent));
255 return true;
256 }
257 else {
258 resizeBuffers(dataData);
259 return false;
260 }
261}
262
263
264
265void Cogs::Core::EchoSounder::DataSetSystem::resizeBuffers(DataSetData& dataData)
266{
267 const auto & configNew = dataData.persistent->configNew;
268 dataData.persistent->config = configNew;
269 dataData.persistent->configValid = true;
270 dataData.configGen++;
271
272 const auto Q = dataData.persistent->config.capacity;
273 const auto M = dataData.persistent->config.beamCount;
274 const auto N = dataData.persistent->config.sampleCount;
275
276 dataData.persistent->buffer.sliceBegin = 0;
277 dataData.persistent->buffer.sliceCount = 0;
278 dataData.persistent->buffer.timestamps.resize(Q, false);
279 dataData.persistent->buffer.values.resize(Q * M * N, false);
280 dataData.persistent->buffer.bottomDepths.resize(Q * M, false);
281 dataData.persistent->buffer.bottomReflectivities.resize(Q * M, false);
282 dataData.persistent->buffer.metaPings.resize(Q, false);
283 dataData.persistent->buffer.distances.resize(Q, false);
284
285 updateGridAngles(dataData);
286 dataData.slicePreservedBegin = 0;
287 dataData.slicePreservedCount = 0;
288 dataData.configGen++;
289 dataData.dataGen++;
290
291}
292
293void Cogs::Core::EchoSounder::DataSetSystem::handleNewPing(DataSetData& dataData, Ping& ping)
294{
295 if (!dataData.persistent->configValid) return;
296
297 const auto & config = dataData.persistent->config;
298
299 if ((ping.beamCount != config.beamCount) ||
300 (ping.sampleCount != config.sampleCount))
301 {
302 LOG_WARNING(logger, "handlePing: numBeams/numSamples mismatch, ignoring slice.");
303 return;
304 }
305
306 auto & buf = dataData.persistent->buffer;
307 if (0 < buf.sliceCount) {
308 auto prevSlice = (buf.sliceBegin + buf.sliceCount - 1u) % config.capacity;
309 if (ping.timestamp <= buf.timestamps[prevSlice]) {
310 LOG_WARNING(logger, "handlePing: Detected non-monotonic increasing timestamps, ignoring slice.");
311 return;
312 }
313 }
314
315 if (buf.sliceCount == config.capacity) { // Make room for a new slice
316 if (dataData.slicePreservedBegin == buf.sliceBegin) {
317 dataData.slicePreservedBegin = (dataData.slicePreservedBegin + 1u) % config.capacity;
318 dataData.slicePreservedCount = std::max(1u, dataData.slicePreservedCount) - 1u;
319 }
320 buf.sliceBegin = (buf.sliceBegin + 1u) % config.capacity;
321 buf.sliceCount = std::max(1u, buf.sliceCount) - 1u;
322 }
323
324 int slice = (buf.sliceBegin + buf.sliceCount) % config.capacity;
325
326 // --- Copy timestamp, values, bottomDepth, bottomReflectivites.
327 const auto M = config.beamCount;
328 const auto N = config.sampleCount;
329
330 buf.timestamps[slice] = ping.timestamp;
331
332 std::memcpy(buf.values.data() + M*N*slice, ping.values(), ping.valuesSize());
333
334 // Copy bottom depths and replace non-finites with zero (which is no-data marker for depths).
335 {
336 const auto * s = ping.bottomDepths();
337 auto * d = buf.bottomDepths.data() + M * slice;
338 for (size_t i = 0; i < M; i++) {
339 d[i] = std::isfinite(s[i]) ? s[i] : 0.f;
340 }
341 }
342
343 std::memcpy(buf.bottomReflectivities.data() + M*slice, ping.bottomReflectivities(), ping.bottomReflectivitiesSize());
344
345
346
347 // --- Update metaPings
348 auto & metaPing = buf.metaPings[slice];
349 metaPing.vesselOrientationGlobal = ping.orientation;
350 metaPing.vesselPositionGlobal = ping.position;
351 getArrayToVesselTransform(buf.metaPings[slice].arrayOrientationVessel,
352 buf.metaPings[slice].arrayPositionVessel,
353 config.transducerAlpha,
354 config.transducerOffset);
355 buf.metaPings[slice].arrayOrientationGlobal = metaPing.vesselOrientationGlobal * buf.metaPings[slice].arrayOrientationVessel;
356 buf.metaPings[slice].arrayPositionGlobal = metaPing.vesselPositionGlobal + metaPing.vesselOrientationGlobal * buf.metaPings[slice].arrayPositionVessel;
357
358 // --- Update distances
359 if (0 < buf.sliceCount) {
360 auto minSliceDistance = std::max(0.001f, context->variables->get("echo.minSliceDistance")->getFloat());
361
362 auto prevSlice = (buf.sliceBegin + buf.sliceCount - 1u) % config.capacity;
363 buf.distances[prevSlice] = std::max(minSliceDistance,
364 glm::distance(buf.metaPings[prevSlice].vesselPositionGlobal,
365 buf.metaPings[slice].vesselPositionGlobal));
366 }
367 buf.distances[slice] = 0;
368 buf.sliceCount++;
369 dataData.dataGen++;
370}
371
373{
374 for (auto & dataSetComp : pool) {
375 auto & dataData = getData(&dataSetComp);
376
377 if (dataData.persistent->runningTasks != 0) {
378 continue; // Re-sample job running, hands off.
379 }
380
381 if (dataData.persistent->dataReady) {
382 dataData.persistent->dataReady = false;
383
384 dataData.persistent->config = dataData.persistent->configNew;
385 dataData.persistent->buffer.sliceBegin = dataData.persistent->bufferNew.sliceBegin;
386 dataData.persistent->buffer.sliceCount = dataData.persistent->bufferNew.sliceCount;
387 dataData.persistent->buffer.timestamps.swap(dataData.persistent->bufferNew.timestamps);
388 dataData.persistent->buffer.values.swap(dataData.persistent->bufferNew.values);
389 dataData.persistent->buffer.bottomDepths.swap(dataData.persistent->bufferNew.bottomDepths);
390 dataData.persistent->buffer.bottomReflectivities.swap(dataData.persistent->bufferNew.bottomReflectivities);
391 dataData.persistent->buffer.metaPings.swap(dataData.persistent->bufferNew.metaPings);
392 dataData.persistent->buffer.distances.swap(dataData.persistent->bufferNew.distances);
393
394 dataData.persistent->bufferNew.timestamps.clear();
395 dataData.persistent->bufferNew.values.clear();
396 dataData.persistent->bufferNew.bottomDepths.clear();
397 dataData.persistent->bufferNew.bottomReflectivities.clear();
398 dataData.persistent->bufferNew.metaPings.clear();
399 dataData.persistent->bufferNew.distances.clear();
400
401 updateGridAngles(dataData);
402 dataData.slicePreservedBegin = 0;
403 dataData.slicePreservedCount = 0;
404 dataData.configGen++;
405 dataData.dataGen++;
406 }
407
408 while (!dataSetComp.messages.empty()) {
409 dataSetComp.setChangedTransient();
410
411 auto * baseMessage = dataSetComp.messages.front();
412
413 auto * clearMessage = dynamic_cast<ClearMessage*>(baseMessage);
414 if (clearMessage) {
415 dataData.persistent->buffer.sliceBegin = 0;
416 dataData.persistent->buffer.sliceCount = 0;
417 dataData.slicePreservedBegin = 0;
418 dataData.slicePreservedCount = 0;
419 dataData.dataGen++;
420 dataData.clearGen++;
421 }
422
423 auto * configMessage = dynamic_cast<ConfigMessage*>(baseMessage);
424 if (configMessage) {
425 dataData.persistent->configNew = configMessage->config;
426 dataData.persistent->configReady = true;
427 }
428
429 auto * pingMessage = dynamic_cast<PingMessage*>(baseMessage);
430 if (pingMessage) {
431 if (dataData.persistent->configReady) {
432 dataData.persistent->configReady = false;
433
434 if (handleConfigTransition(context, dataData)) {
435 break; // Resample job has started, hands off.
436 }
437 }
438
439 handleNewPing(dataData, pingMessage->ping);
440 }
441
442 dataSetComp.messages.pop_front();
443 delete baseMessage;
444 }
445
446 }
447}
void update()
Updates the system state to that of the current frame.
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Definition: Context.h:83
std::unique_ptr< class TaskManager > taskManager
TaskManager service instance.
Definition: Context.h:186
std::unique_ptr< class Variables > variables
Variables service instance.
Definition: Context.h:180
Log implementation class.
Definition: LogManager.h:139
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Definition: LogManager.h:180