1#include "DataSetSystem.h"
4#include "Services/Variables.h"
5#include "../BeamUtils.h"
7#include "Services/TaskManager.h"
8#include "Platform/Instrumentation.h"
10#include "Foundation/Logging/Logger.h"
11#include "Foundation/Platform/Timer.h"
12#include "Foundation/Platform/Threads.h"
14#include <glm/gtx/component_wise.hpp>
18using namespace Cogs::Core::EchoSounder;
/// Element-wise approximate comparison of two float vectors.
///
/// @param A        First vector.
/// @param B        Second vector.
/// @param epsilon  Largest absolute per-element difference still treated as equal
///                 (a difference exactly equal to epsilon counts as equal).
/// @return true when both vectors have the same length and no pair of elements
///         differs by more than epsilon; false otherwise.
///
/// Note: the test is written as `epsilon < |a - b|`, so a NaN difference never
/// exceeds epsilon and such an element pair is treated as equal.
bool equals(const std::vector<float>& A, const std::vector<float>& B, float epsilon)
{
  // Vectors of different length can never match.
  if (A.size() != B.size()) {
    return false;
  }

  for (size_t i = 0; i < A.size(); i++) {
    if (epsilon < std::abs(A[i] - B[i])) {
      return false;
    }
  }

  // Every element pair was within tolerance (or both vectors are empty).
  return true;
}
// Task functor that fills slices [A, B) of persistent->bufferNew from
// persistent->buffer: sample values are resampled along depth from the current
// configuration's sampling to the new one, and the per-slice arrays
// (bottomDepths, bottomReflectivities, timestamps, metaPings, distances) are
// block-copied with memcpy.
// NOTE(review): the declarations of members A and B and the call-operator
// header are not visible in this listing.
33 struct ResampleBufferWorkerTask
35 std::shared_ptr<DataSetPersistent> persistent;
// Stores the shared state and the half-open range [A, B) of slice indices in
// the new buffer that this task processes.
38 ResampleBufferWorkerTask(std::shared_ptr<DataSetPersistent> persistent, uint32_t A, uint32_t B)
39 : persistent(persistent), A(A), B(B)
44 CpuInstrumentationScope(SCOPE_SYSTEMS,
"ResampleBufferWorker");
46 const auto & configNew = persistent->configNew;
47 const auto & configCur = persistent->config;
48 auto & bufferNew = persistent->bufferNew;
49 auto & bufferCur = persistent->buffer;
// Value written for new samples that have no counterpart in the current data:
// -infinity when the data is in decibel, 0 otherwise.
50 const auto padding = configCur.useDecibel ? -std::numeric_limits<float>::infinity() : 0.0f;
// Both strides use configNew.beamCount, i.e. the beam count is assumed to be
// the same in both configurations.
51 const auto beamCount = configNew.beamCount;
52 const auto valuesStrideCur = configCur.sampleCount*beamCount;
53 const auto valuesStrideNew = configNew.sampleCount*beamCount;
55 for (uint32_t kNew = A; kNew < B; kNew++)
// Source slice in the current ring buffer, wrapped at most once by capacity.
// NOTE(review): the source index starts at bufferCur.sliceBegin, while
// ResampleBufferMainTask derives its task ranges from keepBegin; the two differ
// whenever fewer slices are kept than are stored. Verify the intended source.
57 uint32_t kCur = (bufferCur.sliceBegin + kNew);
58 if (configCur.capacity <= kCur) kCur -= configCur.capacity;
60 size_t valuesOffCur = kCur*valuesStrideCur;
61 size_t valuesOffNew = kNew*valuesStrideNew;
63 for (uint32_t j = 0; j < beamCount; j++) {
64 for (uint32_t iNew = 0; iNew < configNew.sampleCount; iNew++) {
// Map the new sample index to the current sample index: depth of the new
// sample relative to the current depth offset, divided by the current step
// and truncated.
// NOTE(review): when the expression is negative (new depthOffset smaller than
// the current one) the float -> uint32_t conversion is undefined behaviour;
// the range check below only works if it happens to yield a large value.
65 auto iCur =
static_cast<uint32_t
>((configNew.depthOffset - configCur.depthOffset + iNew*configNew.depthStep) / configCur.depthStep);
66 float value = padding;
67 if (iCur < configCur.sampleCount) {
68 value = bufferCur.values[valuesOffCur + iCur];
70 bufferNew.values[valuesOffNew + iNew] = value;
// Advance both offsets to the next beam of the same slice.
73 valuesOffCur += configCur.sampleCount;
74 valuesOffNew += configNew.sampleCount;
// First source slice for the block copies below, wrapped like kCur above.
// Only the start index is wrapped: the copies read (B - A) consecutive slices,
// so the caller must pick ranges that do not cross the end of the current
// ring buffer.
78 uint32_t ACur = bufferCur.sliceBegin + A;
79 if (configCur.capacity <= ACur) ACur -= configCur.capacity;
// Per-beam arrays: beamCount floats per slice.
83 std::memcpy(bufferNew.bottomDepths.data() + A*beamCount,
84 bufferCur.bottomDepths.data() + ACur*beamCount,
85 sizeof(
float)*(B - A)*beamCount);
87 std::memcpy(bufferNew.bottomReflectivities.data() + A*beamCount,
88 bufferCur.bottomReflectivities.data() + ACur*beamCount,
89 sizeof(
float)*(B - A)*beamCount);
// Per-slice arrays: one element per slice.
91 std::memcpy(bufferNew.timestamps.data() + A,
92 bufferCur.timestamps.data() + ACur,
93 sizeof(int64_t)*(B - A));
// NOTE(review): the size argument of this copy is not visible in this listing.
95 std::memcpy(bufferNew.metaPings.data() + A,
96 bufferCur.metaPings.data() + ACur,
99 std::memcpy(bufferNew.distances.data() + A,
100 bufferCur.distances.data() + ACur,
101 sizeof(
float)*(B - A));
// Task functor that prepares persistent->bufferNew for persistent->configNew,
// enqueues ResampleBufferWorkerTask children to carry existing slices over,
// and finally sets persistent->dataReady and decrements
// persistent->runningTasks.
// NOTE(review): the `context` member declaration, the call-operator header and
// the declarations of `group` and `tasks` are not visible in this listing.
105 struct ResampleBufferMainTask
108 std::shared_ptr<DataSetPersistent> persistent;
110 ResampleBufferMainTask(
Cogs::Core::Context* context, std::shared_ptr<DataSetPersistent> persistent)
111 : context(context), persistent(persistent) {}
115 CpuInstrumentationScope(SCOPE_ECHOSOUNDER,
"ResampleBufferMain");
116 auto timer = Cogs::Timer::startNew();
117 const auto & configNew = persistent->configNew;
118 const auto & configCur = persistent->config;
119 auto & bufferNew = persistent->bufferNew;
120 auto & bufferCur = persistent->buffer;
// Q = slice capacity, M = beams per slice, N = samples per beam (new config).
123 const auto Q = configNew.capacity;
124 const auto M = configNew.beamCount;
125 const auto N = configNew.sampleCount;
// Keep at most (new capacity - 1) of the slices currently stored.
126 const auto keepSlices = std::min(configNew.capacity - 1, bufferCur.sliceCount);
// Ring-buffer index of the first of the keepSlices most recent slices.
127 const auto keepBegin = (bufferCur.sliceBegin + bufferCur.sliceCount - keepSlices) % configCur.capacity;
// Spread the slices over the hardware threads, but never hand out fewer than
// 10 slices per task.
128 const auto concurrency = std::max(1u, (uint32_t)Cogs::Threads::hardwareConcurrency());
129 const auto slicesPerTask = std::max(10u, keepSlices / concurrency);
// The new buffer starts at index 0 and holds the kept slices.
131 bufferNew.sliceBegin = 0;
132 bufferNew.sliceCount = keepSlices;
// NOTE(review): the second resize argument presumably controls whether
// existing contents are preserved -- confirm against the container type.
133 bufferNew.timestamps.resize(Q,
false);
134 bufferNew.values.resize(Q * M * N,
false);
135 bufferNew.bottomDepths.resize(Q * M,
false);
136 bufferNew.bottomReflectivities.resize(Q * M,
false);
137 bufferNew.metaPings.resize(Q,
false);
138 bufferNew.distances.resize(Q,
false);
141 if (0 < keepSlices) {
// Kept range does not wrap in the current ring buffer: chunk [0, keepSlices).
143 if (keepBegin + keepSlices <= configCur.capacity)
145 for (uint32_t s = 0; s < keepSlices; s += slicesPerTask)
147 context->
taskManager->enqueueChild(group, ResampleBufferWorkerTask(persistent, s, std::min(keepSlices, s + slicesPerTask)));
// Kept range wraps: chunk [0, split) and [split, keepSlices) separately so that
// no worker range crosses the end of the current ring buffer.
152 auto split = configCur.capacity - keepBegin;
153 assert(split <= keepSlices);
154 for (uint32_t s = 0; s < split; s += slicesPerTask)
156 context->
taskManager->enqueueChild(group, ResampleBufferWorkerTask(persistent, s, std::min(split, s + slicesPerTask)));
159 for (uint32_t s = split; s < keepSlices; s += slicesPerTask)
161 context->
taskManager->enqueueChild(group, ResampleBufferWorkerTask(persistent, s, std::min(keepSlices, s + slicesPerTask)));
// The most recent kept slice gets distance 0.
// NOTE(review): keepSlices - 1 underflows if this runs with keepSlices == 0;
// the enclosing scope is not visible in this listing -- confirm it is guarded
// by the `0 < keepSlices` check above.
166 bufferNew.distances[keepSlices - 1] = 0;
168 LOG_DEBUG(logger,
"ResampleBuffer keep=%d, stride=%d, tasks=%d, elapsed=%.2fms", keepSlices, slicesPerTask, tasks, 1000.0*timer.elapsedSeconds());
// Publish the result: mark the new buffer ready and release the running-task
// count that is incremented when this task is scheduled.
170 persistent->dataReady =
true;
171 persistent->runningTasks--;
// Derives one angle per minor index and one angle per major index from the
// per-beam direction arrays of the active configuration by averaging:
//   dataData.directionY[i] = mean over j of config.directionY[minorCount*j + i]
//   dataData.directionX[j] = mean over i of config.directionX[minorCount*j + i]
// NOTE(review): the parameter name is missing in this listing; the body refers
// to it as `dataData`.
178void Cogs::Core::EchoSounder::DataSetSystem::updateGridAngles(
DataSetData& )
183 const auto& config = dataData.persistent->config;
184 const auto minorCount = config.minorCount;
185 const auto majorCount = config.majorCount;
// Per-beam angles, indexed as [minorCount * major + minor].
187 const auto & fullMinorAngles = config.directionY;
188 const auto & fullMajorAngles = config.directionX;
// Outputs: one averaged angle per minor / per major index.
190 auto & separatedMinorAngles = dataData.directionY;
191 auto & separatedMajorAngles = dataData.directionX;
// NOTE(review): iN and jN are not used in the visible part of this function.
193 const auto iN = minorCount;
194 const auto jN = majorCount;
// Average directionY over all major indices for each minor index.
196 separatedMinorAngles.clear();
197 separatedMinorAngles.resize(minorCount);
198 for (uint32_t i = 0; i < minorCount; i++) {
199 float sumAngle = 0.f;
200 for (uint32_t j = 0; j < majorCount; j++) {
201 sumAngle += fullMinorAngles[minorCount*j + i];
203 separatedMinorAngles[i] = sumAngle / majorCount;
// Average directionX over all minor indices for each major index.
206 separatedMajorAngles.clear();
207 separatedMajorAngles.resize(majorCount);
208 for (uint32_t j = 0; j < majorCount; j++) {
209 float sumAngle = 0.f;
210 for (uint32_t i = 0; i < minorCount; i++) {
211 sumAngle += fullMajorAngles[minorCount*j + i];
213 separatedMajorAngles[j] = sumAngle / minorCount;
// Decides how to move from persistent->config to persistent->configNew.
// The visible code distinguishes four cases:
//   1. configNew is unusable (capacity < 1, beamCount < 1 or sampleCount < 2):
//      configValid stays false and false is returned.
//   2. Beam layout, coordinate system, transducer placement or decibel mode
//      changed: the buffers are rebuilt via resizeBuffers().
//   3. Capacity, sample count, depth offset and depth step are unchanged:
//      the existing buffer is kept and configValid is set.
//   4. Otherwise: if configNew.tryPreserve is set, a ResampleBufferMainTask is
//      enqueued to carry data over; else resizeBuffers() is called.
// NOTE(review): the return statements of cases 2-4 and the branch structure
// between them are not visible in this listing.
220bool Cogs::Core::EchoSounder::DataSetSystem::handleConfigTransition(
Context* context,
DataSetData& dataData)
// Config is considered invalid until one of the branches below accepts it.
222 dataData.persistent->configValid =
false;
223 const auto & configNew = dataData.persistent->configNew;
// Reject configurations that cannot describe any data.
225 if (configNew.capacity < 1)
return false;
226 if (configNew.beamCount < 1)
return false;
227 if (configNew.sampleCount < 2)
return false;
// Any difference in these properties leads to resizeBuffers(). Float values
// are compared with an absolute tolerance of
// std::numeric_limits<float>::epsilon().
229 const auto & configCurr = dataData.persistent->config;
230 if ((configCurr.beamCount != configNew.beamCount) ||
231 (configCurr.coordSys != configNew.coordSys) ||
232 (std::numeric_limits<float>::epsilon() < glm::compMax(glm::abs(configCurr.transducerAlpha - configNew.transducerAlpha))) ||
233 (std::numeric_limits<float>::epsilon() < glm::compMax(glm::abs(configCurr.transducerOffset - configNew.transducerOffset))) ||
234 (configCurr.useDecibel != configNew.useDecibel) ||
235 !equals(configCurr.directionX, configNew.directionX, std::numeric_limits<float>::epsilon()) ||
236 !equals(configCurr.directionY, configNew.directionY, std::numeric_limits<float>::epsilon()) ||
237 !equals(configCurr.beamWidthX, configNew.beamWidthX, std::numeric_limits<float>::epsilon()) ||
238 !equals(configCurr.beamWidthY, configNew.beamWidthY, std::numeric_limits<float>::epsilon()))
240 resizeBuffers(dataData);
// Storage layout and depth sampling unchanged: the current buffer stays valid.
243 if ((configCurr.capacity == configNew.capacity) &&
244 (configCurr.sampleCount == configNew.sampleCount) &&
245 (std::abs(configCurr.depthOffset - configNew.depthOffset) < std::numeric_limits<float>::epsilon()) &&
246 (std::abs(configCurr.depthStep - configNew.depthStep) < std::numeric_limits<float>::epsilon()))
248 dataData.persistent->configValid =
true;
// Layout changed but the caller asked to preserve data: count the task as
// running before enqueuing it on the global queue; ResampleBufferMainTask
// decrements runningTasks when it finishes.
251 if (configNew.tryPreserve) {
252 dataData.persistent->runningTasks++;
253 dataData.persistent->configValid =
true;
254 context->
taskManager->enqueue(context->
taskManager->GlobalQueue, ResampleBufferMainTask(context, dataData.persistent));
// No preservation requested: rebuild the buffers.
258 resizeBuffers(dataData);
// Adopts persistent->configNew as the active configuration and resets the
// ring buffer to empty with storage sized for that configuration, then
// recomputes the grid angles and clears the preserved-slice range.
265void Cogs::Core::EchoSounder::DataSetSystem::resizeBuffers(
DataSetData& dataData)
267 const auto & configNew = dataData.persistent->configNew;
268 dataData.persistent->config = configNew;
269 dataData.persistent->configValid =
true;
// NOTE(review): configGen is incremented here and again at the end of this
// function -- confirm the double bump is intended.
270 dataData.configGen++;
// Q = slice capacity, M = beams per slice, N = samples per beam.
272 const auto Q = dataData.persistent->config.capacity;
273 const auto M = dataData.persistent->config.beamCount;
274 const auto N = dataData.persistent->config.sampleCount;
// Empty ring buffer.
276 dataData.persistent->buffer.sliceBegin = 0;
277 dataData.persistent->buffer.sliceCount = 0;
// NOTE(review): the second resize argument presumably controls whether
// existing contents are preserved -- confirm against the container type.
278 dataData.persistent->buffer.timestamps.resize(Q,
false);
279 dataData.persistent->buffer.values.resize(Q * M * N,
false);
280 dataData.persistent->buffer.bottomDepths.resize(Q * M,
false);
281 dataData.persistent->buffer.bottomReflectivities.resize(Q * M,
false);
282 dataData.persistent->buffer.metaPings.resize(Q,
false);
283 dataData.persistent->buffer.distances.resize(Q,
false);
// Derived state that depends on the active configuration.
285 updateGridAngles(dataData);
286 dataData.slicePreservedBegin = 0;
287 dataData.slicePreservedCount = 0;
288 dataData.configGen++;
// Appends one ping as a new slice of the ring buffer in dataData.persistent.
// Pings are ignored while the configuration is invalid; a warning is logged
// when the ping's beam/sample counts do not match the configuration or when
// its timestamp is not newer than the latest stored slice.
// NOTE(review): the early returns after the two warnings and the closing
// braces are not visible in this listing.
293void Cogs::Core::EchoSounder::DataSetSystem::handleNewPing(
DataSetData& dataData,
Ping& ping)
295 if (!dataData.persistent->configValid)
return;
297 const auto & config = dataData.persistent->config;
// The ping must match the buffer layout.
299 if ((ping.beamCount != config.beamCount) ||
300 (ping.sampleCount != config.sampleCount))
302 LOG_WARNING(logger,
"handlePing: numBeams/numSamples mismatch, ignoring slice.");
// Timestamps must be strictly increasing relative to the newest stored slice.
306 auto & buf = dataData.persistent->buffer;
307 if (0 < buf.sliceCount) {
308 auto prevSlice = (buf.sliceBegin + buf.sliceCount - 1u) % config.capacity;
309 if (ping.timestamp <= buf.timestamps[prevSlice]) {
310 LOG_WARNING(logger,
"handlePing: Detected non-monotonic increasing timestamps, ignoring slice.");
// Buffer full: drop the oldest slice. If the preserved range starts at that
// slice, shrink it as well. max(1u, x) - 1u decrements without going below 0.
315 if (buf.sliceCount == config.capacity) {
316 if (dataData.slicePreservedBegin == buf.sliceBegin) {
317 dataData.slicePreservedBegin = (dataData.slicePreservedBegin + 1u) % config.capacity;
318 dataData.slicePreservedCount = std::max(1u, dataData.slicePreservedCount) - 1u;
320 buf.sliceBegin = (buf.sliceBegin + 1u) % config.capacity;
321 buf.sliceCount = std::max(1u, buf.sliceCount) - 1u;
// Ring-buffer index of the slice being written.
324 int slice = (buf.sliceBegin + buf.sliceCount) % config.capacity;
// M = beams per slice, N = samples per beam.
327 const auto M = config.beamCount;
328 const auto N = config.sampleCount;
330 buf.timestamps[slice] = ping.timestamp;
// Copy size is taken from the ping (valuesSize()), not from M * N.
332 std::memcpy(buf.values.data() + M*N*slice, ping.values(), ping.valuesSize());
// Bottom depths are copied per beam, with non-finite values replaced by 0.
336 const auto * s = ping.bottomDepths();
337 auto * d = buf.bottomDepths.data() + M * slice;
338 for (
size_t i = 0; i < M; i++) {
339 d[i] = std::isfinite(s[i]) ? s[i] : 0.f;
343 std::memcpy(buf.bottomReflectivities.data() + M*slice, ping.bottomReflectivities(), ping.bottomReflectivitiesSize());
// Vessel pose comes from the ping; the array-to-vessel transform is obtained
// from getArrayToVesselTransform() using the configured transducer alpha and
// offset, and is then composed with the vessel pose to get the global array
// pose.
348 auto & metaPing = buf.metaPings[slice];
349 metaPing.vesselOrientationGlobal = ping.orientation;
350 metaPing.vesselPositionGlobal = ping.position;
351 getArrayToVesselTransform(buf.metaPings[slice].arrayOrientationVessel,
352 buf.metaPings[slice].arrayPositionVessel,
353 config.transducerAlpha,
354 config.transducerOffset);
355 buf.metaPings[slice].arrayOrientationGlobal = metaPing.vesselOrientationGlobal * buf.metaPings[slice].arrayOrientationVessel;
356 buf.metaPings[slice].arrayPositionGlobal = metaPing.vesselPositionGlobal + metaPing.vesselOrientationGlobal * buf.metaPings[slice].arrayPositionVessel;
// distances[prevSlice] = distance from the previous slice to this one, clamped
// from below by the "echo.minSliceDistance" variable (itself at least 0.001).
// NOTE(review): `context` is not a parameter of this function; presumably a
// member of the system -- confirm.
359 if (0 < buf.sliceCount) {
360 auto minSliceDistance = std::max(0.001f, context->
variables->get(
"echo.minSliceDistance")->getFloat());
362 auto prevSlice = (buf.sliceBegin + buf.sliceCount - 1u) % config.capacity;
363 buf.distances[prevSlice] = std::max(minSliceDistance,
364 glm::distance(buf.metaPings[prevSlice].vesselPositionGlobal,
365 buf.metaPings[slice].vesselPositionGlobal));
// The newest slice has no successor yet.
367 buf.distances[slice] = 0;
// Per-component processing loop (the enclosing function header is not visible
// in this listing). For each data set component the visible code:
//   - checks whether resample tasks are still running,
//   - adopts a finished resample result (dataReady) by swapping bufferNew into
//     buffer and switching to configNew,
//   - drains the component's message queue (clear / config / ping messages).
374 for (
auto & dataSetComp : pool) {
375 auto & dataData = getData(&dataSetComp);
// NOTE(review): the body of this check is not visible here; presumably the
// component is skipped while tasks are in flight -- confirm.
377 if (dataData.persistent->runningTasks != 0) {
// A resample finished: make its output the active buffer and configuration.
381 if (dataData.persistent->dataReady) {
382 dataData.persistent->dataReady =
false;
384 dataData.persistent->config = dataData.persistent->configNew;
385 dataData.persistent->buffer.sliceBegin = dataData.persistent->bufferNew.sliceBegin;
386 dataData.persistent->buffer.sliceCount = dataData.persistent->bufferNew.sliceCount;
387 dataData.persistent->buffer.timestamps.swap(dataData.persistent->bufferNew.timestamps);
388 dataData.persistent->buffer.values.swap(dataData.persistent->bufferNew.values);
389 dataData.persistent->buffer.bottomDepths.swap(dataData.persistent->bufferNew.bottomDepths);
390 dataData.persistent->buffer.bottomReflectivities.swap(dataData.persistent->bufferNew.bottomReflectivities);
391 dataData.persistent->buffer.metaPings.swap(dataData.persistent->bufferNew.metaPings);
392 dataData.persistent->buffer.distances.swap(dataData.persistent->bufferNew.distances);
// After the swap bufferNew holds the old arrays; clear them.
394 dataData.persistent->bufferNew.timestamps.clear();
395 dataData.persistent->bufferNew.values.clear();
396 dataData.persistent->bufferNew.bottomDepths.clear();
397 dataData.persistent->bufferNew.bottomReflectivities.clear();
398 dataData.persistent->bufferNew.metaPings.clear();
399 dataData.persistent->bufferNew.distances.clear();
// Refresh state derived from the configuration and signal the change.
401 updateGridAngles(dataData);
402 dataData.slicePreservedBegin = 0;
403 dataData.slicePreservedCount = 0;
404 dataData.configGen++;
// Drain queued messages front to back; each one marks the component changed.
408 while (!dataSetComp.messages.empty()) {
409 dataSetComp.setChangedTransient();
411 auto * baseMessage = dataSetComp.messages.front();
// Clear message: empty the ring buffer and the preserved range.
// NOTE(review): the null checks on the dynamic_cast results are not visible in
// this listing.
413 auto * clearMessage =
dynamic_cast<ClearMessage*
>(baseMessage);
415 dataData.persistent->buffer.sliceBegin = 0;
416 dataData.persistent->buffer.sliceCount = 0;
417 dataData.slicePreservedBegin = 0;
418 dataData.slicePreservedCount = 0;
// Config message: stash the new configuration; it is applied when the next
// ping message is handled (see configReady below).
423 auto * configMessage =
dynamic_cast<ConfigMessage*
>(baseMessage);
425 dataData.persistent->configNew = configMessage->config;
426 dataData.persistent->configReady =
true;
// Ping message: apply a pending configuration first, then store the ping.
429 auto * pingMessage =
dynamic_cast<PingMessage*
>(baseMessage);
431 if (dataData.persistent->configReady) {
432 dataData.persistent->configReady =
false;
// NOTE(review): what happens when handleConfigTransition returns true is not
// visible in this listing.
434 if (handleConfigTransition(context, dataData)) {
439 handleNewPing(dataData, pingMessage->ping);
442 dataSetComp.messages.pop_front();
void update()
Updates the system state to that of the current frame.
A Context instance contains all the services, systems and runtime components needed to use Cogs.
std::unique_ptr< class TaskManager > taskManager
TaskManager service instance.
std::unique_ptr< class Variables > variables
Variables service instance.
Log implementation class.
constexpr Log getLogger(const char(&name)[LEN]) noexcept