22#if __has_include(<fairmq/shmem/Message.h>)
23#include <fairmq/shmem/Message.h>
37 return std::regex_match(device.name.begin(), device.name.end(), std::regex(
name));
40 auto originReceived = std::make_shared<std::vector<uint64_t>>();
44 for (
auto&
ref : inputRefs) {
45 if (
ref.header !=
nullptr) {
46 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(
ref);
47 if (header->dataOrigin.str == origin) {
48 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
49 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
50 if (it == originReceived->end()) {
51 originReceived->emplace_back(startTime);
59 for (
auto&
ref : inputRefs) {
60 if (
ref.header !=
nullptr) {
61 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(
ref);
62 if (header->dataOrigin.str != origin) {
63 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
64 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
65 if (it == originReceived->end()) {
66 LOGP(info,
"Have to wait until message of origin {} with startTime {} has arrived.", origin, startTime);
82 return std::regex_match(device.name.begin(), device.name.end(), std::regex(
name));
116 assert(inputs.
size() == specs.size());
118 O2_SIGNPOST_START(completion, sid,
"consumeWhenAll",
"Completion policy invoked");
121 int sporadicCount = 0;
122 int timeframeCount = 0;
123 int missingSporadicCount = 0;
124 bool needsProcessing =
false;
125 size_t currentTimeslice = -1;
126 for (
auto& input : inputs) {
127 assert(si < specs.size());
128 auto& spec = specs[si++];
129 sporadicCount += spec.lifetime == Lifetime::Sporadic ? 1 : 0;
130 timeframeCount += spec.lifetime == Lifetime::Timeframe ? 1 : 0;
132 if (input.header ==
nullptr && spec.lifetime != Lifetime::Sporadic) {
133 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s due to missing input %lu",
"Wait", si);
137 if (input.header ==
nullptr && spec.lifetime == Lifetime::Sporadic) {
138 O2_SIGNPOST_EVENT_EMIT(completion, sid,
"consumeWhenAll",
"Missing sporadic found for route index %lu", si);
139 missingSporadicCount += 1;
143 if (input.header !=
nullptr && currentTimeslice == -1) {
144 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
146 currentTimeslice = dph->startTime;
147 O2_SIGNPOST_EVENT_EMIT(completion, sid,
"consumeWhenAll",
"currentTimeslice %lu from route index %lu", currentTimeslice, si);
151 if (input.header !=
nullptr && spec.lifetime != Lifetime::Condition) {
152 needsProcessing =
true;
160 if (missingSporadicCount && currentTimeslice >= oldestPossibleTimeslice) {
161 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu",
"Retry", currentTimeslice, oldestPossibleTimeslice);
166 if (needsProcessing && (sporadicCount > 0) && (missingSporadicCount == sporadicCount) && (timeframeCount == 0)) {
167 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu",
"Discard", currentTimeslice);
170 auto consumes = (needsProcessing || sporadicCount == 0);
171 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu", consumes ?
"Consume" :
"Discard", currentTimeslice);
182 for (
auto& input : inputs) {
183 if (input.header ==
nullptr) {
186 long int startTime = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input)->startTime;
187 if (startTime == 0) {
188 LOGP(
debug,
"startTime is 0, which means we have the first message, so we can process it.");
189 decongestionService.nextTimeslice = 0;
195 decongestionService.nextTimeslice++;
203 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
204 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
217 size_t withPayload = 0;
219 size_t maxSporadic = 0;
221 for (
auto& input : inputs) {
222 auto& spec = specs[
i++];
223 if (spec.lifetime == Lifetime::Sporadic) {
226 if (input.header !=
nullptr) {
228 if (spec.lifetime == Lifetime::Sporadic) {
232 if (input.payload !=
nullptr) {
244 if (present - sporadic + maxSporadic == inputs.size()) {
246 }
else if (present - sporadic == 0) {
248 }
else if (withPayload == 0) {
255#if __has_include(<fairmq/shmem/Message.h>)
259 for (
size_t i = 0;
i < inputs.
size(); ++
i) {
266 return CompletionPolicy{
name, matcher, callback,
false};
273 for (
auto& input : inputs) {
274 if (input.header !=
nullptr) {
285 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
286 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
294 bool canConsume =
false;
295 bool hasConditions =
false;
296 bool conditionMissing =
false;
297 size_t timeslice = -1;
298 static size_t timesliceOK = -1;
305 for (
size_t i = 0;
i < inputs.
size(); ++
i) {
306 char const* header = inputs.
header(
i);
307 auto& spec = specs[
i];
309 if (header !=
nullptr) {
312 if (spec.lifetime == Lifetime::Condition) {
313 hasConditions =
true;
314 if (header ==
nullptr) {
315 conditionMissing =
true;
319 if (canConsume || conditionMissing) {
320 for (
auto it = inputs.
begin(),
end = inputs.
end(); it !=
end; ++it) {
321 for (
auto const&
ref : it) {
325 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(
ref);
327 timeslice = dph->startTime;
331 if (timeslice != -1) {
338 if (!hasConditions) {
340 }
else if (conditionMissing && (timeslice == -1 || timesliceOK == -1 || timeslice > timesliceOK)) {
344 if (canConsume && timeslice != -1 && (timeslice > timesliceOK || timesliceOK == -1)) {
345 timesliceOK = timeslice;
354 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
355 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
364 for (
auto& input : inputs) {
365 if (input.header !=
nullptr) {
369 if (present == inputs.
size()) {
371 }
else if (present == 0) {
#define O2_BUILTIN_UNREACHABLE
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
OldestInputInfo getOldestPossibleInput() const
GLuint const GLchar * name
Defining PrimaryVertex explicitly as messageable.
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeExistingWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
static CompletionPolicy defineByNameOrigin(std::string const &name, std::string const &origin, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
std::function< bool(DeviceSpec const &device)> Matcher
static bool isValid(DataRef const &ref)
bool orderedCompletionPolicyActive
Ordered completion policy is active.
static bool timesliceIsTimer(size_t timeslice)