34 return std::regex_match(device.name.begin(), device.name.end(), std::regex(
name));
37 auto originReceived = std::make_shared<std::vector<uint64_t>>();
41 for (
auto&
ref : inputRefs) {
42 if (
ref.header !=
nullptr) {
43 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(
ref);
44 if (header->dataOrigin.str == origin) {
45 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
46 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
47 if (it == originReceived->end()) {
48 originReceived->emplace_back(startTime);
56 for (
auto&
ref : inputRefs) {
57 if (
ref.header !=
nullptr) {
58 auto header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(
ref);
59 if (header->dataOrigin.str != origin) {
60 auto startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
61 auto it = std::find(originReceived->begin(), originReceived->end(), startTime);
62 if (it == originReceived->end()) {
63 LOGP(info,
"Have to wait until message of origin {} with startTime {} has arrived.", origin, startTime);
79 return std::regex_match(device.name.begin(), device.name.end(), std::regex(
name));
113 assert(inputs.
size() == specs.size());
115 O2_SIGNPOST_START(completion, sid,
"consumeWhenAll",
"Completion policy invoked");
118 int sporadicCount = 0;
119 int timeframeCount = 0;
120 int missingSporadicCount = 0;
121 bool needsProcessing =
false;
122 size_t currentTimeslice = -1;
123 for (
auto& input : inputs) {
124 assert(si < specs.size());
125 auto& spec = specs[si++];
126 sporadicCount += spec.lifetime == Lifetime::Sporadic ? 1 : 0;
127 timeframeCount += spec.lifetime == Lifetime::Timeframe ? 1 : 0;
129 if (input.header ==
nullptr && spec.lifetime != Lifetime::Sporadic) {
130 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s due to missing input %lu",
"Wait", si);
134 if (input.header ==
nullptr && spec.lifetime == Lifetime::Sporadic) {
135 O2_SIGNPOST_EVENT_EMIT(completion, sid,
"consumeWhenAll",
"Missing sporadic found for route index %lu", si);
136 missingSporadicCount += 1;
140 if (input.header !=
nullptr && currentTimeslice == -1) {
141 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
143 currentTimeslice = dph->startTime;
144 O2_SIGNPOST_EVENT_EMIT(completion, sid,
"consumeWhenAll",
"currentTimeslice %lu from route index %lu", currentTimeslice, si);
148 if (input.header !=
nullptr && spec.lifetime != Lifetime::Condition) {
149 needsProcessing =
true;
157 if (missingSporadicCount && currentTimeslice >= oldestPossibleTimeslice) {
158 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu",
"Retry", currentTimeslice, oldestPossibleTimeslice);
163 if (needsProcessing && (sporadicCount > 0) && (missingSporadicCount == sporadicCount) && (timeframeCount == 0)) {
164 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu",
"Discard", currentTimeslice);
167 auto consumes = (needsProcessing || sporadicCount == 0);
168 O2_SIGNPOST_END(completion, sid,
"consumeWhenAll",
"Completion policy returned %{public}s for timeslice %lu", consumes ?
"Consume" :
"Discard", currentTimeslice);
179 for (
auto& input : inputs) {
180 if (input.header ==
nullptr) {
183 long int startTime = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input)->startTime;
184 if (startTime == 0) {
185 LOGP(
debug,
"startTime is 0, which means we have the first message, so we can process it.");
186 decongestionService.nextTimeslice = 0;
192 decongestionService.nextTimeslice++;
200 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
201 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
214 size_t withPayload = 0;
216 size_t maxSporadic = 0;
218 for (
auto& input : inputs) {
219 auto& spec = specs[
i++];
220 if (spec.lifetime == Lifetime::Sporadic) {
223 if (input.header !=
nullptr) {
225 if (spec.lifetime == Lifetime::Sporadic) {
229 if (input.payload !=
nullptr) {
241 if (present - sporadic + maxSporadic == inputs.size()) {
243 }
else if (present - sporadic == 0) {
245 }
else if (withPayload == 0) {
255 for (
auto& input : inputs) {
256 if (input.header !=
nullptr) {
267 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
268 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
276 bool canConsume =
false;
277 bool hasConditions =
false;
278 bool conditionMissing =
false;
279 size_t timeslice = -1;
280 static size_t timesliceOK = -1;
287 for (
size_t i = 0;
i < inputs.
size(); ++
i) {
288 char const* header = inputs.
header(
i);
289 auto& spec = specs[
i];
291 if (header !=
nullptr) {
294 if (spec.lifetime == Lifetime::Condition) {
295 hasConditions =
true;
296 if (header ==
nullptr) {
297 conditionMissing =
true;
301 if (canConsume || conditionMissing) {
302 for (
auto it = inputs.
begin(),
end = inputs.
end(); it !=
end; ++it) {
303 for (
auto const&
ref : it) {
307 auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(
ref);
309 timeslice = dph->startTime;
313 if (timeslice != -1) {
320 if (!hasConditions) {
322 }
else if (conditionMissing && (timeslice == -1 || timesliceOK == -1 || timeslice > timesliceOK)) {
326 if (canConsume && timeslice != -1 && (timeslice > timesliceOK || timesliceOK == -1)) {
327 timesliceOK = timeslice;
336 auto matcher = [matchName](
DeviceSpec const& device) ->
bool {
337 return std::regex_match(device.name.begin(), device.name.end(), std::regex(matchName));
346 for (
auto& input : inputs) {
347 if (input.header !=
nullptr) {
351 if (present == inputs.
size()) {
353 }
else if (present == 0) {
#define O2_BUILTIN_UNREACHABLE
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
OldestInputInfo getOldestPossibleInput() const
GLuint const GLchar * name
Defining PrimaryVertex explicitly as messageable.
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
As consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::startTime).
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeExistingWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
static CompletionPolicy defineByNameOrigin(std::string const &name, std::string const &origin, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
std::function< bool(DeviceSpec const &device)> Matcher
static bool isValid(DataRef const &ref)
bool orderedCompletionPolicyActive
Ordered completion policy is active.
static bool timesliceIsTimer(size_t timeslice)