Project
Loading...
Searching...
No Matches
CompletionPolicyHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "Framework/InputSpan.h"
17#include "Framework/Logger.h"
20#include "DecongestionService.h"
21#include "Framework/Signpost.h"
22#include <fairmq/shmem/Message.h>
23
24#include <cassert>
25#include <regex>
26
28
29namespace o2::framework
30{
31
33{
34 auto matcher = [name](DeviceSpec const& device) -> bool {
35 return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
36 };
37
38 auto originReceived = std::make_shared<std::vector<uint64_t>>();
39
40 auto callback = [originReceived, origin, op](InputSpan const& inputRefs, std::vector<InputSpec> const&, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
41 // update list of the start times of inputs with origin @origin
42 for (auto& ref : inputRefs) {
43 if (ref.header != nullptr) {
44 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
45 if (header->dataOrigin.str == origin) {
46 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
47 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
48 if (it == originReceived->end()) {
49 originReceived->emplace_back(startTime);
50 }
51 }
52 }
53 }
54
55 // find out if all inputs which are not of origin @origin have a corresponding entry in originReceived
56 // if one is missing then we have to wait
57 for (auto& ref : inputRefs) {
58 if (ref.header != nullptr) {
59 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
60 if (header->dataOrigin.str != origin) {
61 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
62 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
63 if (it == originReceived->end()) {
64 LOGP(info, "Have to wait until message of origin {} with startTime {} has arrived.", origin, startTime);
66 }
67 }
68 }
69 }
70 return op;
71 };
72 return CompletionPolicy{"wait-origin", matcher, callback};
73
75}
76
78{
79 auto matcher = [name](DeviceSpec const& device) -> bool {
80 return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
81 };
82 auto callback = [op](InputSpan const&, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
83 return op;
84 };
85 switch (op) {
87 return consumeWhenAny(name.c_str(), matcher);
88 break;
90 return CompletionPolicy{"consume-existing", matcher, callback};
91 break;
93 return CompletionPolicy{"always-process", matcher, callback};
94 break;
96 return CompletionPolicy{"always-wait", matcher, callback};
97 break;
99 return CompletionPolicy{"always-discard", matcher, callback, false};
100 break;
102 return CompletionPolicy{"always-rescan", matcher, callback};
103 break;
105 return CompletionPolicy{"retry", matcher, callback};
106 break;
107 }
109}
110
112{
113 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
114 assert(inputs.size() == specs.size());
115 O2_SIGNPOST_ID_GENERATE(sid, completion);
116 O2_SIGNPOST_START(completion, sid, "consumeWhenAll", "Completion policy invoked");
117
118 size_t si = 0;
119 int sporadicCount = 0;
120 int timeframeCount = 0;
121 int missingSporadicCount = 0;
122 bool needsProcessing = false;
123 size_t currentTimeslice = -1;
124 for (auto& input : inputs) {
125 assert(si < specs.size());
126 auto& spec = specs[si++];
127 sporadicCount += spec.lifetime == Lifetime::Sporadic ? 1 : 0;
128 timeframeCount += spec.lifetime == Lifetime::Timeframe ? 1 : 0;
129 // If we are missing something which is not sporadic, we wait.
130 if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
131 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s due to missing input %lu", "Wait", si);
133 }
134 // If we are missing something which is sporadic, we wait until we are sure it will not come.
135 if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
136 O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "Missing sporadic found for route index %lu", si);
137 missingSporadicCount += 1;
138 }
139 // If we have a header, we use it to determine the current timesliceIsTimer
140 // (unless this is a timer which does not enter the oldest possible timeslice).
141 if (input.header != nullptr && currentTimeslice == -1) {
142 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
143 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
144 currentTimeslice = dph->startTime;
145 O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "currentTimeslice %lu from route index %lu", currentTimeslice, si);
146 }
147 }
148 // If we have a header, we need to process it if it is not a condition object.
149 if (input.header != nullptr && spec.lifetime != Lifetime::Condition) {
150 needsProcessing = true;
151 }
152 }
153 // If some sporadic inputs are missing, we wait for them util we are sure they will not come,
154 // i.e. until the oldest possible timeslice is beyond the timeslice of the input.
155 auto& timesliceIndex = ref.get<TimesliceIndex>();
156 auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;
157
158 if (missingSporadicCount && currentTimeslice >= oldestPossibleTimeslice) {
159 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu", "Retry", currentTimeslice, oldestPossibleTimeslice);
161 }
162
163 // No need to process if we have only sporadic inputs and they are all missing.
164 if (needsProcessing && (sporadicCount > 0) && (missingSporadicCount == sporadicCount) && (timeframeCount == 0)) {
165 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu", "Discard", currentTimeslice);
167 }
168 auto consumes = (needsProcessing || sporadicCount == 0);
169 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu", consumes ? "Consume" : "Discard", currentTimeslice);
171 };
172 return CompletionPolicy{name, matcher, callback};
173}
174
176{
177 auto callbackFull = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
178 auto& decongestionService = ref.get<DecongestionService>();
179 decongestionService.orderedCompletionPolicyActive = true;
180 for (auto& input : inputs) {
181 if (input.header == nullptr) {
183 }
184 long int startTime = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input)->startTime;
185 if (startTime == 0) {
186 LOGP(debug, "startTime is 0, which means we have the first message, so we can process it.");
187 decongestionService.nextTimeslice = 0;
188 }
189 if (framework::DataRefUtils::isValid(input) && startTime != decongestionService.nextTimeslice) {
191 }
192 }
193 decongestionService.nextTimeslice++;
195 };
196 return CompletionPolicy{name, matcher, callbackFull};
197}
198
200{
201 auto matcher = [matchName](DeviceSpec const& device) -> bool {
202 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
203 };
204 return consumeWhenAllOrdered(matcher);
205}
206
208{
209 return CompletionPolicy{
210 name,
211 matcher,
212 [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
213 size_t present = 0;
214 size_t current = 0;
215 size_t withPayload = 0;
216 size_t sporadic = 0;
217 size_t maxSporadic = 0;
218 size_t i = 0;
219 for (auto& input : inputs) {
220 auto& spec = specs[i++];
221 if (spec.lifetime == Lifetime::Sporadic) {
222 maxSporadic++;
223 }
224 if (input.header != nullptr) {
225 present++;
226 if (spec.lifetime == Lifetime::Sporadic) {
227 sporadic++;
228 }
229 }
230 if (input.payload != nullptr) {
231 withPayload++;
232 }
233 current++;
234 }
235 // * In case we have all inputs but the sporadic ones: Consume, since we do not know if the sporadic ones
236 // will ever come.
237 // * In case we have only sporadic inputs: Consume, since we do not know if we already Consumed
238 // the non sporadic ones above.
239 // * In case we do not have payloads: Wait
240 // * In all other cases we consume what is there, but we wait for the non sporadic ones to be complete
241 // (i.e. we wait for present + maxSporadic).
242 if (present - sporadic + maxSporadic == inputs.size()) {
244 } else if (present - sporadic == 0) {
246 } else if (withPayload == 0) {
248 }
250 }};
251}
252
254{
255 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
256 for (size_t i = 0; i < inputs.size(); ++i) {
257 if (inputs.get(i).header != nullptr && inputs.getRefCount(i) == 1) {
259 }
260 }
262 };
263 return CompletionPolicy{name, matcher, callback, false};
264}
265
267{
268 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
269 for (auto& input : inputs) {
270 if (input.header != nullptr) {
272 }
273 }
275 };
276 return CompletionPolicy{name, matcher, callback, false};
277}
278
280{
281 auto matcher = [matchName](DeviceSpec const& device) -> bool {
282 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
283 };
284 return consumeWhenAny(matcher);
285}
286
288{
289 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
290 bool canConsume = false;
291 bool hasConditions = false;
292 bool conditionMissing = false;
293 size_t timeslice = -1;
294 static size_t timesliceOK = -1; // FIXME: This breaks start/stop/start, since it must be reset!
295 // FIXME: Also, this just checks the max timeslice that was already consumed.
296 // In case timeslices do not come in order, we might have consumed a later
297 // condition object, but not the one for the current time slice.
298 // But I don't see any possibility to handle this in a better way.
299
300 // Iterate on all specs and all inputs simultaneously
301 for (size_t i = 0; i < inputs.size(); ++i) {
302 char const* header = inputs.header(i);
303 auto& spec = specs[i];
304 // In case a condition object is not there, we need to wait.
305 if (header != nullptr) {
306 canConsume = true;
307 }
308 if (spec.lifetime == Lifetime::Condition) {
309 hasConditions = true;
310 if (header == nullptr) {
311 conditionMissing = true;
312 }
313 }
314 }
315 if (canConsume || conditionMissing) {
316 for (auto it = inputs.begin(), end = inputs.end(); it != end; ++it) {
317 for (auto const& ref : it) {
319 continue;
320 }
321 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(ref);
322 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
323 timeslice = dph->startTime;
324 break;
325 }
326 }
327 if (timeslice != -1) {
328 break;
329 }
330 }
331 }
332
333 // If there are no conditions, just consume.
334 if (!hasConditions) {
335 canConsume = true;
336 } else if (conditionMissing && (timeslice == -1 || timesliceOK == -1 || timeslice > timesliceOK)) {
338 }
339
340 if (canConsume && timeslice != -1 && (timeslice > timesliceOK || timesliceOK == -1)) {
341 timesliceOK = timeslice;
342 }
344 };
345 return CompletionPolicy{name, matcher, callback, false};
346}
347
349{
350 auto matcher = [matchName](DeviceSpec const& device) -> bool {
351 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
352 };
353 return consumeWhenAnyWithAllConditions(matcher);
354}
355
357{
358 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
359 size_t present = 0;
360 for (auto& input : inputs) {
361 if (input.header != nullptr) {
362 present++;
363 }
364 }
365 if (present == inputs.size()) {
367 } else if (present == 0) {
369 }
371 };
372 return CompletionPolicy{name, matcher, callback};
373}
374
375} // namespace o2::framework
header::DataOrigin origin
#define O2_BUILTIN_UNREACHABLE
std::ostringstream debug
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
const char * header(size_t i) const
Definition InputSpan.h:87
const_iterator end() const
Definition InputSpan.h:243
const_iterator begin() const
Definition InputSpan.h:237
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
DataRef get(size_t i, size_t partidx=0) const
i-th element of the InputSpan
Definition InputSpan.h:52
int getRefCount(size_t i) const
Definition InputSpan.h:70
OldestInputInfo getOldestPossibleInput() const
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeExistingWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
static CompletionPolicy consumeWhenAnyZeroCount(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts which has arrived has a refcount of 1.
static CompletionPolicy defineByNameOrigin(std::string const &name, std::string const &origin, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
std::function< bool(DeviceSpec const &device)> Matcher
static bool isValid(DataRef const &ref)
const char * header
Definition DataRef.h:27
bool orderedCompletionPolicyActive
Ordered completion policy is active.
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46