Cogs.Core
Parallel.h
1#pragma once
2
3#include "Context.h"
4
5#include "Services/TaskManager.h"
6#include "Platform/Instrumentation.h"
7
8#include "Foundation/ComponentModel/Component.h"
9#include "Foundation/Platform/Threads.h"
10
11namespace Cogs
12{
13 namespace Core
14 {
15 namespace Serial
16 {
/// Calls func(component, index) for every component in pool on the calling thread.
///
/// Components are visited in the pool's iteration order; index starts at zero and
/// increases by one per visited component.
///
/// @param pool Range of components to visit.
/// @param func Callable invoked as func(component, index).
template<typename Pool, typename Func>
void processComponents(Pool & pool, Func func)
{
    size_t index = 0;
    for (auto & entry : pool) {
        func(entry, index);
        ++index;
    }
}
25 }
26
27 namespace Parallel
28 {
29 struct TaskScope
30 {
31 TaskScope(TaskManager * taskManager, TaskId group) : taskManager(taskManager), group(group) {}
32 TaskScope(TaskScope && other) = default;
33 ~TaskScope() { if (taskManager && group.isValid()) taskManager->destroy(group); }
34 void Wait() { taskManager->wait(group); }
35 private:
36 TaskManager * taskManager = nullptr;
37 TaskId group = NoTask;
38 };
39
40 template<typename Func>
41 TaskScope forEach(Context * context, size_t count, Func f, const char * scopeTag = "")
42 {
44 const size_t hwc = context->taskManager->getQueueConcurrency(queue);
45 if (count <= 1 || hwc < 1) {
46 if (count) {
47 DynamicCpuInstrumentationScope(SCOPE_PARALLEL, "forEach", scopeTag);
48 for (size_t j = 0; j < count; ++j) {
49 f(j);
50 }
51 }
52 return TaskScope{ context->taskManager.get(), NoTask };
53 }
54
55 const size_t perTask = (count / hwc) + 1;
56
57 auto group = context->taskManager->createGroup(queue);
58
59 for (size_t i = 0; i < hwc; ++i) {
60 const size_t begin = i * perTask;
61 const size_t end = std::min(begin + perTask, count);
62
63 if (begin >= count) continue;
64
65 context->taskManager->enqueueChild(group, [=]()
66 {
67 DynamicCpuInstrumentationScope(SCOPE_PARALLEL, "forEach", scopeTag);
68
69 for (size_t j = begin; j < end; ++j) {
70 f(j);
71 }
72 });
73 }
74
75 return TaskScope{ context->taskManager.get(), group };
76 }
77
78 template<typename Func>
79 TaskScope forEachRanged(Context* context, size_t count, Func f, size_t minSize = 0, const char* scopeTag = "")
80 {
82 const size_t hwc = context->taskManager->getQueueConcurrency(queue);
83 if (count < minSize || hwc < 1) {
84 if (count) {
85 DynamicCpuInstrumentationScope(SCOPE_PARALLEL, "forEachRanged", scopeTag);
86 f(0, count);
87 }
88 return TaskScope{ context->taskManager.get(), NoTask };
89 }
90 const size_t perTask = std::max(minSize, (count / hwc) + 1);
91
92 TaskId group = context->taskManager->createGroup(queue);
93
94 for (size_t i = 0; i < hwc; ++i) {
95 const size_t begin = i * perTask;
96 const size_t end = std::min(begin + perTask, count);
97
98 if (begin >= count) continue;
99
100 context->taskManager->enqueueChild(group, [=]()
101 {
102 DynamicCpuInstrumentationScope(SCOPE_PARALLEL, "forEachRanged", scopeTag);
103 f(begin, end);
104 });
105 }
106
107 return TaskScope{ context->taskManager.get(), group };
108 }
109
110 template<typename Pool, typename Func>
111 TaskId processComponents(Context * context, Pool & pool, const char * scopeName, Func func, TaskId taskGroup = NoTask)
112 {
113 const size_t numComponents = pool.size();
114 TaskQueueId queue = taskGroup.isValid() ? taskGroup.queueId : TaskManager::GlobalQueue;
115 const size_t hwc = context->taskManager->getQueueConcurrency(queue);
116 if (numComponents <= 1 || hwc < 1) {
117 if (numComponents) {
118 DynamicCpuInstrumentationScope(SCOPE_PARALLEL, "processComponents", scopeName);
119 for (size_t j = 0; j < numComponents; j++) {
120 func(pool[static_cast<SizeType>(j)], j);
121 }
122 }
123 return NoTask;
124 }
125
126 if (!taskGroup.isValid()) {
127 taskGroup = context->taskManager->createGroup(queue);
128 }
129 const size_t perTask = (numComponents / hwc) + 1;
130 for (size_t i = 0; i < hwc; ++i) {
131 const size_t begin = i * perTask;
132 const size_t end = std::min(begin + perTask, numComponents);
133
134 if (begin >= numComponents) continue;
135
136 context->taskManager->enqueueChild(taskGroup, [&pool, scopeName, func, begin, end]()
137 {
138 DynamicCpuInstrumentationScope(SCOPE_PARALLEL, "processComponents", scopeName);
139 for (size_t j = begin; j < end; ++j) {
140 func(pool[static_cast<SizeType>(j)], j);
141 }
142 });
143 }
144 return taskGroup;
145 }
146
147 template<typename Pool, typename Func>
148 TaskId processComponents(Context * context, Pool & pool, Func func, TaskId taskGroup = NoTask)
149 {
150 return processComponents(context, pool, "Parallel::processComponents", func, taskGroup);
151 }
152 }
153 }
154}
A Context instance contains all the services, systems and runtime components needed to use Cogs.
Definition: Context.h:83
std::unique_ptr< class TaskManager > taskManager
TaskManager service instance.
Definition: Context.h:186
Manages Task queuing and execution.
Definition: TaskManager.h:61
void wait(const TaskId &taskId)
Wait for the task given by taskId.
static constexpr TaskQueueId GlobalQueue
Global task queue.
Definition: TaskManager.h:224
void destroy(const TaskId &taskId)
Destroy the task given by taskId.
uint16_t TaskQueueId
Unique id for a task queue.
Definition: TaskManager.h:14
Contains all Cogs related functionality.
Definition: FieldSetter.h:23
ComponentIndex SizeType
Type used to track the size of pools.
Definition: Component.h:19
Task id struct used to identify unique Task instances.
Definition: TaskManager.h:20
bool isValid() const
Check if the task id is valid.
Definition: TaskManager.h:29