Cogs.Core
TaskManager.cpp
1#include "TaskManager.h"
2#include "Services/Time.h"
3#include "Platform/Instrumentation.h"
4#include "Foundation/Platform/Timer.h"
5
6#include "Foundation/Collections/Pool.h"
7#include "Foundation/Logging/Logger.h"
8#include "Foundation/Platform/Threads.h"
9
10#include <atomic>
11#include <cassert>
12#include <condition_variable>
13#include <queue>
14#include <string>
15#include <vector>
16
17#ifdef EMSCRIPTEN
18#include <emscripten/threading.h>
19#endif
20
21using namespace Cogs::Collections;
22
23namespace {
25
26 // Contains the worker thread index in a queue + 1 for worker threads, and zero for main thread.
27 thread_local uint32_t workerIndex = 0;
28}
29
30#if !defined(COGS_SINGLETHREADED)
31namespace Cogs
32{
33 namespace Core
34 {
35 enum class TaskFlags : uint16_t
36 {
37 None = 0,
38 Group = 1,
39 };
40 ENABLE_ENUM_FLAGS(TaskFlags);
41
43 class Task
44 {
45 public:
47 Task() = default;
48
51
53 Task *parent = nullptr;
54
56 Atomic<int> active{0};
57
59 TaskFlags flags = TaskFlags::None;
60
62 std::atomic<uint16_t> generation = 0;
63
65 std::atomic<int> continuationCount;
66
69 };
70
71#ifdef _WIN64
72 static_assert(sizeof(Task) == 128, "Size of the task struct should be an even 128 bytes.");
73#endif
74
77 {
78 std::atomic<uint64_t> timeInKernel = 0;
79 std::atomic<uint32_t> kernelLaunches = 0;
80 uint32_t kernelNesting = 0;
81 };
82
90 class TaskPool : public Pool<Task>
91 {
92 public:
94 TaskPool() : Pool(1024, 1024) {}
95 };
96
102 {
103 public:
110 TaskQueue(TaskQueueId id, std::string_view name, size_t numThreads) : name(name), id(id), threads(numThreads)
111 {
113 }
114
115 TaskQueue(const TaskQueue&) = delete;
116 TaskQueue& operator=(const TaskQueue&) = delete;
117
122 {
123 {
124 LockGuard lock(taskMutex);
125 done = true;
126 }
127
128 taskVariable.notify_all();
129
130 for (auto &t : threads)
131 {
132 t.join();
133 }
134 }
135
139 Task *createTask(const TaskFunction *func, ElementHandle &handle, const TaskId *parentTaskId = nullptr)
140 {
141 Task *task;
142
143 {
144 LockGuard poolLock(poolMutex);
145
146 task = taskPool.create();
147 handle = taskPool.getHandle(task);
148 }
149
150 if (func)
151 {
152 task->kernel = *func;
153 }
154 else
155 {
156 task->flags |= TaskFlags::Group;
157 }
158
159 // Assign the unique generation counter to differentiate between the currently created task
160 // and reallocated tasks using the same storage at a later point in time.
161 task->generation = ++generation;
162
163 assert(task->active == 0 && "Task should not have active workers.");
164
165 // The number of active sub-tasks for the Task counts itself.
166 if (func)
167 ++task->active;
168
169 if (parentTaskId)
170 {
171 Task *parentTask = getTask(*parentTaskId);
172
173 assert(parentTask && "Parent task not valid.");
174
175 // Ensure the parent task has not been recycled. Trying to parent a task to a recycled task
176 // is considered a programmer error. It is up to the calling code to keep the parent alive until
177 // children are added.
178 assert(parentTask->generation == parentTaskId->generation && "Parent task not in generation.");
179
180 task->parent = parentTask;
181
182 // Add the child task as an active sub-task.
183 ++parentTask->active;
184 }
185
186 // Keep a count of the number of queued/executing tasks.
187 if (func)
188 active++;
189
190 return task;
191 }
192
194 Task *getTask(const TaskId &taskId)
195 {
196 LockGuard poolLock(poolMutex);
197
198 return taskPool[taskId.taskHandle];
199 }
200
205 void destroy(const TaskId &taskId)
206 {
207 auto task = getTask(taskId);
208
209 if (task->generation != taskId.generation)
210 {
211 assert(false && "Should never happen!");
212 return;
213 }
214 else
215 {
216 // We need to be sure the task has completed before removing it from the pool.
217 wait(taskId);
218
219 destroyTask(task);
220 }
221 }
222
226 TaskId create(TaskFunctionRef func)
227 {
228 ElementHandle handle;
229
230 auto task = createTask(&func, handle);
231
232 return TaskId{handle, task->generation, id};
233 }
234
239 {
240 ElementHandle handle;
241
242 auto task = createTask(nullptr, handle);
243
244 return TaskId{handle, task->generation, id};
245 }
246
250 TaskId create(TaskFunctionRef func, const TaskId &parentTask)
251 {
252 ElementHandle handle;
253
254 auto task = createTask(&func, handle, &parentTask);
255
256 return TaskId{handle, task->generation, id};
257 }
258
262 void enqueueTask(Task *task)
263 {
264 if (threads.empty()) {
265 task->kernel();
266 finishTask(task);
267 return;
268 }
269
270 LockGuard taskLock(taskMutex);
271 taskQueue.push(task);
272 // Wake up a single waiting TaskWorker instance (if any) to execute the enqueued
273 // task.
274 taskVariable.notify_one();
275 }
276
280 void enqueue(const TaskId &taskId)
281 {
282 enqueueTask(getTask(taskId));
283 }
284
294 TaskId enqueue(TaskFunctionRef func)
295 {
296 if (threads.empty()) {
297 func();
298 return NoTask;
299 }
300
301 ElementHandle handle;
302 Task* task = createTask(&func, handle);
303 enqueueTask(task);
304
305 return TaskId{handle, task->generation, id};
306 }
307
318 TaskId enqueue(TaskFunctionRef func, const TaskId &parentTaskId)
319 {
320 if (threads.empty()) {
321 func();
322 return NoTask;
323 }
324
325 ElementHandle handle;
326 Task* task = createTask(&func, handle, &parentTaskId);
327 enqueueTask(task);
328
329 return TaskId{handle, task->generation, id};
330 }
331
338 {
339 UniqueLock taskLock(taskMutex);
340
341 taskVariable.wait(taskLock, [this]()
342 { return taskQueue.size() || done; });
343
344 if (done)
345 return nullptr;
346
347 auto task = taskQueue.front();
348 taskQueue.pop();
349 return task;
350 }
351
356 {
357 LockGuard taskLock(taskMutex);
358
359 if (taskQueue.size())
360 {
361 auto task = taskQueue.front();
362 taskQueue.pop();
363 return task;
364 }
365
366 return nullptr;
367 }
368
376 bool canExecute(const Task *task) const
377 {
378 assert(task->active >= 1 && "Cannot execute inactive task.");
379
380 return task->active == 1;
381 }
382
391 void workOnTask(Task *task)
392 {
393 while (!canExecute(task))
394 {
395 yield();
396 }
397
398 Timer timer;
399 QueueWorkerStateData& stateData = (*workerStateData)[workerIndex];
400 bool measure = stateData.kernelNesting++ == 0;
401 if (measure) {
402 // Running kernels can be nested if the kernel waits for another task, so we only measure
403 // the outermost kernel launch to avoid measuring the same timespan multiple times.
404 timer = Timer::startNew();
405 }
406 task->kernel();
407 if (measure) {
408 stateData.timeInKernel.fetch_add(static_cast<uint64_t>(1e6 * timer.elapsedSeconds()));
409 stateData.kernelLaunches.fetch_add(1);
410 }
411 assert(stateData.kernelNesting);
412 stateData.kernelNesting--;
413 finishTask(task);
414 }
415
420 void yield()
421 {
422 auto task = getAvailableTask();
423
424 if (task)
425 {
426 workOnTask(task);
427 }
428 else
429 {
430 Threads::yield();
431 }
432 }
433
440 void finishTask(Task *task)
441 {
442 assert(task->active && "Task must have active tasks when calling finishTask().");
443
444 if ((task->flags & TaskFlags::Group) == TaskFlags::Group)
445 {
446 // Groups are not destroyed automatically when the active count reaches zero.
447 --task->active;
448 return;
449 }
450
451 // Remove the task itself from its count of active sub-tasks.
452 if (--task->active == 0)
453 {
454 if (task->parent)
455 {
456 // See if the parent task needs to be destroyed. Will have its active worker count decremented
457 // by finishTask().
458 finishTask(task->parent);
459 }
460
461 destroyTask(task);
462 }
463 }
464
468 void destroyTask(Task *task)
469 {
470 assert(!task->active && "Task cannot have active sub tasks when destroyed.");
471
472 // Remove the task itself from the queues active tasks.
473 if ((task->flags & TaskFlags::Group) != TaskFlags::Group) {
474 --active;
475 }
476
477 task->generation = ++generation;
478
479 {
480 LockGuard poolLock(poolMutex);
481 taskPool.destroy(task);
482 }
483 }
484
485 bool isActive(const TaskId& taskId)
486 {
487 assert(taskId.queueId == id && "Task id not valid for current queue.");
488 Task* task = getTask(taskId);
489 return task->active.load();
490 }
491
497 void wait(const TaskId &taskId)
498 {
499 assert(taskId.queueId == id && "Task id not valid for current queue.");
500 CpuInstrumentationScope(SCOPE_TASKMANAGER, "Wait");
501
502 auto task = getTask(taskId);
503
504 if (task->generation != taskId.generation)
505 {
506 CpuInstrumentationScope(SCOPE_TASKMANAGER, "Unblocked wait");
507 return;
508 }
509 else
510 {
511 CpuInstrumentationScope(SCOPE_TASKMANAGER, "Blocked wait");
512
513 while (task->active.load())
514 {
515 yield();
516 }
517 }
518 }
519
524 void waitAll()
525 {
526 CpuInstrumentationScope(SCOPE_TASKMANAGER, "WaitAll");
527 while (active)
528 {
529 yield();
530 }
531 }
532
533 size_t getConcurrency() const
534 {
535 return threads.size();
536 }
537
538 TaskQueueId getId() const
539 {
540 return id;
541 }
542
543 const std::string& getName() const
544 {
545 return name;
546 }
547
548 void getQueueState(QueueState& queueState, std::vector<QueueWorkerState>& workerStates) const
549 {
550 queueState = this->queueState;
552 }
553
554 void updateState(Context* context)
555 {
556 float elapsed = static_cast<float>(timer.elapsedSeconds());
557 if (elapsed < 1.0) return;
558 timer = Timer::startNew();
559 uint32_t frameCount = context->time->getFrame() - lastFrame;
560 lastFrame = context->time->getFrame();
561
562 const float perSecondScale = 1.f / elapsed;
563 const float perFrameScale = 1.f / frameCount;
564
565 float aggregateTimeInKernel = 0.f;
566 uint32_t aggreagateKernelLaunches = 0;
567 size_t workerStateCount = workerStates.size();
568 for (size_t i = 0; i < workerStateCount; i++) {
569 float timeInKernel = 1e-6f * static_cast<float>((*workerStateData)[i].timeInKernel.exchange(0));
570 uint32_t kernelLaunches = (*workerStateData)[i].kernelLaunches.exchange(0);
571 workerStates[i].utilization = perSecondScale * timeInKernel;
572 workerStates[i].tasksPerSecond = perSecondScale * kernelLaunches;
573 workerStates[i].tasksPerFrame = perFrameScale * kernelLaunches;
574 aggregateTimeInKernel += timeInKernel;
575 aggreagateKernelLaunches += kernelLaunches;
576 }
577
578 queueState.load = perSecondScale * aggregateTimeInKernel;
579 queueState.tasksPerSecond = perSecondScale * aggreagateKernelLaunches;
580 queueState.tasksPerFrame = perFrameScale * aggreagateKernelLaunches;
581 }
582
583 private:
584 Timer timer = Timer::startNew();
585 uint32_t lastFrame = 0;
586
587 QueueState queueState{};
588
590 std::vector<QueueWorkerState> workerStates;
591
593 std::unique_ptr<std::vector<QueueWorkerStateData>> workerStateData;
594
596 void initializeWorkers();
597
600
602 std::string name;
603
605 Atomic<int> active{0};
606
608 bool done = false;
609
611 Atomic<uint16_t> generation = 0;
612
614 std::queue<Task *> taskQueue;
615
618
621
623 std::condition_variable taskVariable;
624
627
629 std::vector<Thread> threads;
630 };
631
636 {
637 public:
639 TaskWorker(TaskQueue *taskQueue, uint32_t workerIx, const std::string &name) : taskQueue(taskQueue), workerIx(workerIx), name(name)
640 {
641 }
642
649 {
650 Instrumentation::initializeThread(name.c_str());
651 CpuInstrumentationScope(SCOPE_TASKMANAGER, "WorkerThread");
652
653 assert(workerIx);
654 workerIndex = workerIx;
655
656 while (auto task = taskQueue->getNextTask())
657 {
658 taskQueue->workOnTask(task);
659 }
660
661 Instrumentation::destroyThread();
662 }
663
666 uint32_t workerIx = 0;
667 std::string name;
668 };
669
674 {
675 size_t workerStateCount = threads.size() + 1;
676 workerStates.resize(workerStateCount);
677 workerStateData = std::make_unique<std::vector<QueueWorkerStateData>>(workerStateCount);
678
679 std::string n;
680 n.reserve(120);
681 for (size_t i = 0; i < threads.size(); ++i)
682 {
683 n.clear();
684 n += name;
685 n += " Thread ";
686 n += std::to_string(i);
687
688 TaskWorker worker(this, static_cast<uint32_t>(i + 1), n);
689 threads[i] = Thread(worker);
690
691 Threads::setName(threads[i], n);
692 }
693 }
694
699 {
700 public:
702 TaskQueues() = default;
703
705 TaskQueue *operator[](TaskQueueId id) { return queues[id].get(); }
706
716 TaskQueueId createQueue(std::string_view name, const size_t numThreads)
717 {
718 auto id = nextId++;
719
720 // Ensure the container can hold the desired queue index.
721 queues.resize(std::max(static_cast<size_t>(id + 1), queues.size()));
722
723 queues[id] = std::make_unique<TaskQueue>(id, name, numThreads);
724
725 return id;
726 }
727
729 std::thread::id mainThreadId;
730
732 std::vector<std::unique_ptr<TaskQueue>> queues;
733
735 Atomic<TaskQueueId> nextId{0};
736 };
737 }
738}
739#endif
740
741
742#if defined(COGS_SINGLETHREADED)
743#pragma message( "Building single threaded TaskManager." )
744namespace Cogs
745{
746 namespace Core
747 {
748 class TaskQueues
749 {
750 };
751 }
752}
753
754Cogs::Core::TaskManager::TaskManager(Context* /*context*/)
755{
756 taskQueues = std::make_unique<TaskQueues>();
757}
758
760{
761}
762
763void Cogs::Core::TaskManager::updateState(Context* context)
764{
765}
766
767Cogs::Core::TaskQueueId Cogs::Core::TaskManager::createQueue(std::string_view /*name*/, const size_t /*numThreads*/)
768{
769 return GlobalQueue;
770}
771
773{
774 return 0;
775}
776
778{
779 return 0;
780}
781
782const std::string& Cogs::Core::TaskManager::getQueueName(TaskQueueId queue) const
783{
784 static const std::string noname = "";
785 return noname;
786}
787
788void Cogs::Core::TaskManager::getQueueState(QueueState& queueState, std::vector<QueueWorkerState>& workerStates, TaskQueueId /*queue*/) const
789{
790 queueState = {};
791 workerStates.clear();
792}
793
795{
796 return true;
797}
798
800{
801 return TaskId();
802}
803
805{
806 func();
807
808 return TaskId();
809}
810
811Cogs::Core::TaskId Cogs::Core::TaskManager::createChild(const TaskId &/*parentTask*/, TaskFunctionRef func)
812{
813 func();
814
815 return TaskId();
816}
817
818void Cogs::Core::TaskManager::enqueue(const TaskId &/*taskId*/)
819{
820}
821
822Cogs::Core::TaskId Cogs::Core::TaskManager::enqueueChild(const TaskId &/*parentTask*/, TaskFunctionRef func)
823{
824 func();
825
826 return TaskId();
827}
828
830{
831 func();
832
833 return TaskId();
834}
835
836void Cogs::Core::TaskManager::destroy(const TaskId &/*taskId*/)
837{
838}
839
840bool Cogs::Core::TaskManager::isActive(const TaskId& /*taskId*/)
841{
842 return true;
843}
844
845void Cogs::Core::TaskManager::wait(const TaskId &/*taskId*/)
846{
847}
848
850{
851}
852
854{
855}
856
857#else
858#pragma message( "Building multithreaded TaskManager." )
859
860namespace {
861 static constexpr Cogs::StringView globalQueueFactorName = "taskManager.globalQueue.threadCountFactor";
862 static constexpr Cogs::StringView globalQueueMaxName = "taskManager.globalQueue.threadCountMax";
863
864 static constexpr Cogs::StringView resourceQueueFactorName = "taskManager.resourceQueue.threadCountFactor";
865 static constexpr Cogs::StringView resourceQueueMaxName = "taskManager.resourceQueue.threadCountMax";
866
867 size_t getWorkerCount(Cogs::Core::Context* context, Cogs::StringView factorName, Cogs::StringView maxName, size_t concurrency, float factorDefault)
868 {
869 // We do not want the workers to compete with the main thread when busy,
870 // hence the default is one less than the concurrency.
871 size_t defaultMaxWorkers = std::max(size_t(1), concurrency) - 1;
872
873 size_t workerCount = static_cast<size_t>(std::max(0.f, context->variables->get(factorName, factorDefault) * concurrency));
874 size_t maxWorkers = static_cast<size_t>(std::max(0, context->variables->get(maxName, int(defaultMaxWorkers))));
875 return std::min(workerCount, maxWorkers);
876 }
877
878 bool hasThreadingSupport()
879 {
880#ifdef EMSCRIPTEN
881 return emscripten_has_threading_support();
882#else
883 return true;
884#endif
885 }
886
887}
888
890{
891 taskQueues->mainThreadId = std::this_thread::get_id();
892
893 assert(context->variables);
894
895 // Usually, number of logical processors available. However, the value _can_
896 // be zero when the value is not well-defined or not computable.
897 size_t concurrency = Threads::hardwareConcurrency();
898
899 size_t globalThreads = 0;
900 size_t resourceThreads = 0;
901 if (hasThreadingSupport()) {
902 globalThreads = getWorkerCount(context, globalQueueFactorName, globalQueueMaxName, concurrency, 1.f);
903 resourceThreads = getWorkerCount(context, resourceQueueFactorName, resourceQueueMaxName, concurrency, 0.5);
904 }
905 else {
906 LOG_WARNING(logger, "Multithreaded cogs, but no threading available.");
907 }
908 LOG_INFO(logger, "Concurrency: %zu globalThreads=%zu resourceThreads=%zu", concurrency, globalThreads, resourceThreads);
909
910 // Create predefined Queues. MUST be in this order, se definition of GlobalQueue & ResourceQueue
911 TaskQueueId globalQueueId = createQueue("Global", globalThreads);
912 assert(globalQueueId == GlobalQueue);
913
914 TaskQueueId resourceQueueId = createQueue("Resource", resourceThreads);
915 assert(resourceQueueId == ResourceQueue);
916}
917
919{
920 waitAll();
921
922 taskQueues->queues.clear();
923}
924
926{
927 for (std::unique_ptr<TaskQueue>& queue : taskQueues->queues) {
928 queue->updateState(context);
929 }
930}
931
933{
934 return taskQueues->queues.size();
935}
936
938{
939 assert(queue < taskQueues->queues.size());
940 return taskQueues->queues[queue]->getName();
941}
942
943void Cogs::Core::TaskManager::getQueueState(QueueState& queueState, std::vector<QueueWorkerState>& workerState, TaskQueueId queue) const
944{
945 assert(queue < taskQueues->queues.size());
946 return taskQueues->queues[queue]->getQueueState(queueState, workerState);
947}
948
950{
951 TaskQueue* q = getQueue(queue);
952 return q->getConcurrency();
953}
954
956{
957 return std::this_thread::get_id() == taskQueues->mainThreadId;
958}
959
960inline Cogs::Core::TaskQueue* Cogs::Core::TaskManager::getQueue(TaskQueueId queueId)
961{
962 return (*taskQueues)[queueId];
963}
964
966{
967 TaskQueue* q = getQueue(queue);
968
969 return q->create(func);
970}
971
973{
974 TaskQueue* q = getQueue(queue);
975
976 return q->createGroup();
977}
978
980{
981 TaskQueue* q = getQueue(task.queueId);
982
983 return q->create(func, task);
984}
985
987{
988 TaskQueue* q = getQueue(task.queueId);
989
990 return q->enqueue(task);
991}
992
994{
995 TaskQueue* q = getQueue(queue);
996
997 return q->enqueue(func);
998}
999
1001{
1002 TaskQueue* q = getQueue(group.queueId);
1003
1004 return q->enqueue(func, group);
1005}
1006
1008{
1009 if (!taskId.isValid()) return;
1010
1011 TaskQueue* q = getQueue(taskId.queueId);
1012 q->destroy(taskId);
1013}
1014
1016{
1017 return getQueue(task.queueId)->isActive(task);
1018}
1019
1021{
1022 if (!task.isValid()) return;
1023
1024 TaskQueue* queue = getQueue(task.queueId);
1025 if (onMainThread() && (queue->getId() == ResourceQueue)) {
1026 LOG_ERROR_ONCE(logger, "Waited on a task in the resource queue from the main thread");
1027 }
1028 getQueue(task.queueId)->wait(task);
1029}
1030
1032{
1033 auto q = getQueue(queue);
1034
1035 q->waitAll();
1036}
1037
1039{
1040 for (auto &q : taskQueues->queues)
1041 {
1042 q->waitAll();
1043 }
1044}
1045
1046Cogs::Core::TaskQueueId Cogs::Core::TaskManager::createQueue(std::string_view name, const size_t numThreads)
1047{
1048 return taskQueues->createQueue(name, numThreads);
1049}
1050
1051#endif
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Definition: Context.h:83
std::unique_ptr< class Variables > variables
Variables service instance.
Definition: Context.h:180
TaskId enqueueChild(const TaskId &parentTask, TaskFunctionRef func)
Enqueue the given task func setting the given parent task.
size_t getQueueCount() const
Returns the number of queues managed.
void wait(const TaskId &taskId)
Wait for the task given by taskId.
size_t getQueueConcurrency(TaskQueueId queue)
Returns the number of worker threads in a task queue.
TaskId createGroup(TaskQueueId queue=GlobalQueue)
Create a task group and place it in the given queue.
TaskId createChild(const TaskId &parentTask, TaskFunctionRef func)
Create a child task and associate it with the given parent task.
TaskManager(Context *context)
Constructs a TaskManager. Must be called on the main thread.
void getQueueState(QueueState &queueState, std::vector< QueueWorkerState > &workerStates, TaskQueueId queue) const
Queries the current state of a specific queue.
std::unique_ptr< class TaskQueues > taskQueues
Task queues manager.
Definition: TaskManager.h:239
void waitAll()
Wait for all tasks in all task queues.
static constexpr TaskQueueId GlobalQueue
Global task queue.
Definition: TaskManager.h:224
void enqueue(const TaskId &taskId)
Enqueues a previously created task.
static constexpr TaskQueueId ResourceQueue
Resource task queue.
Definition: TaskManager.h:232
bool onMainThread() const
Returns true if called on the main thread.
void updateState(Context *context)
Update state data, invoked once a frame by context.
void destroy(const TaskId &taskId)
Destroy the task given by taskId.
~TaskManager()
Destructs the TaskManager instance.
TaskId create(TaskQueueId queue, TaskFunctionRef func)
Create a task in the given queue.
const std::string & getQueueName(TaskQueueId queue) const
Returns the name of a queue.
bool isActive(const TaskId &taskId)
Poll to check whether or not the task has finished.
TaskQueueId createQueue(std::string_view name, const size_t numThreads)
Creates a new task queue with the given name.
Task pool providing fast and easy allocation and deallocation of Task instances.
Definition: TaskManager.cpp:91
TaskPool()
Construct a task pool with a default capacity of 1024 tasks.
Definition: TaskManager.cpp:94
Task queues holds tasks ready for execution by TaskWorkers.
void destroyTask(Task *task)
Destroy the given task, deallocating it from the task pool and freeing up any held resources.
Mutex taskMutex
Task queue mutex.
void workOnTask(Task *task)
Perform work on the given task.
void wait(const TaskId &taskId)
Wait on the task given by taskId.
Task * getNextTask()
Get the next available Task from the task queue to execute.
Atomic< uint16_t > generation
Generation counter used to track task lifetime.
void finishTask(Task *task)
Finish the given task, returning the Task instance to the TaskPool and removing it from the set of ac...
void destroy(const TaskId &taskId)
Destroy the task with the given id.
TaskQueue(TaskQueueId id, std::string_view name, size_t numThreads)
Construct a TaskQueue instance with the given id and name.
void enqueueTask(Task *task)
Enqueue the given task, executing it whenever a worker thread in the queue is available.
Task * createTask(const TaskFunction *func, ElementHandle &handle, const TaskId *parentTaskId=nullptr)
Creates a new task.
TaskId createGroup()
Create a task group.
TaskId enqueue(TaskFunctionRef func)
Enqueue the given func object in the queue by creating a new Task with the function as work kernel.
std::condition_variable taskVariable
Condition variable used to signal tasks available for execution.
~TaskQueue()
Destructs a TaskQueue.
void enqueue(const TaskId &taskId)
Fetch the task with the given task id and enqueue it.
std::vector< Thread > threads
Array of threads used to execute TaskWorker instances.
std::unique_ptr< std::vector< QueueWorkerStateData > > workerStateData
Tracks worker thread kernel launches continuosly.
Task * getAvailableTask()
Gets a task from the task queue, or returns nullptr if no available task is found.
std::queue< Task * > taskQueue
Currently queued tasks, ready for execution by a TaskWorker.
bool canExecute(const Task *task) const
Check if the given task is ready to be executed.
TaskQueueId id
Id of the task queue.
TaskPool taskPool
Task pool holding Task instances.s.
void initializeWorkers()
Initialize TaskWorker instances.
std::vector< QueueWorkerState > workerStates
Aggregated worker state data.
TaskId create(TaskFunctionRef func, const TaskId &parentTask)
Create a task wrapping the given function, parented to the given parent task.
std::string name
Name of the queue used for debugging.
TaskId create(TaskFunctionRef func)
Create a task wrapping the given function.
void waitAll()
Wait for all enqueued tasks to finish.
Task * getTask(const TaskId &taskId)
Get the task with the given id from the pool.
Mutex poolMutex
TaskPool mutex.
void yield()
Yield the currently executing thread to perform work on other available tasks or simply give back exe...
bool done
If the queue is done and should stop executing tasks.
TaskId enqueue(TaskFunctionRef func, const TaskId &parentTaskId)
Enqueue the given func object in the queue by creating a new Task with the function as work kernel.
Atomic< int > active
Number of currently active tasks.
Manages the set of active task queues in the task manager.
TaskQueues()=default
Default constructs the TaskQueues instance.
TaskQueue * operator[](TaskQueueId id)
Retrieve the task queue with the given id.
Atomic< TaskQueueId > nextId
Next available task queue id.
TaskQueueId createQueue(std::string_view name, const size_t numThreads)
Create a new TaskQueue with the given name.
std::thread::id mainThreadId
Id of the main thread.
std::vector< std::unique_ptr< TaskQueue > > queues
Map of active task queues.
Task worker executing Task instances retrieved from a TaskQueue.
TaskWorker(TaskQueue *taskQueue, uint32_t workerIx, const std::string &name)
Construct a TaskWorker with the given taskQueue.
void operator()()
Executes the task worker.
TaskQueue * taskQueue
Task queue to retrieve Task instances from.
Defines a task meant to be placed in a TaskQueue and handled by a TaskWorker instance.
Definition: TaskManager.cpp:44
std::atomic< uint16_t > generation
Generation counter, used to identify reallocated tasks.
Definition: TaskManager.cpp:62
TaskFlags flags
Task flags.
Definition: TaskManager.cpp:59
Task * parent
Pointer to the parent task, nullptr if no parent task exists.
Definition: TaskManager.cpp:53
TaskFunction kernel
Task kernel, this is the function to run.
Definition: TaskManager.cpp:50
std::atomic< int > continuationCount
Continuation count.
Definition: TaskManager.cpp:65
Task()=default
Default construct a Task.
TaskId continuations[5]
Registered continuations.
Definition: TaskManager.cpp:68
Atomic< int > active
Number of active sub-tasks.
Definition: TaskManager.cpp:56
Log implementation class.
Definition: LogManager.h:139
Provides a weakly referenced view over the contents of a string.
Definition: StringView.h:24
Old timer class.
Definition: Timer.h:37
Contains collection classes used to manage object storage.
uint32_t ElementHandle
Handle type for elements.
Definition: PoolBase.h:16
std::function< void()> TaskFunction
Type of task function used by the task manager.
Definition: TaskManager.h:38
uint16_t TaskQueueId
Unique id for a task queue.
Definition: TaskManager.h:14
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Definition: LogManager.h:180
Contains all Cogs related functionality.
Definition: FieldSetter.h:23
Pool used to store elements of ElementType.
Definition: Pool.h:17
ElementType * create(ARGS &&... args)
Allocate and initialize a new element from the pool passing any arguments to the constructor of the n...
Definition: Pool.h:69
ElementHandle getHandle(ElementType *element) const
Get a handle for the given element.
Definition: Pool.h:114
void destroy(ElementType *element)
Free and destroy the given element in the pool.
Definition: Pool.h:82
Reports current state of a queue.
Definition: TaskManager.h:43
Tracks kernel launches by a worker thread as they happen.
Definition: TaskManager.cpp:77
Task id struct used to identify unique Task instances.
Definition: TaskManager.h:20
bool isValid() const
Check if the task id is valid.
Definition: TaskManager.h:29
uint32_t taskHandle
Integer handle to the task in a TaskPool.
Definition: TaskManager.h:22
uint16_t generation
Generation counter used to separate generational instances.
Definition: TaskManager.h:24
TaskQueueId queueId
Id of the TaskQueue the Task belongs to.
Definition: TaskManager.h:26