23#include <fairmq/shmem/Message.h>
36 return std::regex_match(device.name.begin(), device.name.end(), std::regex(
name));
39 auto originReceived = std::make_shared<std::vector<uint64_t>>();
43 for (
auto&
ref : inputRefs) {
44 if (
ref.header !=
nullptr) {
45 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(
ref);
46 if (header->dataOrigin.str ==
origin) {
47 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
48 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
49 if (it == originReceived->end()) {
50 originReceived->emplace_back(startTime);
58 for (
auto&
ref : inputRefs) {
59 if (
ref.header !=
nullptr) {
60 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(
ref);
61 if (header->dataOrigin.str !=
origin) {
62 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
63 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
64 if (it == originReceived->end()) {
65 LOGP(info,
"Have to wait until message of origin {} with startTime {} has arrived.",
origin, startTime);
81 return std::regex_match(device.name.begin(), device.name.end(), std::regex(
name));
115 assert(inputs.
size() == specs.size());
117 O2_SIGNPOST_START(completion, sid,
"consumeWhenAll",
"Completion policy invoked");
120 int sporadicCount = 0;
121 int timeframeCount = 0;
122 int missingSporadicCount = 0;
123 bool needsProcessing =
false;
124 size_t currentTimeslice = -1;
125 for (
auto& input : inputs) {
126 assert(si < specs.size());
127 auto& spec = specs[si++];
128 sporadicCount += spec.lifetime == Lifetime::Sporadic ? 1 : 0;
129 timeframeCount += spec.lifetime == Lifetime::Timeframe ? 1 : 0;
131 if (input.header ==
nullptr && spec.lifetime != Lifetime::Sporadic) {
132 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s due to missing input %lu",
"Wait", si);
136 if (input.header ==
nullptr && spec.lifetime == Lifetime::Sporadic) {
137 O2_SIGNPOST_EVENT_EMIT(completion, sid,
"consumeWhenAll",
"Missing sporadic found for route index %lu", si);
138 missingSporadicCount += 1;
142 if (input.header !=
nullptr && currentTimeslice == -1) {
143 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
145 currentTimeslice = dph->startTime;
146 O2_SIGNPOST_EVENT_EMIT(completion, sid,
"consumeWhenAll",
"currentTimeslice %lu from route index %lu", currentTimeslice, si);
150 if (input.header !=
nullptr && spec.lifetime != Lifetime::Condition) {
151 needsProcessing =
true;
159 if (missingSporadicCount && currentTimeslice >= oldestPossibleTimeslice) {
160 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu",
"Retry", currentTimeslice, oldestPossibleTimeslice);
165 if (needsProcessing && (sporadicCount > 0) && (missingSporadicCount == sporadicCount) && (timeframeCount == 0)) {
166 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu",
"Discard", currentTimeslice);
169 auto consumes = (needsProcessing || sporadicCount == 0);
170 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu", consumes ?
"Consume" :
"Discard", currentTimeslice);
181 for (
auto& input : inputs) {
182 if (input.header ==
nullptr) {
185 long int startTime = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input)->startTime;
186 if (startTime == 0) {
187 LOGP(
debug,
"startTime is 0, which means we have the first message, so we can process it.");
188 decongestionService.nextTimeslice = 0;
194 decongestionService.nextTimeslice++;
202 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
203 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
216 size_t withPayload = 0;
218 size_t maxSporadic = 0;
220 for (
auto& input : inputs) {
221 auto& spec = specs[
i++];
222 if (spec.lifetime == Lifetime::Sporadic) {
225 if (input.header !=
nullptr) {
227 if (spec.lifetime == Lifetime::Sporadic) {
231 if (input.payload !=
nullptr) {
243 if (present - sporadic + maxSporadic == inputs.size()) {
245 }
else if (present - sporadic == 0) {
247 }
else if (withPayload == 0) {
257 for (
size_t i = 0;
i < inputs.
size(); ++
i) {
270 size_t currentTimeslice = -1;
271 for (
auto& input : inputs) {
272 if (input.header ==
nullptr) {
285 if (currentTimeslice >= oldestPossibleTimeslice) {
296 for (
auto& input : inputs) {
297 if (input.header !=
nullptr) {
308 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
309 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
317 bool canConsume =
false;
318 bool hasConditions =
false;
319 bool conditionMissing =
false;
320 size_t timeslice = -1;
321 static size_t timesliceOK = -1;
328 for (
size_t i = 0;
i < inputs.
size(); ++
i) {
329 char const* header = inputs.
header(
i);
330 auto& spec = specs[
i];
332 if (header !=
nullptr) {
335 if (spec.lifetime == Lifetime::Condition) {
336 hasConditions =
true;
337 if (header ==
nullptr) {
338 conditionMissing =
true;
342 if (canConsume || conditionMissing) {
343 for (
auto it = inputs.
begin(),
end = inputs.
end(); it !=
end; ++it) {
344 for (
auto const&
ref : it) {
348 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(
ref);
350 timeslice = dph->startTime;
354 if (timeslice != -1) {
361 if (!hasConditions) {
363 }
else if (conditionMissing && (timeslice == -1 || timesliceOK == -1 || timeslice > timesliceOK)) {
367 if (canConsume && timeslice != -1 && (timeslice > timesliceOK || timesliceOK == -1)) {
368 timesliceOK = timeslice;
377 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
378 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
387 for (
auto& input : inputs) {
388 if (input.header !=
nullptr) {
392 if (present == inputs.
size()) {
394 }
else if (present == 0) {
header::DataOrigin origin
#define O2_BUILTIN_UNREACHABLE
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
OldestInputInfo getOldestPossibleInput() const
GLuint const GLchar * name
Defining PrimaryVertex explicitly as messageable.
static CompletionPolicy consumeWhenPastOldestPossibleTimeframe(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeExistingWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
static CompletionPolicy consumeWhenAnyZeroCount(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts which has arrived has a refcount of 1.
static CompletionPolicy defineByNameOrigin(std::string const &name, std::string const &origin, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
std::function< bool(DeviceSpec const &device)> Matcher
static bool isValid(DataRef const &ref)
bool orderedCompletionPolicyActive
Ordered completion policy is active.
static bool timesliceIsTimer(size_t timeslice)