Project
Loading...
Searching...
No Matches
CompletionPolicyHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15#include "Framework/InputSpan.h"
18#include "Framework/Logger.h"
21#include "DecongestionService.h"
22#include "Framework/Signpost.h"
23#include <fairmq/shmem/Message.h>
24
25#include <cassert>
26#include <regex>
27
29
30namespace o2::framework
31{
32
34{
35 auto matcher = [name](DeviceSpec const& device) -> bool {
36 return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
37 };
38
39 auto originReceived = std::make_shared<std::vector<uint64_t>>();
40
41 auto callback = [originReceived, origin, op](InputSpan const& inputRefs, std::vector<InputSpec> const&, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
42 // update list of the start times of inputs with origin @origin
43 for (auto& ref : inputRefs) {
44 if (ref.header != nullptr) {
45 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
46 if (header->dataOrigin.str == origin) {
47 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
48 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
49 if (it == originReceived->end()) {
50 originReceived->emplace_back(startTime);
51 }
52 }
53 }
54 }
55
56 // find out if all inputs which are not of origin @origin have a corresponding entry in originReceived
57 // if one is missing then we have to wait
58 for (auto& ref : inputRefs) {
59 if (ref.header != nullptr) {
60 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
61 if (header->dataOrigin.str != origin) {
62 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
63 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
64 if (it == originReceived->end()) {
65 LOGP(info, "Have to wait until message of origin {} with startTime {} has arrived.", origin, startTime);
67 }
68 }
69 }
70 }
71 return op;
72 };
73 return CompletionPolicy{"wait-origin", matcher, callback};
74
76}
77
79{
80 auto matcher = [name](DeviceSpec const& device) -> bool {
81 return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
82 };
83 auto callback = [op](InputSpan const&, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
84 return op;
85 };
86 switch (op) {
88 return consumeWhenAny(name.c_str(), matcher);
89 break;
91 return CompletionPolicy{"consume-existing", matcher, callback};
92 break;
94 return CompletionPolicy{"always-process", matcher, callback};
95 break;
97 return CompletionPolicy{"always-wait", matcher, callback};
98 break;
100 return CompletionPolicy{"always-discard", matcher, callback, false};
101 break;
103 return CompletionPolicy{"always-rescan", matcher, callback};
104 break;
106 return CompletionPolicy{"retry", matcher, callback};
107 break;
108 }
110}
111
113{
114 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
115 assert(inputs.size() == specs.size());
116 O2_SIGNPOST_ID_GENERATE(sid, completion);
117 O2_SIGNPOST_START(completion, sid, "consumeWhenAll", "Completion policy invoked");
118
119 size_t si = 0;
120 int sporadicCount = 0;
121 int timeframeCount = 0;
122 int missingSporadicCount = 0;
123 bool needsProcessing = false;
124 size_t currentTimeslice = -1;
125 for (auto& input : inputs) {
126 assert(si < specs.size());
127 auto& spec = specs[si++];
128 sporadicCount += spec.lifetime == Lifetime::Sporadic ? 1 : 0;
129 timeframeCount += spec.lifetime == Lifetime::Timeframe ? 1 : 0;
130 // If we are missing something which is not sporadic, we wait.
131 if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
132 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s due to missing input %lu", "Wait", si);
134 }
135 // If we are missing something which is sporadic, we wait until we are sure it will not come.
136 if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
137 O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "Missing sporadic found for route index %lu", si);
138 missingSporadicCount += 1;
139 }
140 // If we have a header, we use it to determine the current timesliceIsTimer
141 // (unless this is a timer which does not enter the oldest possible timeslice).
142 if (input.header != nullptr && currentTimeslice == -1) {
143 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
144 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
145 currentTimeslice = dph->startTime;
146 O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "currentTimeslice %lu from route index %lu", currentTimeslice, si);
147 }
148 }
149 // If we have a header, we need to process it if it is not a condition object.
150 if (input.header != nullptr && spec.lifetime != Lifetime::Condition) {
151 needsProcessing = true;
152 }
153 }
154 // If some sporadic inputs are missing, we wait for them util we are sure they will not come,
155 // i.e. until the oldest possible timeslice is beyond the timeslice of the input.
156 auto& timesliceIndex = ref.get<TimesliceIndex>();
157 auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;
158
159 if (missingSporadicCount && currentTimeslice >= oldestPossibleTimeslice) {
160 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu", "Retry", currentTimeslice, oldestPossibleTimeslice);
162 }
163
164 // No need to process if we have only sporadic inputs and they are all missing.
165 if (needsProcessing && (sporadicCount > 0) && (missingSporadicCount == sporadicCount) && (timeframeCount == 0)) {
166 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu", "Discard", currentTimeslice);
168 }
169 auto consumes = (needsProcessing || sporadicCount == 0);
170 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu", consumes ? "Consume" : "Discard", currentTimeslice);
172 };
173 return CompletionPolicy{name, matcher, callback};
174}
175
177{
178 auto callbackFull = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
179 auto& decongestionService = ref.get<DecongestionService>();
180 decongestionService.orderedCompletionPolicyActive = true;
181 for (auto& input : inputs) {
182 if (input.header == nullptr) {
184 }
185 long int startTime = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input)->startTime;
186 if (startTime == 0) {
187 LOGP(debug, "startTime is 0, which means we have the first message, so we can process it.");
188 decongestionService.nextTimeslice = 0;
189 }
190 if (framework::DataRefUtils::isValid(input) && startTime != decongestionService.nextTimeslice) {
192 }
193 }
194 decongestionService.nextTimeslice++;
196 };
197 return CompletionPolicy{name, matcher, callbackFull};
198}
199
201{
202 auto matcher = [matchName](DeviceSpec const& device) -> bool {
203 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
204 };
205 return consumeWhenAllOrdered(matcher);
206}
207
209{
210 return CompletionPolicy{
211 name,
212 matcher,
213 [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
214 size_t present = 0;
215 size_t current = 0;
216 size_t withPayload = 0;
217 size_t sporadic = 0;
218 size_t maxSporadic = 0;
219 size_t i = 0;
220 for (auto& input : inputs) {
221 auto& spec = specs[i++];
222 if (spec.lifetime == Lifetime::Sporadic) {
223 maxSporadic++;
224 }
225 if (input.header != nullptr) {
226 present++;
227 if (spec.lifetime == Lifetime::Sporadic) {
228 sporadic++;
229 }
230 }
231 if (input.payload != nullptr) {
232 withPayload++;
233 }
234 current++;
235 }
236 // * In case we have all inputs but the sporadic ones: Consume, since we do not know if the sporadic ones
237 // will ever come.
238 // * In case we have only sporadic inputs: Consume, since we do not know if we already Consumed
239 // the non sporadic ones above.
240 // * In case we do not have payloads: Wait
241 // * In all other cases we consume what is there, but we wait for the non sporadic ones to be complete
242 // (i.e. we wait for present + maxSporadic).
243 if (present - sporadic + maxSporadic == inputs.size()) {
245 } else if (present - sporadic == 0) {
247 } else if (withPayload == 0) {
249 }
251 }};
252}
253
255{
256 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
257 for (size_t i = 0; i < inputs.size(); ++i) {
258 if (inputs.get(i).header != nullptr && inputs.getRefCount(i) == 1) {
260 }
261 }
263 };
264 return CompletionPolicy{name, matcher, callback, false};
265}
266
268{
269 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
270 size_t currentTimeslice = -1;
271 for (auto& input : inputs) {
272 if (input.header == nullptr) {
273 continue;
274 }
275 o2::framework::DataProcessingHeader const* dph = o2::header::get<o2::framework::DataProcessingHeader*>(input.header);
276 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
277 currentTimeslice = dph->startTime;
278 break;
279 }
280 }
281
282 auto& timesliceIndex = ref.get<TimesliceIndex>();
283 auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;
284
285 if (currentTimeslice >= oldestPossibleTimeslice) {
287 }
289 };
290 return CompletionPolicy{name, matcher, callback, false};
291}
292
294{
295 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
296 for (auto& input : inputs) {
297 if (input.header != nullptr) {
299 }
300 }
302 };
303 return CompletionPolicy{name, matcher, callback, false};
304}
305
307{
308 auto matcher = [matchName](DeviceSpec const& device) -> bool {
309 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
310 };
311 return consumeWhenAny(matcher);
312}
313
315{
316 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
317 bool canConsume = false;
318 bool hasConditions = false;
319 bool conditionMissing = false;
320 size_t timeslice = -1;
321 static size_t timesliceOK = -1; // FIXME: This breaks start/stop/start, since it must be reset!
322 // FIXME: Also, this just checks the max timeslice that was already consumed.
323 // In case timeslices do not come in order, we might have consumed a later
324 // condition object, but not the one for the current time slice.
325 // But I don't see any possibility to handle this in a better way.
326
327 // Iterate on all specs and all inputs simultaneously
328 for (size_t i = 0; i < inputs.size(); ++i) {
329 char const* header = inputs.header(i);
330 auto& spec = specs[i];
331 // In case a condition object is not there, we need to wait.
332 if (header != nullptr) {
333 canConsume = true;
334 }
335 if (spec.lifetime == Lifetime::Condition) {
336 hasConditions = true;
337 if (header == nullptr) {
338 conditionMissing = true;
339 }
340 }
341 }
342 if (canConsume || conditionMissing) {
343 for (auto it = inputs.begin(), end = inputs.end(); it != end; ++it) {
344 for (auto const& ref : it) {
346 continue;
347 }
348 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(ref);
349 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
350 timeslice = dph->startTime;
351 break;
352 }
353 }
354 if (timeslice != -1) {
355 break;
356 }
357 }
358 }
359
360 // If there are no conditions, just consume.
361 if (!hasConditions) {
362 canConsume = true;
363 } else if (conditionMissing && (timeslice == -1 || timesliceOK == -1 || timeslice > timesliceOK)) {
365 }
366
367 if (canConsume && timeslice != -1 && (timeslice > timesliceOK || timesliceOK == -1)) {
368 timesliceOK = timeslice;
369 }
371 };
372 return CompletionPolicy{name, matcher, callback, false};
373}
374
376{
377 auto matcher = [matchName](DeviceSpec const& device) -> bool {
378 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
379 };
380 return consumeWhenAnyWithAllConditions(matcher);
381}
382
384{
385 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
386 size_t present = 0;
387 for (auto& input : inputs) {
388 if (input.header != nullptr) {
389 present++;
390 }
391 }
392 if (present == inputs.size()) {
394 } else if (present == 0) {
396 }
398 };
399 return CompletionPolicy{name, matcher, callback};
400}
401
402} // namespace o2::framework
header::DataOrigin origin
#define O2_BUILTIN_UNREACHABLE
std::ostringstream debug
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
const char * header(size_t i) const
Definition InputSpan.h:87
const_iterator end() const
Definition InputSpan.h:243
const_iterator begin() const
Definition InputSpan.h:237
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
DataRef get(size_t i, size_t partidx=0) const
i-th element of the InputSpan
Definition InputSpan.h:52
int getRefCount(size_t i) const
Definition InputSpan.h:70
OldestInputInfo getOldestPossibleInput() const
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
static CompletionPolicy consumeWhenPastOldestPossibleTimeframe(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeExistingWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
static CompletionPolicy consumeWhenAnyZeroCount(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts which has arrived has a refcount of 1.
static CompletionPolicy defineByNameOrigin(std::string const &name, std::string const &origin, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
std::function< bool(DeviceSpec const &device)> Matcher
static bool isValid(DataRef const &ref)
const char * header
Definition DataRef.h:27
bool orderedCompletionPolicyActive
Ordered completion policy is active.
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46