Project
Loading...
Searching...
No Matches
AsyncQueue.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "Framework/Signpost.h"
14#include "x9.h"
15#include <numeric>
16
18
19namespace o2::framework
20{
22 : inbox(x9_create_inbox(16, "async_queue", sizeof(AsyncTask)))
23{
24 this->inbox = x9_create_inbox(16, "async_queue", sizeof(AsyncTask));
25}
26
28{
30 id.value = queue.prototypes.size();
31 queue.prototypes.push_back(spec);
32 return id;
33}
34
35auto AsyncQueueHelpers::post(AsyncQueue& queue, AsyncTask const& task) -> void
36{
37 // Until we do not manage to write to the inbox, keep removing
38 // items from the queue if you are the first one which fails to
39 // write.
40 while (!x9_write_to_inbox(queue.inbox, sizeof(AsyncTask), &task)) {
42 }
43}
44
46{
47 bool isFirst = true;
48 if (!std::atomic_compare_exchange_strong(&queue.first, &isFirst, false)) {
49 // Not the first, try again.
50 return;
51 }
52 // First thread which does not manage to write to the queue.
53 // Flush it a bit before we try again.
54 AsyncTask toFlush;
55 // This potentially stalls if the inserting tasks are faster to insert
56 // than we are to retrieve. We should probably have a cut-off
57 while (x9_read_from_inbox(queue.inbox, sizeof(AsyncTask), &toFlush)) {
58 queue.tasks.push_back(toFlush);
59 }
60 queue.first = true;
61}
62
63auto AsyncQueueHelpers::run(AsyncQueue& queue, TimesliceId oldestPossible) -> void
64{
65 // We synchronize right before we run to get as many
66 // tasks as possible. Notice we might still miss some
67 // which will have to handled on a subsequent iteration.
69
70 if (queue.tasks.empty()) {
71 return;
72 }
73 O2_SIGNPOST_ID_GENERATE(opid, async_queue);
74 O2_SIGNPOST_START(async_queue, opid, "run", "Attempting at running %zu tasks with oldestPossible timeframe %zu", queue.tasks.size(), oldestPossible.value);
75 std::vector<int> order;
76 order.resize(queue.tasks.size());
77 std::iota(order.begin(), order.end(), 0);
78 // Decide wether or not they can run as a first thing
79 for (auto& task : queue.tasks) {
80 if (task.timeslice.value <= oldestPossible.value) {
81 task.runnable = true;
82 }
83 O2_SIGNPOST_EVENT_EMIT(async_queue, opid, "run",
84 "Task %d (timeslice %zu), score %d, debounce %d is %{public}s when oldestPossible timeframe is %zu",
85 task.id.value, task.timeslice.value, queue.prototypes[task.id.value].score, task.debounce,
86 task.runnable ? "runnable" : "not runnable", oldestPossible.value);
87 }
88
89 // Sort by runnable, timeslice, then priority and finally debounce
90 std::sort(order.begin(), order.end(), [&queue](int a, int b) {
91 if (queue.tasks[a].runnable && !queue.tasks[b].runnable) {
92 return true;
93 }
94 if (!queue.tasks[a].runnable && queue.tasks[b].runnable) {
95 return false;
96 }
97 if (queue.tasks[a].timeslice.value == queue.tasks[b].timeslice.value) {
98 if (queue.tasks[a].id.value == -1 || queue.tasks[b].id.value == -1) {
99 return false;
100 }
101 if (queue.tasks[a].id.value == queue.tasks[b].id.value) {
102 return queue.tasks[a].debounce > queue.tasks[b].debounce;
103 }
104 return queue.prototypes[queue.tasks[a].id.value].score > queue.prototypes[queue.tasks[b].id.value].score;
105 } else {
106 return queue.tasks[a].timeslice.value > queue.tasks[b].timeslice.value;
107 }
108 });
109
110 for (auto i : order) {
111 if (queue.tasks[i].runnable) {
112 O2_SIGNPOST_EVENT_EMIT(async_queue, opid, "run", "Running task %d (%d), (timeslice %zu), score %d, debounce %d", queue.tasks[i].id.value, i, queue.tasks[i].timeslice.value, queue.prototypes[queue.tasks[i].id.value].score, queue.tasks[i].debounce);
113 } else {
114 O2_SIGNPOST_EVENT_EMIT(async_queue, opid, "run", "Skipping task %d (%d) (timeslice %zu), score %d, debounce %d", queue.tasks[i].id.value, i, queue.tasks[i].timeslice.value, queue.prototypes[queue.tasks[i].id.value].score, queue.tasks[i].debounce);
115 }
116 }
117 // Keep only the tasks with the highest debounce value for a given id
118 // For this reason I need to keep the callback in the task itself, because
119 // two different callbacks with the same id will be coalesced.
120 auto newEnd = std::unique(order.begin(), order.end(), [&queue](int a, int b) {
121 return queue.tasks[a].runnable == queue.tasks[b].runnable && queue.tasks[a].id.value == queue.tasks[b].id.value && queue.tasks[a].debounce >= 0 && queue.tasks[b].debounce >= 0;
122 });
123 for (auto ii = newEnd; ii != order.end(); ii++) {
124 O2_SIGNPOST_EVENT_EMIT(async_queue, opid, "dropping", "Dropping task %d for timeslice %zu", queue.tasks[*ii].id.value, queue.tasks[*ii].timeslice.value);
125 }
126 order.erase(newEnd, order.end());
127
128 if (order.empty() && queue.tasks.size() > 0) {
129 O2_SIGNPOST_END(async_queue, opid, "run", "Not running iteration %zu pending %zu.",
130 queue.iteration, queue.tasks.size());
131 return;
132 } else if (order.empty()) {
133 O2_SIGNPOST_END(async_queue, opid, "run", "Not running iteration %zu. No tasks.", queue.iteration);
134 return;
135 }
136 O2_SIGNPOST_EVENT_EMIT(async_queue, opid, "run", "Running %zu tasks in iteration %zu", order.size(), queue.iteration);
137
138 int runCount = 0;
139 for (auto i : order) {
140 if (queue.tasks[i].runnable) {
141 runCount++;
142 // If a task is runable, we can run the task and remove it from the queue
143 O2_SIGNPOST_EVENT_EMIT(async_queue, opid, "run", "Running task %{public}s (%d) for timeslice %zu",
144 queue.prototypes[queue.tasks[i].id.value].name.c_str(), i,
145 queue.tasks[i].timeslice.value);
146 queue.tasks[i].callback(queue.tasks[i], opid.value);
147 O2_SIGNPOST_EVENT_EMIT(async_queue, opid, "run", "Done running %d", i);
148 }
149 }
150 // Remove all runnable tasks regardless they actually
151 // ran or they were skipped due to debouncing.
152 queue.tasks.erase(std::remove_if(queue.tasks.begin(), queue.tasks.end(), [&queue](AsyncTask const& task) {
153 return task.runnable;
154 }),
155 queue.tasks.end());
156 O2_SIGNPOST_END(async_queue, opid, "run", "Done running %d/%zu tasks", runCount, order.size());
157}
158
160{
161 queue.tasks.clear();
162 queue.iteration = 0;
163}
164
165} // namespace o2::framework
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:540
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:490
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint id
Definition glcorearb.h:650
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
static void flushPending(AsyncQueue &queue)
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actual task to be executed.
Definition AsyncQueue.h:32