1#include "TaskManager.h"
2#include "Services/Time.h"
3#include "Platform/Instrumentation.h"
4#include "Foundation/Platform/Timer.h"
6#include "Foundation/Collections/Pool.h"
7#include "Foundation/Logging/Logger.h"
8#include "Foundation/Platform/Threads.h"
12#include <condition_variable>
18#include <emscripten/threading.h>
27 thread_local uint32_t workerIndex = 0;
30#if !defined(COGS_SINGLETHREADED)
35 enum class TaskFlags : uint16_t
40 ENABLE_ENUM_FLAGS(TaskFlags);
59 TaskFlags
flags = TaskFlags::None;
72 static_assert(
sizeof(
Task) == 128,
"Size of the task struct should be an even 128 bytes.");
78 std::atomic<uint64_t> timeInKernel = 0;
79 std::atomic<uint32_t> kernelLaunches = 0;
80 uint32_t kernelNesting = 0;
156 task->
flags |= TaskFlags::Group;
163 assert(task->
active == 0 &&
"Task should not have active workers.");
173 assert(parentTask &&
"Parent task not valid.");
178 assert(parentTask->
generation == parentTaskId->generation &&
"Parent task not in generation.");
180 task->
parent = parentTask;
211 assert(
false &&
"Should never happen!");
254 auto task =
createTask(&func, handle, &parentTask);
378 assert(task->
active >= 1 &&
"Cannot execute inactive task.");
400 bool measure = stateData.kernelNesting++ == 0;
404 timer = Timer::startNew();
408 stateData.timeInKernel.fetch_add(
static_cast<uint64_t
>(1e6 * timer.elapsedSeconds()));
409 stateData.kernelLaunches.fetch_add(1);
411 assert(stateData.kernelNesting);
412 stateData.kernelNesting--;
442 assert(task->
active &&
"Task must have active tasks when calling finishTask().");
444 if ((task->
flags & TaskFlags::Group) == TaskFlags::Group)
470 assert(!task->
active &&
"Task cannot have active sub tasks when destroyed.");
473 if ((task->
flags & TaskFlags::Group) != TaskFlags::Group) {
485 bool isActive(
const TaskId& taskId)
487 assert(taskId.
queueId ==
id &&
"Task id not valid for current queue.");
489 return task->
active.load();
499 assert(taskId.
queueId ==
id &&
"Task id not valid for current queue.");
500 CpuInstrumentationScope(SCOPE_TASKMANAGER,
"Wait");
506 CpuInstrumentationScope(SCOPE_TASKMANAGER,
"Unblocked wait");
511 CpuInstrumentationScope(SCOPE_TASKMANAGER,
"Blocked wait");
513 while (task->
active.load())
526 CpuInstrumentationScope(SCOPE_TASKMANAGER,
"WaitAll");
533 size_t getConcurrency()
const
543 const std::string& getName()
const
548 void getQueueState(QueueState& queueState, std::vector<QueueWorkerState>&
workerStates)
const
550 queueState = this->queueState;
554 void updateState(Context* context)
556 float elapsed =
static_cast<float>(timer.elapsedSeconds());
557 if (elapsed < 1.0)
return;
558 timer = Timer::startNew();
559 uint32_t frameCount = context->time->getFrame() - lastFrame;
560 lastFrame = context->time->getFrame();
562 const float perSecondScale = 1.f / elapsed;
563 const float perFrameScale = 1.f / frameCount;
565 float aggregateTimeInKernel = 0.f;
566 uint32_t aggreagateKernelLaunches = 0;
568 for (
size_t i = 0; i < workerStateCount; i++) {
569 float timeInKernel = 1e-6f *
static_cast<float>((*workerStateData)[i].timeInKernel.exchange(0));
570 uint32_t kernelLaunches = (*workerStateData)[i].kernelLaunches.exchange(0);
571 workerStates[i].utilization = perSecondScale * timeInKernel;
572 workerStates[i].tasksPerSecond = perSecondScale * kernelLaunches;
573 workerStates[i].tasksPerFrame = perFrameScale * kernelLaunches;
574 aggregateTimeInKernel += timeInKernel;
575 aggreagateKernelLaunches += kernelLaunches;
578 queueState.load = perSecondScale * aggregateTimeInKernel;
579 queueState.tasksPerSecond = perSecondScale * aggreagateKernelLaunches;
580 queueState.tasksPerFrame = perFrameScale * aggreagateKernelLaunches;
584 Timer timer = Timer::startNew();
585 uint32_t lastFrame = 0;
587 QueueState queueState{};
650 Instrumentation::initializeThread(name.c_str());
651 CpuInstrumentationScope(SCOPE_TASKMANAGER,
"WorkerThread");
654 workerIndex = workerIx;
661 Instrumentation::destroyThread();
666 uint32_t workerIx = 0;
675 size_t workerStateCount =
threads.size() + 1;
677 workerStateData = std::make_unique<std::vector<QueueWorkerStateData>>(workerStateCount);
681 for (
size_t i = 0; i <
threads.size(); ++i)
686 n += std::to_string(i);
688 TaskWorker worker(
this,
static_cast<uint32_t
>(i + 1), n);
691 Threads::setName(
threads[i], n);
721 queues.resize(std::max(
static_cast<size_t>(
id + 1),
queues.size()));
723 queues[id] = std::make_unique<TaskQueue>(
id, name, numThreads);
732 std::vector<std::unique_ptr<TaskQueue>>
queues;
742#if defined(COGS_SINGLETHREADED)
743#pragma message( "Building single threaded TaskManager." )
784 static const std::string noname =
"";
791 workerStates.clear();
858#pragma message( "Building multithreaded TaskManager." )
861 static constexpr Cogs::StringView globalQueueFactorName =
"taskManager.globalQueue.threadCountFactor";
862 static constexpr Cogs::StringView globalQueueMaxName =
"taskManager.globalQueue.threadCountMax";
864 static constexpr Cogs::StringView resourceQueueFactorName =
"taskManager.resourceQueue.threadCountFactor";
865 static constexpr Cogs::StringView resourceQueueMaxName =
"taskManager.resourceQueue.threadCountMax";
871 size_t defaultMaxWorkers = std::max(
size_t(1), concurrency) - 1;
873 size_t workerCount =
static_cast<size_t>(std::max(0.f, context->
variables->get(factorName, factorDefault) * concurrency));
874 size_t maxWorkers =
static_cast<size_t>(std::max(0, context->
variables->get(maxName,
int(defaultMaxWorkers))));
875 return std::min(workerCount, maxWorkers);
878 bool hasThreadingSupport()
881 return emscripten_has_threading_support();
891 taskQueues->mainThreadId = std::this_thread::get_id();
897 size_t concurrency = Threads::hardwareConcurrency();
899 size_t globalThreads = 0;
900 size_t resourceThreads = 0;
901 if (hasThreadingSupport()) {
902 globalThreads = getWorkerCount(context, globalQueueFactorName, globalQueueMaxName, concurrency, 1.f);
903 resourceThreads = getWorkerCount(context, resourceQueueFactorName, resourceQueueMaxName, concurrency, 0.5);
906 LOG_WARNING(logger,
"Multithreaded cogs, but no threading available.");
908 LOG_INFO(logger,
"Concurrency: %zu globalThreads=%zu resourceThreads=%zu", concurrency, globalThreads, resourceThreads);
922 taskQueues->queues.clear();
927 for (std::unique_ptr<TaskQueue>& queue : taskQueues->queues) {
928 queue->updateState(context);
934 return taskQueues->queues.size();
939 assert(queue < taskQueues->queues.size());
940 return taskQueues->queues[queue]->getName();
945 assert(queue < taskQueues->queues.size());
946 return taskQueues->queues[queue]->getQueueState(queueState, workerState);
952 return q->getConcurrency();
957 return std::this_thread::get_id() == taskQueues->mainThreadId;
962 return (*taskQueues)[queueId];
969 return q->create(func);
976 return q->createGroup();
983 return q->create(func, task);
990 return q->enqueue(task);
997 return q->enqueue(func);
1004 return q->enqueue(func, group);
1009 if (!taskId.
isValid())
return;
1017 return getQueue(task.
queueId)->isActive(task);
1025 if (onMainThread() && (queue->getId() == ResourceQueue)) {
1026 LOG_ERROR_ONCE(logger,
"Waited on a task in the resource queue from the main thread");
1028 getQueue(task.
queueId)->wait(task);
1033 auto q = getQueue(queue);
1040 for (
auto &q : taskQueues->queues)
1048 return taskQueues->createQueue(name, numThreads);
A Context instance contains all the services, systems and runtime components needed to use Cogs.
std::unique_ptr< class Variables > variables
Variables service instance.
TaskId enqueueChild(const TaskId &parentTask, TaskFunctionRef func)
Enqueue the given task func setting the given parent task.
size_t getQueueCount() const
Returns the number of queues managed.
void wait(const TaskId &taskId)
Wait for the task given by taskId.
size_t getQueueConcurrency(TaskQueueId queue)
Returns the number of worker threads in a task queue.
TaskId createGroup(TaskQueueId queue=GlobalQueue)
Create a task group and place it in the given queue.
TaskId createChild(const TaskId &parentTask, TaskFunctionRef func)
Create a child task and associate it with the given parent task.
TaskManager(Context *context)
Constructs a TaskManager. Must be called on the main thread.
void getQueueState(QueueState &queueState, std::vector< QueueWorkerState > &workerStates, TaskQueueId queue) const
Queries the current state of a specific queue.
std::unique_ptr< class TaskQueues > taskQueues
Task queues manager.
void waitAll()
Wait for all tasks in all task queues.
static constexpr TaskQueueId GlobalQueue
Global task queue.
void enqueue(const TaskId &taskId)
Enqueues a previously created task.
static constexpr TaskQueueId ResourceQueue
Resource task queue.
bool onMainThread() const
Returns true if called on the main thread.
void updateState(Context *context)
Update state data, invoked once a frame by context.
void destroy(const TaskId &taskId)
Destroy the task given by taskId.
~TaskManager()
Destructs the TaskManager instance.
TaskId create(TaskQueueId queue, TaskFunctionRef func)
Create a task in the given queue.
const std::string & getQueueName(TaskQueueId queue) const
Returns the name of a queue.
bool isActive(const TaskId &taskId)
Poll to check whether or not the task has finished.
TaskQueueId createQueue(std::string_view name, const size_t numThreads)
Creates a new task queue with the given name.
Task pool providing fast and easy allocation and deallocation of Task instances.
TaskPool()
Construct a task pool with a default capacity of 1024 tasks.
Task queues holds tasks ready for execution by TaskWorkers.
void destroyTask(Task *task)
Destroy the given task, deallocating it from the task pool and freeing up any held resources.
Mutex taskMutex
Task queue mutex.
void workOnTask(Task *task)
Perform work on the given task.
void wait(const TaskId &taskId)
Wait on the task given by taskId.
Task * getNextTask()
Get the next available Task from the task queue to execute.
Atomic< uint16_t > generation
Generation counter used to track task lifetime.
void finishTask(Task *task)
Finish the given task, returning the Task instance to the TaskPool and removing it from the set of ac...
void destroy(const TaskId &taskId)
Destroy the task with the given id.
TaskQueue(TaskQueueId id, std::string_view name, size_t numThreads)
Construct a TaskQueue instance with the given id and name.
void enqueueTask(Task *task)
Enqueue the given task, executing it whenever a worker thread in the queue is available.
Task * createTask(const TaskFunction *func, ElementHandle &handle, const TaskId *parentTaskId=nullptr)
Creates a new task.
TaskId createGroup()
Create a task group.
TaskId enqueue(TaskFunctionRef func)
Enqueue the given func object in the queue by creating a new Task with the function as work kernel.
std::condition_variable taskVariable
Condition variable used to signal tasks available for execution.
~TaskQueue()
Destructs a TaskQueue.
void enqueue(const TaskId &taskId)
Fetch the task with the given task id and enqueue it.
std::vector< Thread > threads
Array of threads used to execute TaskWorker instances.
std::unique_ptr< std::vector< QueueWorkerStateData > > workerStateData
Tracks worker thread kernel launches continuously.
Task * getAvailableTask()
Gets a task from the task queue, or returns nullptr if no available task is found.
std::queue< Task * > taskQueue
Currently queued tasks, ready for execution by a TaskWorker.
bool canExecute(const Task *task) const
Check if the given task is ready to be executed.
TaskQueueId id
Id of the task queue.
TaskPool taskPool
Task pool holding Task instances.
void initializeWorkers()
Initialize TaskWorker instances.
std::vector< QueueWorkerState > workerStates
Aggregated worker state data.
TaskId create(TaskFunctionRef func, const TaskId &parentTask)
Create a task wrapping the given function, parented to the given parent task.
std::string name
Name of the queue used for debugging.
TaskId create(TaskFunctionRef func)
Create a task wrapping the given function.
void waitAll()
Wait for all enqueued tasks to finish.
Task * getTask(const TaskId &taskId)
Get the task with the given id from the pool.
Mutex poolMutex
TaskPool mutex.
void yield()
Yield the currently executing thread to perform work on other available tasks or simply give back exe...
bool done
If the queue is done and should stop executing tasks.
TaskId enqueue(TaskFunctionRef func, const TaskId &parentTaskId)
Enqueue the given func object in the queue by creating a new Task with the function as work kernel.
Atomic< int > active
Number of currently active tasks.
Manages the set of active task queues in the task manager.
TaskQueues()=default
Default constructs the TaskQueues instance.
TaskQueue * operator[](TaskQueueId id)
Retrieve the task queue with the given id.
Atomic< TaskQueueId > nextId
Next available task queue id.
TaskQueueId createQueue(std::string_view name, const size_t numThreads)
Create a new TaskQueue with the given name.
std::thread::id mainThreadId
Id of the main thread.
std::vector< std::unique_ptr< TaskQueue > > queues
Vector of active task queues, indexed by TaskQueueId.
Task worker executing Task instances retrieved from a TaskQueue.
TaskWorker(TaskQueue *taskQueue, uint32_t workerIx, const std::string &name)
Construct a TaskWorker with the given taskQueue.
void operator()()
Executes the task worker.
TaskQueue * taskQueue
Task queue to retrieve Task instances from.
Defines a task meant to be placed in a TaskQueue and handled by a TaskWorker instance.
std::atomic< uint16_t > generation
Generation counter, used to identify reallocated tasks.
TaskFlags flags
Task flags.
Task * parent
Pointer to the parent task, nullptr if no parent task exists.
TaskFunction kernel
Task kernel, this is the function to run.
std::atomic< int > continuationCount
Continuation count.
Task()=default
Default construct a Task.
TaskId continuations[5]
Registered continuations.
Atomic< int > active
Number of active sub-tasks.
Log implementation class.
Provides a weakly referenced view over the contents of a string.
Contains collection classes used to manage object storage.
uint32_t ElementHandle
Handle type for elements.
std::function< void()> TaskFunction
Type of task function used by the task manager.
uint16_t TaskQueueId
Unique id for a task queue.
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Contains all Cogs-related functionality.
Pool used to store elements of ElementType.
ElementType * create(ARGS &&... args)
Allocate and initialize a new element from the pool passing any arguments to the constructor of the n...
ElementHandle getHandle(ElementType *element) const
Get a handle for the given element.
void destroy(ElementType *element)
Free and destroy the given element in the pool.
Reports current state of a queue.
Tracks kernel launches by a worker thread as they happen.
Task id struct used to identify unique Task instances.
bool isValid() const
Check if the task id is valid.
uint32_t taskHandle
Integer handle to the task in a TaskPool.
uint16_t generation
Generation counter used to separate generational instances.
TaskQueueId queueId
Id of the TaskQueue the Task belongs to.