22#include <fairmq/shmem/Message.h>
35 return std::regex_match(device.name.begin(), device.name.end(), std::regex(
name));
38 auto originReceived = std::make_shared<std::vector<uint64_t>>();
42 for (
auto&
ref : inputRefs) {
43 if (
ref.header !=
nullptr) {
44 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(
ref);
45 if (header->dataOrigin.str ==
origin) {
46 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
47 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
48 if (it == originReceived->end()) {
49 originReceived->emplace_back(startTime);
57 for (
auto&
ref : inputRefs) {
58 if (
ref.header !=
nullptr) {
59 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(
ref);
60 if (header->dataOrigin.str !=
origin) {
61 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
62 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
63 if (it == originReceived->end()) {
64 LOGP(info,
"Have to wait until message of origin {} with startTime {} has arrived.",
origin, startTime);
80 return std::regex_match(device.name.begin(), device.name.end(), std::regex(
name));
114 assert(inputs.
size() == specs.size());
116 O2_SIGNPOST_START(completion, sid,
"consumeWhenAll",
"Completion policy invoked");
119 int sporadicCount = 0;
120 int timeframeCount = 0;
121 int missingSporadicCount = 0;
122 bool needsProcessing =
false;
123 size_t currentTimeslice = -1;
124 for (
auto& input : inputs) {
125 assert(si < specs.size());
126 auto& spec = specs[si++];
127 sporadicCount += spec.lifetime == Lifetime::Sporadic ? 1 : 0;
128 timeframeCount += spec.lifetime == Lifetime::Timeframe ? 1 : 0;
130 if (input.header ==
nullptr && spec.lifetime != Lifetime::Sporadic) {
131 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s due to missing input %lu",
"Wait", si);
135 if (input.header ==
nullptr && spec.lifetime == Lifetime::Sporadic) {
136 O2_SIGNPOST_EVENT_EMIT(completion, sid,
"consumeWhenAll",
"Missing sporadic found for route index %lu", si);
137 missingSporadicCount += 1;
141 if (input.header !=
nullptr && currentTimeslice == -1) {
142 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
144 currentTimeslice = dph->startTime;
145 O2_SIGNPOST_EVENT_EMIT(completion, sid,
"consumeWhenAll",
"currentTimeslice %lu from route index %lu", currentTimeslice, si);
149 if (input.header !=
nullptr && spec.lifetime != Lifetime::Condition) {
150 needsProcessing =
true;
158 if (missingSporadicCount && currentTimeslice >= oldestPossibleTimeslice) {
159 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu",
"Retry", currentTimeslice, oldestPossibleTimeslice);
164 if (needsProcessing && (sporadicCount > 0) && (missingSporadicCount == sporadicCount) && (timeframeCount == 0)) {
165 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu",
"Discard", currentTimeslice);
168 auto consumes = (needsProcessing || sporadicCount == 0);
169 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu", consumes ?
"Consume" :
"Discard", currentTimeslice);
180 for (
auto& input : inputs) {
181 if (input.header ==
nullptr) {
184 long int startTime = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input)->startTime;
185 if (startTime == 0) {
186 LOGP(
debug,
"startTime is 0, which means we have the first message, so we can process it.");
187 decongestionService.nextTimeslice = 0;
193 decongestionService.nextTimeslice++;
201 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
202 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
215 size_t withPayload = 0;
217 size_t maxSporadic = 0;
219 for (
auto& input : inputs) {
220 auto& spec = specs[
i++];
221 if (spec.lifetime == Lifetime::Sporadic) {
224 if (input.header !=
nullptr) {
226 if (spec.lifetime == Lifetime::Sporadic) {
230 if (input.payload !=
nullptr) {
242 if (present - sporadic + maxSporadic == inputs.size()) {
244 }
else if (present - sporadic == 0) {
246 }
else if (withPayload == 0) {
256 for (
size_t i = 0;
i < inputs.
size(); ++
i) {
269 for (
auto& input : inputs) {
270 if (input.header !=
nullptr) {
281 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
282 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
290 bool canConsume =
false;
291 bool hasConditions =
false;
292 bool conditionMissing =
false;
293 size_t timeslice = -1;
294 static size_t timesliceOK = -1;
301 for (
size_t i = 0;
i < inputs.
size(); ++
i) {
302 char const* header = inputs.
header(
i);
303 auto& spec = specs[
i];
305 if (header !=
nullptr) {
308 if (spec.lifetime == Lifetime::Condition) {
309 hasConditions =
true;
310 if (header ==
nullptr) {
311 conditionMissing =
true;
315 if (canConsume || conditionMissing) {
316 for (
auto it = inputs.
begin(),
end = inputs.
end(); it !=
end; ++it) {
317 for (
auto const&
ref : it) {
321 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(
ref);
323 timeslice = dph->startTime;
327 if (timeslice != -1) {
334 if (!hasConditions) {
336 }
else if (conditionMissing && (timeslice == -1 || timesliceOK == -1 || timeslice > timesliceOK)) {
340 if (canConsume && timeslice != -1 && (timeslice > timesliceOK || timesliceOK == -1)) {
341 timesliceOK = timeslice;
350 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
351 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
360 for (
auto& input : inputs) {
361 if (input.header !=
nullptr) {
365 if (present == inputs.
size()) {
367 }
else if (present == 0) {
header::DataOrigin origin
#define O2_BUILTIN_UNREACHABLE
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
OldestInputInfo getOldestPossibleInput() const
GLuint const GLchar * name
Defining PrimaryVertex explicitly as messageable.
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
As consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::startTime).
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeExistingWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
static CompletionPolicy consumeWhenAnyZeroCount(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts that have arrived has a refcount of 1.
static CompletionPolicy defineByNameOrigin(std::string const &name, std::string const &origin, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
std::function< bool(DeviceSpec const &device)> Matcher
static bool isValid(DataRef const &ref)
bool orderedCompletionPolicyActive
Ordered completion policy is active.
static bool timesliceIsTimer(size_t timeslice)