159 int rateLimitingIPCID = std::stoi(ctx.
options().
get<std::string>(
"timeframes-rate-limit-ipcid"));
161 .
name =
"internal-dpl-ccdb-backend",
164 {
"condition-not-before",
VariantType::Int64, 0ll, {
"do not fetch from CCDB objects created before provide timestamp"}},
165 {
"condition-not-after",
VariantType::Int64, 3385078236000ll, {
"do not fetch from CCDB objects created after the timestamp"}},
166 {
"condition-remap",
VariantType::String,
"", {
"remap condition path in CCDB based on the provided string."}},
169 {
"condition-use-slice-for-prescaling",
VariantType::Int, 0, {
"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
170 {
"condition-time-tolerance",
VariantType::Int64, 5000ll, {
"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
171 {
"orbit-offset-enumeration",
VariantType::Int64, 0ll, {
"initial value for the orbit"}},
172 {
"orbit-multiplier-enumeration",
VariantType::Int64, 0ll, {
"multiplier to get the orbit from the counter"}},
173 {
"start-value-enumeration",
VariantType::Int64, 0ll, {
"initial value for the enumeration"}},
174 {
"end-value-enumeration",
VariantType::Int64, -1ll, {
"final value for the enumeration"}},
175 {
"step-value-enumeration",
VariantType::Int64, 1ll, {
"step between one value and the other"}}},
178 .
name =
"internal-dpl-aod-ccdb",
183 {
"condition-not-before",
VariantType::Int64, 0ll, {
"do not fetch from CCDB objects created before provide timestamp"}},
184 {
"condition-not-after",
VariantType::Int64, 3385078236000ll, {
"do not fetch from CCDB objects created after the timestamp"}},
185 {
"condition-remap",
VariantType::String,
"", {
"remap condition path in CCDB based on the provided string."}},
188 {
"condition-use-slice-for-prescaling",
VariantType::Int, 0, {
"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
189 {
"condition-time-tolerance",
VariantType::Int64, 5000ll, {
"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
190 {
"start-value-enumeration",
VariantType::Int64, 0ll, {
"initial value for the enumeration"}},
191 {
"end-value-enumeration",
VariantType::Int64, -1ll, {
"final value for the enumeration"}},
192 {
"step-value-enumeration",
VariantType::Int64, 1ll, {
"step between one value and the other"}}}};
213 auto aodLifetime = Lifetime::Enumeration;
216 .
name =
"internal-dpl-aod-reader",
237 std::vector<InputSpec> requestedCCDBs;
238 std::vector<OutputSpec> providedCCDBs;
240 for (
size_t wi = 0; wi < workflow.size(); ++wi) {
241 auto& processor = workflow[wi];
242 auto name = processor.name;
244 dec.outTskMap.push_back({
hash,
name});
246 std::string prefix =
"internal-dpl-";
247 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
255 bool hasTimeframeInputs = std::ranges::any_of(processor.inputs, [](
auto const& input) { return input.lifetime == Lifetime::Timeframe; });
256 bool hasTimeframeOutputs = std::ranges::any_of(processor.outputs, [](
auto const&
output) { return output.lifetime == Lifetime::Timeframe; });
260 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
261 if (rateLimitingIPCID != -1) {
262 if (timeframeSink && processor.name.find(
"internal-dpl-injected-dummy-sink") == std::string::npos) {
264 bool hasMatch =
false;
266 auto summaryOutput = std::ranges::find_if(processor.outputs, [&summaryMatcher](
auto const&
output) { return DataSpecUtils::match(output, summaryMatcher); });
267 if (summaryOutput != processor.outputs.end()) {
268 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"output enumeration",
"%{public}s already there in %{public}s",
274 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"output enumeration",
"Adding DPL/SUMMARY/%d to %{public}s",
hash, processor.name.c_str());
279 bool hasConditionOption =
false;
280 for (
size_t ii = 0; ii < processor.inputs.size(); ++ii) {
281 auto& input = processor.inputs[ii];
282 bool hasProjectors =
false;
283 bool hasIndexRecords =
false;
284 bool hasCCDBURLs =
false;
286 for (
auto const& p : input.metadata) {
287 if (
p.name.compare(
"projectors") == 0) {
288 hasProjectors =
true;
291 if (
p.name.compare(
"index-records") == 0) {
292 hasIndexRecords =
true;
295 if (
p.name.starts_with(
"ccdb:")) {
300 switch (input.lifetime) {
301 case Lifetime::Timer: {
303 auto hasOption = std::ranges::any_of(processor.options, [&input](
auto const& option) { return (option.name ==
"period-" + input.binding); });
304 if (hasOption ==
false) {
305 processor.options.push_back(ConfigParamSpec{
"period-" + input.binding,
VariantType::Int, 1000, {
"period of the timer in milliseconds"}});
307 timer.outputs.emplace_back(
OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
309 case Lifetime::Signal: {
311 timer.outputs.emplace_back(
OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
313 case Lifetime::Enumeration: {
315 timer.outputs.emplace_back(
OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
317 case Lifetime::Condition: {
318 requestedCCDBs.emplace_back(input);
319 if ((hasConditionOption ==
false) && std::ranges::none_of(processor.options, [](
auto const& option) { return (option.name.compare(
"condition-backend") == 0); })) {
321 processor.options.emplace_back(ConfigParamSpec{
"condition-timestamp",
VariantType::Int64, 0ll, {
"Force timestamp for CCDB lookup"}});
322 hasConditionOption =
true;
325 case Lifetime::OutOfBand: {
327 auto hasOption = std::ranges::any_of(processor.options, [&input](
auto const& option) { return (option.name ==
"out-of-band-channel-name-" + input.binding); });
328 if (hasOption ==
false) {
329 processor.options.push_back(ConfigParamSpec{
"out-of-band-channel-name-" + input.binding,
VariantType::String,
"out-of-band", {
"channel to listen for out of band data"}});
331 timer.outputs.emplace_back(
OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
334 case Lifetime::Transient:
335 case Lifetime::Timeframe:
336 case Lifetime::Optional:
341 }
else if (hasIndexRecords) {
343 }
else if (hasCCDBURLs) {
350 std::ranges::stable_sort(timer.outputs, [](
OutputSpec const&
a,
OutputSpec const&
b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
352 for (
auto&
output : processor.outputs) {
353 bool hasProjectors =
false;
354 bool hasIndexRecords =
false;
355 bool hasCCDBURLs =
false;
357 for (
auto const& p :
output.metadata) {
358 if (
p.name.compare(
"projectors") == 0) {
359 hasProjectors =
true;
362 if (
p.name.compare(
"index-records") == 0) {
363 hasIndexRecords =
true;
366 if (
p.name.starts_with(
"ccdb:")) {
372 dec.providedDYNs.emplace_back(
output);
373 }
else if (hasCCDBURLs) {
374 dec.providedTIMs.emplace_back(
output);
375 }
else if (hasIndexRecords) {
376 dec.providedIDXs.emplace_back(
output);
378 dec.providedAODs.emplace_back(
output);
380 dec.providedOutputObjHist.emplace_back(
output);
381 auto it = std::ranges::find_if(dec.outObjHistMap, [&](
auto&&
x) { return x.id == hash; });
382 if (it == dec.outObjHistMap.end()) {
383 dec.outObjHistMap.push_back({
hash, {
output.binding.value}});
385 it->bindings.push_back(
output.binding.value);
389 if (
output.lifetime == Lifetime::Condition) {
390 providedCCDBs.push_back(
output);
399 "internal-dpl-aod-index-builder",
404 std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
405 std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
406 dec.requestedIDXs | views::filter_not_matching(dec.providedIDXs) | sinks::append_to{dec.builderInputs};
409 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
410 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
411 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
414 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
415 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
416 dec.requestedDYNs | views::filter_not_matching(dec.providedDYNs) | sinks::append_to{dec.spawnerInputs};
419 "internal-dpl-aod-spawner",
426 std::ranges::sort(dec.requestedAODs, inputSpecLessThan);
427 std::ranges::sort(dec.providedAODs, outputSpecLessThan);
430 std::ranges::sort(requestedCCDBs, inputSpecLessThan);
431 std::ranges::sort(providedCCDBs, outputSpecLessThan);
434 std::vector<DataProcessorSpec> extraSpecs;
436 if (transientStore.outputs.empty() ==
false) {
437 extraSpecs.push_back(transientStore);
439 if (qaStore.outputs.empty() ==
false) {
440 extraSpecs.push_back(qaStore);
443 if (aodSpawner.outputs.empty() ==
false) {
447 if (indexBuilder.outputs.empty() ==
false) {
448 extraSpecs.push_back(indexBuilder);
453 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
454 if (analysisCCDBBackend.outputs.empty() ==
false) {
455 extraSpecs.push_back(analysisCCDBBackend);
459 auto tfnsource = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
461 return DataSpecUtils::match(output,
"TFN",
"TFNumber", 0);
466 if (aodReader.outputs.empty() ==
false) {
467 if (tfnsource == workflow.end()) {
469 aodReader.outputs.emplace_back(
OutputSpec{
"TFN",
"TFNumber"});
470 aodReader.outputs.emplace_back(
OutputSpec{
"TFF",
"TFFilename"});
473 aodReader.algorithm = AlgorithmSpec{
475 [](DeviceSpec
const& spec) {
476 LOGP(warn,
"Workflow with injected AODs has unsatisfied inputs:");
477 for (
auto const&
output : spec.outputs) {
480 LOGP(fatal,
"Stopping.");
486 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
491 auto& dstf = std::get<ConcreteDataMatcher>(
matcher.matcher);
494 bool providesDISTSTF = std::ranges::any_of(workflow,
495 [&matcher](
auto const& dp) {
496 return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](
auto const&
output) {
497 return DataSpecUtils::match(matcher, output);
504 bool requiresDISTSUBTIMEFRAME = std::ranges::any_of(workflow,
505 [&dstf](
auto const& dp) {
506 return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](
auto const& input) {
507 return DataSpecUtils::match(input, dstf);
515 int enumCandidate = -1;
516 int timerCandidate = -1;
517 for (
auto wi = 0U; wi < workflow.size(); ++wi) {
518 auto& dp = workflow[wi];
519 if (dp.inputs.size() != 1) {
522 auto lifetime = dp.inputs[0].lifetime;
523 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].
name > dp.name)) {
526 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].
name > dp.name)) {
537 if (ccdbBackend.outputs.empty() ==
false) {
538 if (aodReader.outputs.empty() ==
false) {
540 ccdbBackend.inputs.push_back(
InputSpec{
"tfn",
"TFN",
"TFNumber"});
541 }
else if (providesDISTSTF) {
543 ccdbBackend.inputs.push_back(
InputSpec{
"tfn", dstf, Lifetime::Timeframe});
545 if (enumCandidate != -1) {
549 ccdbBackend.inputs.push_back(
InputSpec{
"tfn", dstf, Lifetime::Timeframe});
550 }
else if (timerCandidate != -1) {
553 ccdbBackend.inputs.push_back(
InputSpec{
"tfn", timer_dstf, Lifetime::Timeframe});
557 ccdbBackend.outputs.push_back(
OutputSpec{
"CTP",
"OrbitReset", 0});
560 extraSpecs.push_back(
timePipeline(ccdbBackend, ctx.options().get<
int64_t>(
"ccdb-fetchers")));
561 }
else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
567 if (timer.outputs.empty() ==
false) {
568 extraSpecs.push_back(timer);
573 if (dec.providedOutputObjHist.empty() ==
false) {
575 extraSpecs.push_back(rootSink);
578 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
581 injectAODWriter(workflow, ctx);
584 std::vector<InputSpec> redirectedOutputsInputs;
585 for (
auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
586 if (ctx.options().get<std::string>(
"forwarding-policy") ==
"none") {
591 if (!dec.isDangling[ii] && (ctx.options().get<std::string>(
"forwarding-policy") !=
"all")) {
598 redirectedOutputsInputs.emplace_back(dec.outputsInputs[ii]);
601 std::vector<InputSpec> unmatched;
602 auto forwardingDestination = ctx.options().get<std::string>(
"forwarding-destination");
606 return DataSpecUtils::match(output,
"TFN",
"TFNumber", 0);
609 if (redirectedOutputsInputs.size() > 0 && forwardingDestination ==
"file") {
611 if (unmatched.size() != redirectedOutputsInputs.size()) {
612 extraSpecs.push_back(fileSink);
614 }
else if (redirectedOutputsInputs.size() > 0 && forwardingDestination ==
"fairmq") {
616 extraSpecs.push_back(fairMQSink);
617 }
else if (forwardingDestination !=
"drop") {
618 throw runtime_error_f(
"Unknown forwarding destination %s", forwardingDestination.c_str());
620 if ((unmatched.size() > 0) || (redirectedOutputsInputs.size() > 0) || (tfnsource != workflow.end())) {
621 std::vector<InputSpec> ignored = unmatched;
622 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
623 for (
auto& ignoredInput : ignored) {
624 ignoredInput.lifetime = Lifetime::Sporadic;
629 if (tfnsource != workflow.end()) {
634 if (tfnsource != workflow.end() && !tfnsource->name.starts_with(
"aod-producer-workflow")) {
639 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"injectServiceDevices",
"Injecting rate limited dummy sink");
640 std::string rateLimitingChannelConfigOutput;
641 if (rateLimitingIPCID != -1) {
642 rateLimitingChannelConfigOutput = fmt::format(
"name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
648 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
771 std::vector<DeviceConnectionEdge>& logicalEdges,
772 std::vector<OutputSpec>& outputs,
773 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
776 if (workflow.empty()) {
781 std::vector<LogicalOutputInfo> availableOutputsInfo;
782 auto const& constOutputs = outputs;
784 std::vector<LogicalOutputInfo> forwards;
788 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
790 for (
size_t wi = 0; wi < workflow.size(); ++wi) {
791 auto& producer = workflow[wi];
792 if (producer.outputs.empty()) {
793 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"output enumeration",
"No outputs for [%zu] %{public}s", wi, producer.name.c_str());
795 O2_SIGNPOST_START(workflow_helpers, sid,
"output enumeration",
"Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
797 for (
size_t oi = 0; oi < producer.outputs.size(); ++oi) {
798 auto& out = producer.outputs[oi];
799 auto uniqueOutputId = outputs.size();
803 outputs.push_back(out);
809 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](
size_t ci,
size_t ii) {
810 auto input = workflow[ci].inputs[ii];
811 std::ostringstream
str;
812 str <<
"No matching output found for "
813 <<
DataSpecUtils::describe(input) <<
" as requested by data processor \"" << workflow[ci].name <<
"\". Candidates:\n";
815 for (
auto&
output : constOutputs) {
819 throw std::runtime_error(
str.str());
833 enumerateAvailableOutputs();
835 std::vector<bool> matches(constOutputs.size());
836 for (
size_t consumer = 0; consumer < workflow.size(); ++consumer) {
838 O2_SIGNPOST_START(workflow_helpers, sid,
"input matching",
"Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].
name.c_str());
839 for (
size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
841 for (
size_t i = 0;
i < constOutputs.size();
i++) {
850 for (
size_t i = 0;
i < availableOutputsInfo.size();
i++) {
852 if (!matches[availableOutputsInfo[
i].outputGlobalIndex]) {
855 auto* oif = &availableOutputsInfo[
i];
859 auto producer = oif->specIndex;
860 auto uniqueOutputId = oif->outputGlobalIndex;
861 for (
size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
862 for (
size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
863 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"output",
"Adding edge between %{public}s and %{public}s", workflow[consumer].
name.c_str(),
864 workflow[producer].name.c_str());
865 logicalEdges.emplace_back(
DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
870 oif->enabled =
false;
872 if (forwards.empty()) {
873 errorDueToMissingOutputFor(consumer, input);
875 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](
auto const& info) { return info.enabled == false; }), availableOutputsInfo.end());
876 std::ranges::copy(forwards, std::back_inserter(availableOutputsInfo));