Project
Loading...
Searching...
No Matches
CompletionPolicyHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "Framework/InputSpan.h"
17#include "Framework/Logger.h"
20#include "DecongestionService.h"
21#include "Framework/Signpost.h"
22#if __has_include(<fairmq/shmem/Message.h>)
23#include <fairmq/shmem/Message.h>
24#endif
25
26#include <cassert>
27#include <regex>
28
30
31namespace o2::framework
32{
33
35{
36 auto matcher = [name](DeviceSpec const& device) -> bool {
37 return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
38 };
39
40 auto originReceived = std::make_shared<std::vector<uint64_t>>();
41
42 auto callback = [originReceived, origin, op](InputSpan const& inputRefs, std::vector<InputSpec> const&, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
43 // update list of the start times of inputs with origin @origin
44 for (auto& ref : inputRefs) {
45 if (ref.header != nullptr) {
46 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
47 if (header->dataOrigin.str == origin) {
48 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
49 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
50 if (it == originReceived->end()) {
51 originReceived->emplace_back(startTime);
52 }
53 }
54 }
55 }
56
57 // find out if all inputs which are not of origin @origin have a corresponding entry in originReceived
58 // if one is missing then we have to wait
59 for (auto& ref : inputRefs) {
60 if (ref.header != nullptr) {
61 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
62 if (header->dataOrigin.str != origin) {
63 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
64 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
65 if (it == originReceived->end()) {
66 LOGP(info, "Have to wait until message of origin {} with startTime {} has arrived.", origin, startTime);
68 }
69 }
70 }
71 }
72 return op;
73 };
74 return CompletionPolicy{"wait-origin", matcher, callback};
75
77}
78
80{
81 auto matcher = [name](DeviceSpec const& device) -> bool {
82 return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
83 };
84 auto callback = [op](InputSpan const&, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
85 return op;
86 };
87 switch (op) {
89 return consumeWhenAny(name.c_str(), matcher);
90 break;
92 return CompletionPolicy{"consume-existing", matcher, callback};
93 break;
95 return CompletionPolicy{"always-process", matcher, callback};
96 break;
98 return CompletionPolicy{"always-wait", matcher, callback};
99 break;
101 return CompletionPolicy{"always-discard", matcher, callback, false};
102 break;
104 return CompletionPolicy{"always-rescan", matcher, callback};
105 break;
107 return CompletionPolicy{"retry", matcher, callback};
108 break;
109 }
111}
112
114{
115 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
116 assert(inputs.size() == specs.size());
117 O2_SIGNPOST_ID_GENERATE(sid, completion);
118 O2_SIGNPOST_START(completion, sid, "consumeWhenAll", "Completion policy invoked");
119
120 size_t si = 0;
121 int sporadicCount = 0;
122 int timeframeCount = 0;
123 int missingSporadicCount = 0;
124 bool needsProcessing = false;
125 size_t currentTimeslice = -1;
126 for (auto& input : inputs) {
127 assert(si < specs.size());
128 auto& spec = specs[si++];
129 sporadicCount += spec.lifetime == Lifetime::Sporadic ? 1 : 0;
130 timeframeCount += spec.lifetime == Lifetime::Timeframe ? 1 : 0;
131 // If we are missing something which is not sporadic, we wait.
132 if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
133 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s due to missing input %lu", "Wait", si);
135 }
136 // If we are missing something which is sporadic, we wait until we are sure it will not come.
137 if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
138 O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "Missing sporadic found for route index %lu", si);
139 missingSporadicCount += 1;
140 }
141 // If we have a header, we use it to determine the current timeslice
142 // (unless this is a timer which does not enter the oldest possible timeslice).
143 if (input.header != nullptr && currentTimeslice == -1) {
144 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
145 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
146 currentTimeslice = dph->startTime;
147 O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "currentTimeslice %lu from route index %lu", currentTimeslice, si);
148 }
149 }
150 // If we have a header, we need to process it if it is not a condition object.
151 if (input.header != nullptr && spec.lifetime != Lifetime::Condition) {
152 needsProcessing = true;
153 }
154 }
155 // If some sporadic inputs are missing, we wait for them until we are sure they will not come,
156 // i.e. until the oldest possible timeslice is beyond the timeslice of the input.
157 auto& timesliceIndex = ref.get<TimesliceIndex>();
158 auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;
159
160 if (missingSporadicCount && currentTimeslice >= oldestPossibleTimeslice) {
161 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu", "Retry", currentTimeslice, oldestPossibleTimeslice);
163 }
164
165 // No need to process if we have only sporadic inputs and they are all missing.
166 if (needsProcessing && (sporadicCount > 0) && (missingSporadicCount == sporadicCount) && (timeframeCount == 0)) {
167 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu", "Discard", currentTimeslice);
169 }
170 auto consumes = (needsProcessing || sporadicCount == 0);
171 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu", consumes ? "Consume" : "Discard", currentTimeslice);
173 };
174 return CompletionPolicy{name, matcher, callback};
175}
176
178{
179 auto callbackFull = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
180 auto& decongestionService = ref.get<DecongestionService>();
181 decongestionService.orderedCompletionPolicyActive = true;
182 for (auto& input : inputs) {
183 if (input.header == nullptr) {
185 }
186 long int startTime = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input)->startTime;
187 if (startTime == 0) {
188 LOGP(debug, "startTime is 0, which means we have the first message, so we can process it.");
189 decongestionService.nextTimeslice = 0;
190 }
191 if (framework::DataRefUtils::isValid(input) && startTime != decongestionService.nextTimeslice) {
193 }
194 }
195 decongestionService.nextTimeslice++;
197 };
198 return CompletionPolicy{name, matcher, callbackFull};
199}
200
202{
203 auto matcher = [matchName](DeviceSpec const& device) -> bool {
204 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
205 };
206 return consumeWhenAllOrdered(matcher);
207}
208
210{
211 return CompletionPolicy{
212 name,
213 matcher,
214 [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
215 size_t present = 0;
216 size_t current = 0;
217 size_t withPayload = 0;
218 size_t sporadic = 0;
219 size_t maxSporadic = 0;
220 size_t i = 0;
221 for (auto& input : inputs) {
222 auto& spec = specs[i++];
223 if (spec.lifetime == Lifetime::Sporadic) {
224 maxSporadic++;
225 }
226 if (input.header != nullptr) {
227 present++;
228 if (spec.lifetime == Lifetime::Sporadic) {
229 sporadic++;
230 }
231 }
232 if (input.payload != nullptr) {
233 withPayload++;
234 }
235 current++;
236 }
237 // * In case we have all inputs but the sporadic ones: Consume, since we do not know if the sporadic ones
238 // will ever come.
239 // * In case we have only sporadic inputs: Consume, since we do not know if we already Consumed
240 // the non sporadic ones above.
241 // * In case we do not have payloads: Wait
242 // * In all other cases we consume what is there, but we wait for the non sporadic ones to be complete
243 // (i.e. we wait for present + maxSporadic).
244 if (present - sporadic + maxSporadic == inputs.size()) {
246 } else if (present - sporadic == 0) {
248 } else if (withPayload == 0) {
250 }
252 }};
253}
254
255#if __has_include(<fairmq/shmem/Message.h>)
// Builds a CompletionPolicy from `name` and `matcher`, using a callback that scans the
// input slots by index and tests each one for "header present and getRefCount(i) == 1".
// The policy is constructed with `false` as its fourth argument.
// NOTE(review): listing lines 261 and 264 are absent from this extract, so the operation
// selected when the test succeeds, and the one selected after the loop, are not visible
// here (presumably they are the callback's return statements) - confirm against the
// original file.
// NOTE(review): the meaning of the trailing `false` argument is not shown in this listing.
256CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyZeroCount(const char* name, CompletionPolicy::Matcher matcher)
257{
258 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
259 for (size_t i = 0; i < inputs.size(); ++i) {
// A slot qualifies only when it carries a header and its reference count is exactly 1.
260 if (inputs.get(i).header != nullptr && inputs.getRefCount(i) == 1) {
262 }
263 }
265 };
266 return CompletionPolicy{name, matcher, callback, false};
267}
268#endif
269
271{
272 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
273 for (auto& input : inputs) {
274 if (input.header != nullptr) {
276 }
277 }
279 };
280 return CompletionPolicy{name, matcher, callback, false};
281}
282
284{
285 auto matcher = [matchName](DeviceSpec const& device) -> bool {
286 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
287 };
288 return consumeWhenAny(matcher);
289}
290
292{
293 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
294 bool canConsume = false;
295 bool hasConditions = false;
296 bool conditionMissing = false;
297 size_t timeslice = -1;
298 static size_t timesliceOK = -1; // FIXME: This breaks start/stop/start, since it must be reset!
299 // FIXME: Also, this just checks the max timeslice that was already consumed.
300 // In case timeslices do not come in order, we might have consumed a later
301 // condition object, but not the one for the current time slice.
302 // But I don't see any possibility to handle this in a better way.
303
304 // Iterate on all specs and all inputs simultaneously
305 for (size_t i = 0; i < inputs.size(); ++i) {
306 char const* header = inputs.header(i);
307 auto& spec = specs[i];
308 // In case a condition object is not there, we need to wait.
309 if (header != nullptr) {
310 canConsume = true;
311 }
312 if (spec.lifetime == Lifetime::Condition) {
313 hasConditions = true;
314 if (header == nullptr) {
315 conditionMissing = true;
316 }
317 }
318 }
319 if (canConsume || conditionMissing) {
320 for (auto it = inputs.begin(), end = inputs.end(); it != end; ++it) {
321 for (auto const& ref : it) {
323 continue;
324 }
325 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(ref);
326 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
327 timeslice = dph->startTime;
328 break;
329 }
330 }
331 if (timeslice != -1) {
332 break;
333 }
334 }
335 }
336
337 // If there are no conditions, just consume.
338 if (!hasConditions) {
339 canConsume = true;
340 } else if (conditionMissing && (timeslice == -1 || timesliceOK == -1 || timeslice > timesliceOK)) {
342 }
343
344 if (canConsume && timeslice != -1 && (timeslice > timesliceOK || timesliceOK == -1)) {
345 timesliceOK = timeslice;
346 }
348 };
349 return CompletionPolicy{name, matcher, callback, false};
350}
351
353{
354 auto matcher = [matchName](DeviceSpec const& device) -> bool {
355 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
356 };
357 return consumeWhenAnyWithAllConditions(matcher);
358}
359
361{
362 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
363 size_t present = 0;
364 for (auto& input : inputs) {
365 if (input.header != nullptr) {
366 present++;
367 }
368 }
369 if (present == inputs.size()) {
371 } else if (present == 0) {
373 }
375 };
376 return CompletionPolicy{name, matcher, callback};
377}
378
379} // namespace o2::framework
#define O2_BUILTIN_UNREACHABLE
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:571
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:565
std::ostringstream debug
const char * header(size_t i) const
Definition InputSpan.h:87
const_iterator end() const
Definition InputSpan.h:243
const_iterator begin() const
Definition InputSpan.h:237
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
DataRef get(size_t i, size_t partidx=0) const
i-th element of the InputSpan
Definition InputSpan.h:52
int getRefCount(size_t i) const
Definition InputSpan.h:70
OldestInputInfo getOldestPossibleInput() const
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeExistingWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
static CompletionPolicy defineByNameOrigin(std::string const &name, std::string const &origin, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
std::function< bool(DeviceSpec const &device)> Matcher
static bool isValid(DataRef const &ref)
const char * header
Definition DataRef.h:27
bool orderedCompletionPolicyActive
Ordered completion policy is active.
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46