70 if (queue.tasks.empty()) {
74 O2_SIGNPOST_START(async_queue, opid,
"run",
"Attempting at running %zu tasks with oldestPossible timeframe %zu", queue.tasks.size(), oldestPossible.value);
75 std::vector<int> order;
76 order.resize(queue.tasks.size());
77 std::iota(order.begin(), order.end(), 0);
79 for (
auto& task : queue.tasks) {
80 if (task.timeslice.value <= oldestPossible.value) {
84 "Task %d (timeslice %zu), score %d, debounce %d is %{public}s when oldestPossible timeframe is %zu",
85 task.id.value, task.timeslice.value, queue.prototypes[task.id.value].score, task.debounce,
86 task.runnable ?
"runnable" :
"not runnable", oldestPossible.value);
90 std::sort(order.begin(), order.end(), [&queue](
int a,
int b) {
91 if (queue.tasks[a].runnable && !queue.tasks[b].runnable) {
94 if (!queue.tasks[
a].runnable && queue.tasks[
b].runnable) {
97 if (queue.tasks[
a].timeslice.value == queue.tasks[
b].timeslice.value) {
98 if (queue.tasks[a].id.value == -1 || queue.tasks[b].id.value == -1) {
101 if (queue.tasks[
a].id.value == queue.tasks[
b].id.value) {
102 return queue.tasks[a].debounce > queue.tasks[b].debounce;
104 return queue.prototypes[queue.tasks[
a].id.value].score > queue.prototypes[queue.tasks[
b].id.value].score;
106 return queue.tasks[
a].timeslice.value > queue.tasks[
b].timeslice.value;
110 for (
auto i : order) {
111 if (queue.tasks[
i].runnable) {
112 O2_SIGNPOST_EVENT_EMIT(async_queue, opid,
"run",
"Running task %d (%d), (timeslice %zu), score %d, debounce %d", queue.tasks[
i].id.value,
i, queue.tasks[
i].timeslice.value, queue.prototypes[queue.tasks[
i].id.value].score, queue.tasks[
i].debounce);
114 O2_SIGNPOST_EVENT_EMIT(async_queue, opid,
"run",
"Skipping task %d (%d) (timeslice %zu), score %d, debounce %d", queue.tasks[
i].id.value,
i, queue.tasks[
i].timeslice.value, queue.prototypes[queue.tasks[
i].id.value].score, queue.tasks[
i].debounce);
120 auto newEnd = std::unique(order.begin(), order.end(), [&queue](
int a,
int b) {
121 return queue.tasks[a].runnable == queue.tasks[b].runnable && queue.tasks[a].id.value == queue.tasks[b].id.value && queue.tasks[a].debounce >= 0 && queue.tasks[b].debounce >= 0;
123 for (
auto ii = newEnd; ii != order.end(); ii++) {
124 O2_SIGNPOST_EVENT_EMIT(async_queue, opid,
"dropping",
"Dropping task %d for timeslice %zu", queue.tasks[*ii].id.value, queue.tasks[*ii].timeslice.value);
126 order.erase(newEnd, order.end());
128 if (order.empty() && queue.tasks.size() > 0) {
129 O2_SIGNPOST_END(async_queue, opid,
"run",
"Not running iteration %zu pending %zu.",
130 queue.iteration, queue.tasks.size());
132 }
else if (order.empty()) {
133 O2_SIGNPOST_END(async_queue, opid,
"run",
"Not running iteration %zu. No tasks.", queue.iteration);
136 O2_SIGNPOST_EVENT_EMIT(async_queue, opid,
"run",
"Running %zu tasks in iteration %zu", order.size(), queue.iteration);
139 for (
auto i : order) {
140 if (queue.tasks[
i].runnable) {
144 queue.prototypes[queue.tasks[
i].id.value].name.c_str(),
i,
145 queue.tasks[
i].timeslice.value);
146 queue.tasks[
i].callback(queue.tasks[
i], opid.value);
152 queue.tasks.erase(std::remove_if(queue.tasks.begin(), queue.tasks.end(), [&queue](AsyncTask
const& task) {
153 return task.runnable;
156 O2_SIGNPOST_END(async_queue, opid,
"run",
"Done running %d/%zu tasks", runCount, order.size());
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)