Project
Loading...
Searching...
No Matches
CompletionPolicyHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "Framework/InputSpan.h"
17#include "Framework/Logger.h"
20#include "DecongestionService.h"
21#include "Framework/Signpost.h"
22
23#include <cassert>
24#include <regex>
25
27
28namespace o2::framework
29{
30
32{
33 auto matcher = [name](DeviceSpec const& device) -> bool {
34 return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
35 };
36
37 auto originReceived = std::make_shared<std::vector<uint64_t>>();
38
39 auto callback = [originReceived, origin, op](InputSpan const& inputRefs, std::vector<InputSpec> const&, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
40 // update list of the start times of inputs with origin @origin
41 for (auto& ref : inputRefs) {
42 if (ref.header != nullptr) {
43 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
44 if (header->dataOrigin.str == origin) {
45 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
46 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
47 if (it == originReceived->end()) {
48 originReceived->emplace_back(startTime);
49 }
50 }
51 }
52 }
53
54 // find out if all inputs which are not of origin @origin have a corresponding entry in originReceived
55 // if one is missing then we have to wait
56 for (auto& ref : inputRefs) {
57 if (ref.header != nullptr) {
58 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
59 if (header->dataOrigin.str != origin) {
60 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
61 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
62 if (it == originReceived->end()) {
63 LOGP(info, "Have to wait until message of origin {} with startTime {} has arrived.", origin, startTime);
65 }
66 }
67 }
68 }
69 return op;
70 };
71 return CompletionPolicy{"wait-origin", matcher, callback};
72
74}
75
77{
78 auto matcher = [name](DeviceSpec const& device) -> bool {
79 return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
80 };
81 auto callback = [op](InputSpan const&, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
82 return op;
83 };
84 switch (op) {
86 return consumeWhenAny(name.c_str(), matcher);
87 break;
89 return CompletionPolicy{"consume-existing", matcher, callback};
90 break;
92 return CompletionPolicy{"always-process", matcher, callback};
93 break;
95 return CompletionPolicy{"always-wait", matcher, callback};
96 break;
98 return CompletionPolicy{"always-discard", matcher, callback, false};
99 break;
101 return CompletionPolicy{"always-rescan", matcher, callback};
102 break;
104 return CompletionPolicy{"retry", matcher, callback};
105 break;
106 }
108}
109
111{
112 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
113 assert(inputs.size() == specs.size());
114 O2_SIGNPOST_ID_GENERATE(sid, completion);
115 O2_SIGNPOST_START(completion, sid, "consumeWhenAll", "Completion policy invoked");
116
117 size_t si = 0;
118 int sporadicCount = 0;
119 int timeframeCount = 0;
120 int missingSporadicCount = 0;
121 bool needsProcessing = false;
122 size_t currentTimeslice = -1;
123 for (auto& input : inputs) {
124 assert(si < specs.size());
125 auto& spec = specs[si++];
126 sporadicCount += spec.lifetime == Lifetime::Sporadic ? 1 : 0;
127 timeframeCount += spec.lifetime == Lifetime::Timeframe ? 1 : 0;
128 // If we are missing something which is not sporadic, we wait.
129 if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
130 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s due to missing input %lu", "Wait", si);
132 }
133 // If we are missing something which is sporadic, we wait until we are sure it will not come.
134 if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
135 O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "Missing sporadic found for route index %lu", si);
136 missingSporadicCount += 1;
137 }
138 // If we have a header, we use it to determine the current timeslice
139 // (unless this is a timer which does not enter the oldest possible timeslice).
140 if (input.header != nullptr && currentTimeslice == -1) {
141 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
142 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
143 currentTimeslice = dph->startTime;
144 O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "currentTimeslice %lu from route index %lu", currentTimeslice, si);
145 }
146 }
147 // If we have a header, we need to process it if it is not a condition object.
148 if (input.header != nullptr && spec.lifetime != Lifetime::Condition) {
149 needsProcessing = true;
150 }
151 }
152 // If some sporadic inputs are missing, we wait for them until we are sure they will not come,
153 // i.e. until the oldest possible timeslice is beyond the timeslice of the input.
154 auto& timesliceIndex = ref.get<TimesliceIndex>();
155 auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;
156
157 if (missingSporadicCount && currentTimeslice >= oldestPossibleTimeslice) {
158 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu", "Retry", currentTimeslice, oldestPossibleTimeslice);
160 }
161
162 // No need to process if we have only sporadic inputs and they are all missing.
163 if (needsProcessing && (sporadicCount > 0) && (missingSporadicCount == sporadicCount) && (timeframeCount == 0)) {
164 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu", "Discard", currentTimeslice);
166 }
167 auto consumes = (needsProcessing || sporadicCount == 0);
168 O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu", consumes ? "Consume" : "Discard", currentTimeslice);
170 };
171 return CompletionPolicy{name, matcher, callback};
172}
173
175{
176 auto callbackFull = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
177 auto& decongestionService = ref.get<DecongestionService>();
178 decongestionService.orderedCompletionPolicyActive = true;
179 for (auto& input : inputs) {
180 if (input.header == nullptr) {
182 }
183 long int startTime = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input)->startTime;
184 if (startTime == 0) {
185 LOGP(debug, "startTime is 0, which means we have the first message, so we can process it.");
186 decongestionService.nextTimeslice = 0;
187 }
188 if (framework::DataRefUtils::isValid(input) && startTime != decongestionService.nextTimeslice) {
190 }
191 }
192 decongestionService.nextTimeslice++;
194 };
195 return CompletionPolicy{name, matcher, callbackFull};
196}
197
199{
200 auto matcher = [matchName](DeviceSpec const& device) -> bool {
201 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
202 };
203 return consumeWhenAllOrdered(matcher);
204}
205
207{
208 return CompletionPolicy{
209 name,
210 matcher,
211 [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
212 size_t present = 0;
213 size_t current = 0;
214 size_t withPayload = 0;
215 size_t sporadic = 0;
216 size_t maxSporadic = 0;
217 size_t i = 0;
218 for (auto& input : inputs) {
219 auto& spec = specs[i++];
220 if (spec.lifetime == Lifetime::Sporadic) {
221 maxSporadic++;
222 }
223 if (input.header != nullptr) {
224 present++;
225 if (spec.lifetime == Lifetime::Sporadic) {
226 sporadic++;
227 }
228 }
229 if (input.payload != nullptr) {
230 withPayload++;
231 }
232 current++;
233 }
234 // * In case we have all inputs but the sporadic ones: Consume, since we do not know if the sporadic ones
235 // will ever come.
236 // * In case we have only sporadic inputs: Consume, since we do not know if we already Consumed
237 // the non sporadic ones above.
238 // * In case we do not have payloads: Wait
239 // * In all other cases we consume what is there, but we wait for the non sporadic ones to be complete
240 // (i.e. we wait for present + maxSporadic).
241 if (present - sporadic + maxSporadic == inputs.size()) {
243 } else if (present - sporadic == 0) {
245 } else if (withPayload == 0) {
247 }
249 }};
250}
251
253{
254 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
255 for (auto& input : inputs) {
256 if (input.header != nullptr) {
258 }
259 }
261 };
262 return CompletionPolicy{name, matcher, callback, false};
263}
264
266{
267 auto matcher = [matchName](DeviceSpec const& device) -> bool {
268 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
269 };
270 return consumeWhenAny(matcher);
271}
272
274{
275 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
276 bool canConsume = false;
277 bool hasConditions = false;
278 bool conditionMissing = false;
279 size_t timeslice = -1;
280 static size_t timesliceOK = -1; // FIXME: This breaks start/stop/start, since it must be reset!
281 // FIXME: Also, this just checks the max timeslice that was already consumed.
282 // In case timeslices do not come in order, we might have consumed a later
283 // condition object, but not the one for the current time slice.
284 // But I don't see any possibility to handle this in a better way.
285
286 // Iterate on all specs and all inputs simultaneously
287 for (size_t i = 0; i < inputs.size(); ++i) {
288 char const* header = inputs.header(i);
289 auto& spec = specs[i];
290 // In case a condition object is not there, we need to wait.
291 if (header != nullptr) {
292 canConsume = true;
293 }
294 if (spec.lifetime == Lifetime::Condition) {
295 hasConditions = true;
296 if (header == nullptr) {
297 conditionMissing = true;
298 }
299 }
300 }
301 if (canConsume || conditionMissing) {
302 for (auto it = inputs.begin(), end = inputs.end(); it != end; ++it) {
303 for (auto const& ref : it) {
305 continue;
306 }
307 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(ref);
308 if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
309 timeslice = dph->startTime;
310 break;
311 }
312 }
313 if (timeslice != -1) {
314 break;
315 }
316 }
317 }
318
319 // If there are no conditions, just consume.
320 if (!hasConditions) {
321 canConsume = true;
322 } else if (conditionMissing && (timeslice == -1 || timesliceOK == -1 || timeslice > timesliceOK)) {
324 }
325
326 if (canConsume && timeslice != -1 && (timeslice > timesliceOK || timesliceOK == -1)) {
327 timesliceOK = timeslice;
328 }
330 };
331 return CompletionPolicy{name, matcher, callback, false};
332}
333
335{
336 auto matcher = [matchName](DeviceSpec const& device) -> bool {
337 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
338 };
339 return consumeWhenAnyWithAllConditions(matcher);
340}
341
343{
344 auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
345 size_t present = 0;
346 for (auto& input : inputs) {
347 if (input.header != nullptr) {
348 present++;
349 }
350 }
351 if (present == inputs.size()) {
353 } else if (present == 0) {
355 }
357 };
358 return CompletionPolicy{name, matcher, callback};
359}
360
361} // namespace o2::framework
#define O2_BUILTIN_UNREACHABLE
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:540
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:490
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
std::ostringstream debug
const char * header(size_t i) const
Definition InputSpan.h:75
const_iterator end() const
Definition InputSpan.h:231
const_iterator begin() const
Definition InputSpan.h:225
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:70
OldestInputInfo getOldestPossibleInput() const
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeExistingWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
static CompletionPolicy defineByNameOrigin(std::string const &name, std::string const &origin, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
std::function< bool(DeviceSpec const &device)> Matcher
static bool isValid(DataRef const &ref)
bool orderedCompletionPolicyActive
Ordered completion policy is active.
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46