162 LOG(info) <<
"This is not a real device, merely a placeholder for external inputs";
163 LOG(info) <<
"To be hidden / removed at some point.";
174 .
name =
"internal-dpl-ccdb-backend",
177 {
"condition-not-before",
VariantType::Int64, 0ll, {
"do not fetch from CCDB objects created before provide timestamp"}},
178 {
"condition-not-after",
VariantType::Int64, 3385078236000ll, {
"do not fetch from CCDB objects created after the timestamp"}},
179 {
"condition-remap",
VariantType::String,
"", {
"remap condition path in CCDB based on the provided string."}},
182 {
"condition-use-slice-for-prescaling",
VariantType::Int, 0, {
"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
183 {
"condition-time-tolerance",
VariantType::Int64, 5000ll, {
"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
184 {
"orbit-offset-enumeration",
VariantType::Int64, 0ll, {
"initial value for the orbit"}},
185 {
"orbit-multiplier-enumeration",
VariantType::Int64, 0ll, {
"multiplier to get the orbit from the counter"}},
186 {
"start-value-enumeration",
VariantType::Int64, 0ll, {
"initial value for the enumeration"}},
187 {
"end-value-enumeration",
VariantType::Int64, -1ll, {
"final value for the enumeration"}},
188 {
"step-value-enumeration",
VariantType::Int64, 1ll, {
"step between one value and the other"}}},
191 .
name =
"internal-dpl-aod-ccdb",
196 {
"condition-not-before",
VariantType::Int64, 0ll, {
"do not fetch from CCDB objects created before provide timestamp"}},
197 {
"condition-not-after",
VariantType::Int64, 3385078236000ll, {
"do not fetch from CCDB objects created after the timestamp"}},
198 {
"condition-remap",
VariantType::String,
"", {
"remap condition path in CCDB based on the provided string."}},
201 {
"condition-use-slice-for-prescaling",
VariantType::Int, 0, {
"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
202 {
"condition-time-tolerance",
VariantType::Int64, 5000ll, {
"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
203 {
"start-value-enumeration",
VariantType::Int64, 0ll, {
"initial value for the enumeration"}},
204 {
"end-value-enumeration",
VariantType::Int64, -1ll, {
"final value for the enumeration"}},
205 {
"step-value-enumeration",
VariantType::Int64, 1ll, {
"step between one value and the other"}}}};
226 auto aodLifetime = Lifetime::Enumeration;
229 .
name =
"internal-dpl-aod-reader",
// The "timeframes-rate-limit-ipcid" option is read as a string and converted
// with std::stoi, which throws if the value is not a valid integer.
248 int rateLimitingIPCID = std::stoi(ctx.
options().
get<std::string>(
"timeframes-rate-limit-ipcid"));
// Channel configuration strings for the "metric-feedback" channel. They start
// out empty and are only assigned when rateLimitingIPCID >= 0.
249 std::string rateLimitingChannelConfigInput;
250 std::string rateLimitingChannelConfigOutput;
// Off by default; switched to true in the rateLimitingIPCID >= 0 handling that follows.
251 bool internalRateLimiting =
false;
255 if (rateLimitingIPCID >= 0) {
256 rateLimitingChannelConfigInput = fmt::format(
"name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
258 rateLimitingChannelConfigOutput = fmt::format(
"name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
260 internalRateLimiting =
true;
261 aodReader.options.emplace_back(
ConfigParamSpec{
"channel-config",
VariantType::String, rateLimitingChannelConfigInput, {
"how many timeframes can be in flight at the same time"}});
267 std::vector<InputSpec> requestedCCDBs;
268 std::vector<OutputSpec> providedCCDBs;
270 for (
size_t wi = 0; wi < workflow.size(); ++wi) {
271 auto& processor = workflow[wi];
272 auto name = processor.name;
274 ac.outTskMap.push_back({
hash,
name});
276 std::string prefix =
"internal-dpl-";
277 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
285 bool hasTimeframeInputs =
false;
286 for (
auto& input : processor.inputs) {
287 if (input.lifetime == Lifetime::Timeframe) {
288 hasTimeframeInputs =
true;
292 bool hasTimeframeOutputs =
false;
293 for (
auto&
output : processor.outputs) {
294 if (
output.lifetime == Lifetime::Timeframe) {
295 hasTimeframeOutputs =
true;
301 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
302 if (std::stoi(ctx.options().get<std::string>(
"timeframes-rate-limit-ipcid")) != -1) {
303 if (timeframeSink && processor.name.find(
"internal-dpl-injected-dummy-sink") == std::string::npos) {
306 bool hasMatch =
false;
308 for (
auto&
output : processor.outputs) {
310 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"output enumeration",
"%{public}s already there in %{public}s",
317 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"output enumeration",
"Adding DPL/SUMMARY/%d to %{public}s",
hash, processor.name.c_str());
322 bool hasConditionOption =
false;
323 for (
size_t ii = 0; ii < processor.inputs.size(); ++ii) {
324 auto& input = processor.inputs[ii];
325 switch (input.lifetime) {
326 case Lifetime::Timer: {
328 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](
auto const& option) { return (option.name ==
"period-" + input.binding); });
329 if (hasOption ==
false) {
330 processor.options.push_back(ConfigParamSpec{
"period-" + input.binding,
VariantType::Int, 1000, {
"period of the timer in milliseconds"}});
332 timer.outputs.emplace_back(
OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
334 case Lifetime::Signal: {
336 timer.outputs.emplace_back(
OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
338 case Lifetime::Enumeration: {
340 timer.outputs.emplace_back(
OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
342 case Lifetime::Condition: {
343 for (
auto& option : processor.options) {
344 if (option.name ==
"condition-backend") {
345 hasConditionOption =
true;
349 if (hasConditionOption ==
false) {
351 processor.options.emplace_back(ConfigParamSpec{
"condition-timestamp",
VariantType::Int64, 0ll, {
"Force timestamp for CCDB lookup"}});
352 hasConditionOption =
true;
354 requestedCCDBs.emplace_back(input);
356 case Lifetime::OutOfBand: {
358 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](
auto const& option) { return (option.name ==
"out-of-band-channel-name-" + input.binding); });
359 if (hasOption ==
false) {
360 processor.options.push_back(ConfigParamSpec{
"out-of-band-channel-name-" + input.binding,
VariantType::String,
"out-of-band", {
"channel to listen for out of band data"}});
362 timer.outputs.emplace_back(
OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
365 case Lifetime::Transient:
366 case Lifetime::Timeframe:
367 case Lifetime::Optional:
// Order the timer outputs by ascending subSpec. std::stable_sort keeps the
// relative order of outputs that compare equal.
// NOTE(review): the comparator dereferences the result of getOptionalSubSpec
// unconditionally - presumably every output added to timer.outputs carries a
// concrete subSpec; confirm, since dereferencing an empty optional is undefined.
384 std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](
OutputSpec const&
a,
OutputSpec const&
b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
386 for (
auto&
output : processor.outputs) {
388 ac.providedAODs.emplace_back(
output);
390 ac.providedDYNs.emplace_back(
output);
392 ac.providedTIMs.emplace_back(
output);
394 ac.providedOutputObjHist.emplace_back(
output);
395 auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](
auto&&
x) { return x.id == hash; });
396 if (it == ac.outObjHistMap.end()) {
397 ac.outObjHistMap.push_back({
hash, {
output.binding.value}});
399 it->bindings.push_back(
output.binding.value);
402 if (
output.lifetime == Lifetime::Condition) {
403 providedCCDBs.push_back(
output);
410 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
411 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
412 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
413 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
416 "internal-dpl-aod-index-builder",
423 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
425 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
429 for (
auto& input : ac.requestedDYNs) {
430 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
431 ac.spawnerInputs.emplace_back(input);
436 "internal-dpl-aod-spawner",
446 std::vector<DataProcessorSpec> extraSpecs;
448 if (transientStore.outputs.empty() ==
false) {
449 extraSpecs.push_back(transientStore);
451 if (qaStore.outputs.empty() ==
false) {
452 extraSpecs.push_back(qaStore);
455 if (aodSpawner.outputs.empty() ==
false) {
456 extraSpecs.push_back(
timePipeline(aodSpawner, ctx.options().get<int64_t>(
"spawners")));
459 if (indexBuilder.outputs.empty() ==
false) {
460 extraSpecs.push_back(indexBuilder);
464 if (aodReader.outputs.empty() ==
false) {
465 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](
auto const&
x) { return x.name ==
"mctracks-to-aod"; });
466 if (mctracks2aod == workflow.end()) {
470 aodReader.outputs.emplace_back(
OutputSpec{
"TFN",
"TFNumber"});
471 aodReader.outputs.emplace_back(
OutputSpec{
"TFF",
"TFFilename"});
474 auto algo = AlgorithmSpec{
476 [outputs = aodReader.outputs](DeviceSpec
const&) {
477 LOGP(warn,
"Workflow with injected AODs has unsatisfied inputs:");
478 for (auto const& output : outputs) {
479 LOGP(warn,
" {}", DataSpecUtils::describe(output));
481 LOGP(fatal,
"Stopping.");
488 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
489 extraSpecs.push_back(
timePipeline(aodReader, ctx.options().get<int64_t>(
"readers")));
493 if (ccdbBackend.outputs.empty() ==
false) {
494 ccdbBackend.outputs.push_back(
OutputSpec{
"CTP",
"OrbitReset", 0});
495 InputSpec matcher{
"dstf",
"FLP",
"DISTSUBTIMEFRAME", 0xccdb};
496 bool providesDISTSTF =
false;
499 for (
auto& dp : workflow) {
500 for (
auto&
output : dp.outputs) {
502 providesDISTSTF =
true;
507 if (providesDISTSTF) {
517 if (aodReader.outputs.empty() ==
false) {
518 ccdbBackend.inputs.push_back(
InputSpec{
"tfn",
"TFN",
"TFNumber"});
519 }
else if (providesDISTSTF) {
520 ccdbBackend.inputs.push_back(
InputSpec{
"tfn", dstf, Lifetime::Timeframe});
526 int enumCandidate = -1;
527 int timerCandidate = -1;
528 for (
size_t wi = 0; wi < workflow.size(); wi++) {
529 auto& dp = workflow[wi];
530 if (dp.inputs.size() != 1) {
533 auto lifetime = dp.inputs[0].lifetime;
534 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].
name > dp.name)) {
537 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].
name > dp.name)) {
541 if (enumCandidate != -1) {
542 auto& dp = workflow[enumCandidate];
544 ccdbBackend.inputs.push_back(
InputSpec{
"tfn", dstf, Lifetime::Timeframe});
545 }
else if (timerCandidate != -1) {
546 auto& dp = workflow[timerCandidate];
548 ccdbBackend.inputs.push_back(
InputSpec{{
"tfn"}, dstf, Lifetime::Timeframe});
554 extraSpecs.push_back(ccdbBackend);
559 bool requiresDISTSUBTIMEFRAME =
false;
560 for (
auto& dp : workflow) {
561 for (
auto& input : dp.inputs) {
563 requiresDISTSUBTIMEFRAME =
true;
568 if (requiresDISTSUBTIMEFRAME) {
573 int enumCandidate = -1;
574 int timerCandidate = -1;
575 for (
size_t wi = 0; wi < workflow.size(); wi++) {
576 auto& dp = workflow[wi];
577 if (dp.inputs.size() != 1) {
580 auto lifetime = dp.inputs[0].lifetime;
581 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].
name > dp.name)) {
584 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].
name > dp.name)) {
588 if (enumCandidate != -1) {
589 auto& dp = workflow[enumCandidate];
591 ccdbBackend.inputs.push_back(
InputSpec{
"tfn", dstf, Lifetime::Timeframe});
592 }
else if (timerCandidate != -1) {
593 auto& dp = workflow[timerCandidate];
595 ccdbBackend.inputs.push_back(
InputSpec{{
"tfn"}, dstf, Lifetime::Timeframe});
602 if (analysisCCDBBackend.outputs.empty() ==
false) {
605 analysisCCDBBackend.algorithm = algo;
606 extraSpecs.push_back(analysisCCDBBackend);
610 if (timer.outputs.empty() ==
false) {
611 extraSpecs.push_back(timer);
616 if (ac.providedOutputObjHist.empty() ==
false) {
618 extraSpecs.push_back(rootSink);
621 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
625 auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
626 ac.isDangling = isDanglingTmp;
627 ac.outputsInputs = outputsInputsTmp;
635 for (
auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
637 auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]);
638 if (
ds.size() > 0 || ac.isDangling[ii]) {
639 ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]);
645 if (ac.outputsInputsAOD.size() > 0) {
647 ac.outputsInputsAOD.emplace_back(
InputSpec{
"tfn",
"TFN",
"TFNumber"});
648 ac.outputsInputsAOD.emplace_back(
InputSpec{
"tff",
"TFF",
"TFFilename"});
650 extraSpecs.push_back(fileSink);
// Find the first entry of ac.outputsInputs that partially matches the data
// origin "TFN", then clear the dangling flag stored at the same index.
// NOTE(review): there is no `it != ac.outputsInputs.end()` check before the
// index is used. If nothing matches, ii equals ac.outputsInputs.size() and the
// write to ac.isDangling[ii] is out of range - confirm a TFN entry is always
// present at this point.
652 auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](
InputSpec& spec) ->
bool {
653 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin(
"TFN"));
655 size_t ii = std::distance(ac.outputsInputs.begin(), it);
656 ac.isDangling[ii] =
false;
659 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
663 std::vector<InputSpec> redirectedOutputsInputs;
664 for (
auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
665 if (ctx.options().get<std::string>(
"forwarding-policy") ==
"none") {
670 if (!ac.isDangling[ii] && (ctx.options().get<std::string>(
"forwarding-policy") !=
"all")) {
677 redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]);
680 std::vector<InputSpec> unmatched;
681 auto forwardingDestination = ctx.options().get<std::string>(
"forwarding-destination");
682 if (redirectedOutputsInputs.size() > 0 && forwardingDestination ==
"file") {
684 if (unmatched.size() != redirectedOutputsInputs.size()) {
685 extraSpecs.push_back(fileSink);
687 }
else if (redirectedOutputsInputs.size() > 0 && forwardingDestination ==
"fairmq") {
689 extraSpecs.push_back(fairMQSink);
690 }
else if (forwardingDestination !=
"drop") {
691 throw runtime_error_f(
"Unknown forwarding destination %s", forwardingDestination.c_str());
693 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
694 std::vector<InputSpec> ignored = unmatched;
695 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
696 for (
auto& ignoredInput : ignored) {
697 ignoredInput.lifetime = Lifetime::Sporadic;
702 if (aodReader.outputs.empty() ==
false) {
706 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"injectServiceDevices",
"Injecting rate limited dummy sink");
711 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
801 std::vector<DeviceConnectionEdge>& logicalEdges,
802 std::vector<OutputSpec>& outputs,
803 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
806 if (workflow.empty()) {
811 std::vector<LogicalOutputInfo> availableOutputsInfo;
812 auto const& constOutputs = outputs;
814 std::vector<LogicalOutputInfo> forwards;
818 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
820 for (
size_t wi = 0; wi < workflow.size(); ++wi) {
821 auto& producer = workflow[wi];
822 if (producer.outputs.empty()) {
823 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"output enumeration",
"No outputs for [%zu] %{public}s", wi, producer.name.c_str());
// Start the "output enumeration" signpost for this producer, logging its index
// and name. The name uses the `%{public}s` annotation, as the other signposts
// in this file do; the previous `%{}s public` spelling left the annotation
// empty and appended a stray " public" to the message.
825 O2_SIGNPOST_START(workflow_helpers, sid,
"output enumeration",
"Enumerating outputs for producer [%zu] %{public}s", wi, producer.name.c_str());
827 for (
size_t oi = 0; oi < producer.outputs.size(); ++oi) {
828 auto& out = producer.outputs[oi];
829 auto uniqueOutputId = outputs.size();
833 outputs.push_back(out);
839 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](
size_t ci,
size_t ii) {
840 auto input = workflow[ci].inputs[ii];
841 std::ostringstream
str;
842 str <<
"No matching output found for "
843 <<
DataSpecUtils::describe(input) <<
" as requested by data processor \"" << workflow[ci].name <<
"\". Candidates:\n";
845 for (
auto&
output : constOutputs) {
849 throw std::runtime_error(
str.str());
863 enumerateAvailableOutputs();
865 std::vector<bool> matches(constOutputs.size());
866 for (
size_t consumer = 0; consumer < workflow.size(); ++consumer) {
// Start the "input matching" signpost for this consumer, logging its index and
// name. The name uses the `%{public}s` annotation, as the other signposts in
// this file do; the previous `%{}s public` spelling left the annotation empty
// and appended a stray " public" to the message.
868 O2_SIGNPOST_START(workflow_helpers, sid,
"input matching",
"Matching inputs of consumer [%zu] %{public}s", consumer, workflow[consumer].
name.c_str());
869 for (
size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
871 for (
size_t i = 0;
i < constOutputs.size();
i++) {
880 for (
size_t i = 0;
i < availableOutputsInfo.size();
i++) {
882 if (!matches[availableOutputsInfo[
i].outputGlobalIndex]) {
885 auto* oif = &availableOutputsInfo[
i];
889 auto producer = oif->specIndex;
890 auto uniqueOutputId = oif->outputGlobalIndex;
891 for (
size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
892 for (
size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
893 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid,
"output",
"Adding edge between %{public}s and %{public}s", workflow[consumer].
name.c_str(),
894 workflow[producer].name.c_str());
895 logicalEdges.emplace_back(
DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
900 oif->enabled =
false;
902 if (forwards.empty()) {
903 errorDueToMissingOutputFor(consumer, input);
// Remove every entry whose `enabled` flag is false from availableOutputsInfo
// (erase-remove idiom: remove_if compacts the kept entries, erase drops the tail).
905 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](
auto& info) { return info.enabled == false; }), availableOutputsInfo.end());
906 for (
auto& forward : forwards) {
907 availableOutputsInfo.push_back(forward);