Project
Loading...
Searching...
No Matches
WorkflowHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
28#include "Framework/Signpost.h"
30
31#include "Framework/Variant.h"
32#include "Headers/DataHeader.h"
33#include <algorithm>
34#include <list>
35#include <set>
36#include <utility>
37#include <vector>
38#include <climits>
39#include <numeric>
40
41O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
42
43namespace o2::framework
44{
45std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
46{
47 out << "(" << info.index << ", " << info.layer << ")";
48 return out;
49}
50
51std::vector<TopoIndexInfo>
// NOTE(review): the declaration line naming this function and its first
// parameter (`nodeCount`, used below) is not visible in this extract — gaps
// in the embedded numbering mark missing original lines.
//
// Kahn-style layered topological sort. The graph is given as two parallel
// arrays of node indices (edgeIn -> edgeOut), read with a byte stride so the
// ints may be fields embedded in a larger per-edge struct. Nodes with no
// incoming edge start at layer 0; a successor is emitted at layer + 1 once
// all of its incoming edges have been consumed. Returns the nodes in
// topological order as (index, layer) pairs.
53 int const* edgeIn,
54 int const* edgeOut,
55 size_t byteStride,
56 size_t edgesCount)
57{
// Convert the byte stride to an element (int) stride for pointer arithmetic.
58 size_t stride = byteStride / sizeof(int);
59 using EdgeIndex = int;
60 // Create the index which will be returned.
61 std::vector<TopoIndexInfo> index(nodeCount);
62 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
63 index[wi] = {wi, 0};
64 }
// Work list of edge indices still to be processed; it shrinks every time a
// node is emitted and its outgoing edges are removed.
65 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
66 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
67 remainingEdgesIndex[ei] = ei;
68 }
69
70 // Create a vector where at each position we have true
71 // if the vector has dependencies, false otherwise
72 std::vector<bool> nodeDeps(nodeCount, false);
73 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
74 nodeDeps[*(edgeOut + ei * stride)] = true;
75 }
76
77 // We start with all those which do not have any dependencies
78 // They are layer 0.
79 std::list<TopoIndexInfo> L;
80 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
81 if (nodeDeps[ii] == false) {
82 L.push_back({ii, 0});
83 }
84 }
85
86 // The final result.
87 std::vector<TopoIndexInfo> S;
88 // The set of vertices which can be reached by the current node
89 std::set<TopoIndexInfo> nextVertex;
90 // The set of edges which are not related to the current node.
91 std::vector<EdgeIndex> nextEdges;
92 while (!L.empty()) {
93 auto node = L.front();
94 S.push_back(node);
95 L.pop_front();
96 nextVertex.clear();
97 nextEdges.clear();
98
99 // After this, nextVertex will contain all the vertices
100 // which have the current node as incoming.
101 // nextEdges will contain all the edges which are not related
102 // to the current node.
103 for (auto const& ei : remainingEdgesIndex) {
104 if (*(edgeIn + ei * stride) == node.index) {
105 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
106 } else {
107 nextEdges.push_back(ei);
108 }
109 }
110 remainingEdgesIndex.swap(nextEdges);
111
112 // Of all the vertices which have node as incoming,
113 // check if there is any other incoming node.
114 std::set<TopoIndexInfo> hasPredecessors;
115 for (auto const& ei : remainingEdgesIndex) {
116 for (auto& m : nextVertex) {
117 if (m.index == *(edgeOut + ei * stride)) {
118 hasPredecessors.insert({m.index, m.layer});
119 }
120 }
121 }
// Despite its name, `withPredecessor` holds the successors WITHOUT any
// remaining predecessor (nextVertex minus hasPredecessors): only those are
// ready to be scheduled. Both operands are std::set, i.e. sorted, as
// std::set_difference requires.
122 std::vector<TopoIndexInfo> withPredecessor;
123 std::set_difference(nextVertex.begin(), nextVertex.end(),
124 hasPredecessors.begin(), hasPredecessors.end(),
125 std::back_inserter(withPredecessor));
126 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
127 }
128 return S;
129}
130
131// get the default value for condition-backend
// Resolution order: the DPL_CONDITION_BACKEND environment variable if set,
// otherwise the internal CCDB instance for online (DDS/ECS) deployments,
// otherwise the public ALICE CCDB instance. The statics cache the presence
// of the override and the deployment mode once per process.
// NOTE(review): the signature line is not visible in this extract; the body
// returns a string-like value (the CCDB URL).
133{
134 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
135 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
136 if (explicitBackend) {
137 return getenv("DPL_CONDITION_BACKEND");
138 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
139 return "http://o2-ccdb.internal";
140 } else {
141 return "http://alice-ccdb.cern.ch";
142 }
143}
144
145// get the default value for condition query rate
// Reads DPL_CONDITION_QUERY_RATE from the environment; defaults to 0, which
// per the "condition-tf-per-query" option help means "fetch only once".
// NOTE(review): the signature line is not visible in this extract; std::stoi
// will throw if the variable is set to a non-numeric value.
147{
148 return getenv("DPL_CONDITION_QUERY_RATE") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE")) : 0;
149}
150
151// get the default value for condition query rate multiplier
// Reads DPL_CONDITION_QUERY_RATE_MULTIPLIER from the environment; defaults
// to 1 (check conditions on every nominal check).
// NOTE(review): the signature line is not visible in this extract; std::stoi
// will throw if the variable is set to a non-numeric value.
153{
154 return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1;
155}
156
158{
// Injects the DPL-internal service devices (CCDB fetchers, AOD reader and
// spawner, index builder, timer/clock, transient/QA stores, dummy and file
// sinks) into `workflow`, driven by the inputs/outputs the user-provided
// processors declare.
// NOTE(review): the signature line (original 157) is not visible in this
// extract; gaps in the embedded line numbering below mark further missing
// original lines (e.g. the algorithm arguments of some specs).
159 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
// Fetcher for CCDB condition objects. Its concrete outputs are filled in
// later from the Lifetime::Condition inputs collected below
// (addMissingOutputsToReader), and its algorithm is loaded from the
// CCDBFetcherPlugin only if any output was added.
160 DataProcessorSpec ccdbBackend{
161 .name = "internal-dpl-ccdb-backend",
162 .outputs = {},
163 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
164 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
165 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
166 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
167 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
168 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
169 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
170 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
171 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
172 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
173 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
174 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
175 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
176 };
// Analysis-level CCDB fetcher, reading CCDB objects via a table; starts with
// a dummy algorithm and gets outputs via addMissingOutputsToBuilder below.
177 DataProcessorSpec analysisCCDBBackend{
178 .name = "internal-dpl-aod-ccdb",
179 .inputs = {},
180 .outputs = {},
181 .algorithm = AlgorithmSpec::dummyAlgorithm(),
182 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
183 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
184 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
185 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
186 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
187 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
188 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
189 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
190 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
191 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
192 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
// Service stores and the clock device; each is appended to the workflow only
// if it ends up with at least one output. NOTE(review): the algorithm
// argument lines of these three specs are missing from this extract.
193 DataProcessorSpec transientStore{"internal-dpl-transient-store",
194 {},
195 {},
197 DataProcessorSpec qaStore{"internal-dpl-qa-store",
198 {},
199 {},
201 DataProcessorSpec timer{"internal-dpl-clock",
202 {},
203 {},
205
206 // In case InputSpec of origin AOD are
207 // requested but not available as part of the workflow,
208 // we insert in the configuration something which
209 // reads them from file.
210 //
211 // FIXME: source branch is DataOrigin, for the moment. We should
212 // make it configurable via ConfigParamsOptions
213 auto aodLifetime = Lifetime::Enumeration;
214
215 DataProcessorSpec aodReader{
216 .name = "internal-dpl-aod-reader",
217 .inputs = {InputSpec{"enumeration",
218 "DPL",
219 "ENUM",
220 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
221 aodLifetime}},
222 .algorithm = AlgorithmSpec::dummyAlgorithm(),
223 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
224 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
225 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
226 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
227 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
228 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
229 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
230 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
231 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
232 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
233
// Context shared with injectAODWriter and later passes; it accumulates the
// requested/provided AOD, DYN, IDX and TIM specs discovered below.
234 ctx.services().registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(new DanglingEdgesContext));
235 auto& dec = ctx.services().get<DanglingEdgesContext>();
236
237 std::vector<InputSpec> requestedCCDBs;
238 std::vector<OutputSpec> providedCCDBs;
239
// First pass over all user processors: give input-less processors an
// enumeration input, add rate-limiting summary outputs, route special
// input lifetimes (Timer/Signal/Enumeration/Condition/OutOfBand) to the
// clock or CCDB devices, and collect AOD-related requests.
240 for (size_t wi = 0; wi < workflow.size(); ++wi) {
241 auto& processor = workflow[wi];
242 auto name = processor.name;
243 uint32_t hash = runtime_hash(name.c_str());
244 dec.outTskMap.push_back({hash, name});
245
// A processor without inputs (and which is not itself a DPL-internal
// device) is driven by an enumeration input so it still gets scheduled.
246 std::string prefix = "internal-dpl-";
247 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
248 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
249 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
250 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
251 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
252 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
253 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
254 }
255 bool hasTimeframeInputs = std::ranges::any_of(processor.inputs, [](auto const& input) { return input.lifetime == Lifetime::Timeframe; });
256 bool hasTimeframeOutputs = std::ranges::any_of(processor.outputs, [](auto const& output) { return output.lifetime == Lifetime::Timeframe; });
257
258 // A timeframeSink consumes timeframes without creating new
259 // timeframe data.
260 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
// When rate limiting is active, give each timeframe sink a DPL/SUMMARY
// output (unless it already has one) so completion can be tracked.
261 if (rateLimitingIPCID != -1) {
262 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
263 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
264 bool hasMatch = false;
265 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
266 auto summaryOutput = std::ranges::find_if(processor.outputs, [&summaryMatcher](auto const& output) { return DataSpecUtils::match(output, summaryMatcher); });
267 if (summaryOutput != processor.outputs.end()) {
268 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
269 DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str());
270 hasMatch = true;
271 }
272
273 if (!hasMatch) {
274 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
275 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
276 }
277 }
278 }
// Dispatch on the lifetime of each input: timers/signals/enumerations are
// served by the internal clock device, conditions by the CCDB backend.
279 bool hasConditionOption = false;
280 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
281 auto& input = processor.inputs[ii];
282 switch (input.lifetime) {
283 case Lifetime::Timer: {
284 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
285 auto hasOption = std::ranges::any_of(processor.options, [&input](auto const& option) { return (option.name == "period-" + input.binding); });
286 if (hasOption == false) {
287 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
288 }
289 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
290 } break;
291 case Lifetime::Signal: {
292 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
293 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
294 } break;
295 case Lifetime::Enumeration: {
296 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
297 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
298 } break;
299 case Lifetime::Condition: {
300 requestedCCDBs.emplace_back(input);
// The condition-* options are added at most once per processor.
301 if ((hasConditionOption == false) && std::ranges::none_of(processor.options, [](auto const& option) { return (option.name.compare("condition-backend") == 0); })) {
302 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
303 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
304 hasConditionOption = true;
305 }
306 } break;
307 case Lifetime::OutOfBand: {
308 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
309 auto hasOption = std::ranges::any_of(processor.options, [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
310 if (hasOption == false) {
311 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
312 }
313 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
314 } break;
315 case Lifetime::QA:
316 case Lifetime::Transient:
317 case Lifetime::Timeframe:
318 case Lifetime::Optional:
319 break;
320 }
// Collect AOD-related requests. NOTE(review): the conditions guarding the
// DYN/IDX/TIM updates (original lines 324, 327, 330) are missing from this
// extract.
321 if (DataSpecUtils::partialMatch(input, AODOrigins)) {
322 DataSpecUtils::updateInputList(dec.requestedAODs, InputSpec{input});
323 }
325 DataSpecUtils::updateInputList(dec.requestedDYNs, InputSpec{input});
326 }
328 DataSpecUtils::updateInputList(dec.requestedIDXs, InputSpec{input});
329 }
331 DataSpecUtils::updateInputList(dec.requestedTIMs, InputSpec{input});
332 }
333 }
334
335 std::ranges::stable_sort(timer.outputs, [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
336
// Record what this processor provides. NOTE(review): the conditions for the
// DYN/TIM/OutputObjHist branches (original lines 340, 342, 344) are missing
// from this extract.
337 for (auto& output : processor.outputs) {
338 if (DataSpecUtils::partialMatch(output, AODOrigins)) {
339 dec.providedAODs.emplace_back(output);
341 dec.providedDYNs.emplace_back(output);
343 dec.providedTIMs.emplace_back(output);
345 dec.providedOutputObjHist.emplace_back(output);
346 auto it = std::ranges::find_if(dec.outObjHistMap, [&](auto&& x) { return x.id == hash; });
347 if (it == dec.outObjHistMap.end()) {
348 dec.outObjHistMap.push_back({hash, {output.binding.value}});
349 } else {
350 it->bindings.push_back(output.binding.value);
351 }
352 }
353 if (output.lifetime == Lifetime::Condition) {
354 providedCCDBs.push_back(output);
355 }
356 }
357 }
358
// Canonical ordering so the matching below is deterministic.
359 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
360 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
361 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
362 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
363 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
364 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
365
366 DataProcessorSpec indexBuilder{
367 "internal-dpl-aod-index-builder",
368 {},
369 {},
370 AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
371 {}};
372 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, indexBuilder);
373
// TIM specs requested but not provided by any processor must come from the
// analysis CCDB backend (offline only). NOTE(review): the line declaring
// `deploymentMode` here (original 375) is missing from this extract.
374 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
376 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
377 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedTIMs, analysisCCDBBackend);
378 }
379
380 dec.requestedDYNs | views::filter_not_matching(dec.providedDYNs) | sinks::append_to{dec.spawnerInputs};
381
382 DataProcessorSpec aodSpawner{
383 "internal-dpl-aod-spawner",
384 {},
385 {},
386 AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
387 {}};
388 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, aodSpawner);
389 AnalysisSupportHelpers::addMissingOutputsToReader(dec.providedAODs, dec.requestedAODs, aodReader);
390
391 std::ranges::sort(requestedCCDBs, inputSpecLessThan);
392 std::ranges::sort(providedCCDBs, outputSpecLessThan);
393 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
394
// From here on, each internal device is appended only if the passes above
// gave it something to produce.
395 std::vector<DataProcessorSpec> extraSpecs;
396
397 if (transientStore.outputs.empty() == false) {
398 extraSpecs.push_back(transientStore);
399 }
400 if (qaStore.outputs.empty() == false) {
401 extraSpecs.push_back(qaStore);
402 }
403
404 if (aodSpawner.outputs.empty() == false) {
405 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
406 }
407
408 if (indexBuilder.outputs.empty() == false) {
409 extraSpecs.push_back(indexBuilder);
410 }
411
412 // add the reader
413 if (aodReader.outputs.empty() == false) {
414 auto mctracks2aod = std::ranges::find_if(workflow, [](auto const& x) { return x.name == "mctracks-to-aod"; });
415 if (mctracks2aod == workflow.end()) {
416 // add normal reader
417 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
418 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
419 } else {
420 // AODs are being injected on-the-fly, add error-handler reader
// NOTE(review): the line between 421 and 423 (original 422, presumably the
// adaptStateful call) is missing from this extract.
421 aodReader.algorithm = AlgorithmSpec{
423 [](DeviceSpec const& spec) {
424 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
425 for (auto const& output : spec.outputs) {
426 LOGP(warn, " {}", DataSpecUtils::describe(output.matcher));
427 }
428 LOGP(fatal, "Stopping.");
429 // to ensure the output type for adaptStateful
430 return adaptStateless([](DataAllocator&) {});
431 })};
432 }
433 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
434 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
435 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
436 }
437
438 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
439 auto& dstf = std::get<ConcreteDataMatcher>(matcher.matcher);
440 // Check if any of the provided outputs is a DISTSTF
441 // Check if any of the requested inputs is for a 0xccdb message
442 bool providesDISTSTF = std::ranges::any_of(workflow,
443 [&matcher](auto const& dp) {
444 return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](auto const& output) {
445 return DataSpecUtils::match(matcher, output);
446 });
447 });
448
449 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
450 // we add to the first data processor which has no inputs (apart from
451 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
452 bool requiresDISTSUBTIMEFRAME = std::ranges::any_of(workflow,
453 [&dstf](auto const& dp) {
454 return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](auto const& input) {
455 return DataSpecUtils::match(input, dstf);
456 });
457 });
458
459 // We find the first device which has either just enumerations or
460 // just timers, and we will add the DISTSUBTIMEFRAME to it.
461 // Notice how we do so in a stable manner by sorting the devices
462 // by name.
463 int enumCandidate = -1;
464 int timerCandidate = -1;
465 for (auto wi = 0U; wi < workflow.size(); ++wi) {
466 auto& dp = workflow[wi];
467 if (dp.inputs.size() != 1) {
468 continue;
469 }
470 auto lifetime = dp.inputs[0].lifetime;
471 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
472 enumCandidate = wi;
473 }
474 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
475 timerCandidate = wi;
476 }
477 }
478
479 // * If there are AOD outputs we use TFNumber as the CCDB clock
480 // * If one device provides a DISTSTF we use that as the CCDB clock
481 // * If one of the devices provides a timer we use that as the CCDB clock
482 // * If none of the above apply, add to the first data processor
483 // which has no inputs apart from enumerations the responsibility
484 // to provide the DISTSUBTIMEFRAME.
485 if (ccdbBackend.outputs.empty() == false) {
486 if (aodReader.outputs.empty() == false) {
487 // fetcher clock follows AOD source (TFNumber)
488 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
489 } else if (providesDISTSTF) {
490 // fetcher clock follows DSTF/ccdb source (DISTSUBTIMEFRAME)
491 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
492 } else {
493 if (enumCandidate != -1) {
494 // add DSTF/ccdb source to the enumeration-driven source explicitly
495 // fetcher clock is provided by enumeration-driven source (DISTSUBTIMEFRAME)
496 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
497 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
498 } else if (timerCandidate != -1) {
499 // fetcher clock is provided by timer source
500 auto timer_dstf = DataSpecUtils::asConcreteDataMatcher(workflow[timerCandidate].outputs[0]);
501 ccdbBackend.inputs.push_back(InputSpec{"tfn", timer_dstf, Lifetime::Timeframe});
502 }
503 }
504
505 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
506 // Load the CCDB backend from the plugin
507 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
508 extraSpecs.push_back(ccdbBackend);
509 } else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
510 // add DSTF/ccdb source to the enumeration-driven source explicitly if it is required in the workflow
511 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
512 }
513
514 // add the Analysis CCDB backend which reads CCDB objects using a provided table
515 if (analysisCCDBBackend.outputs.empty() == false) {
516 extraSpecs.push_back(analysisCCDBBackend);
517 }
518
519 // add the timer
520 if (timer.outputs.empty() == false) {
521 extraSpecs.push_back(timer);
522 }
523
524 // This is to inject a file sink so that any dangling ATSK object is written
525 // to a ROOT file.
// NOTE(review): the line constructing `rootSink` (original 527) is missing
// from this extract.
526 if (dec.providedOutputObjHist.empty() == false) {
528 extraSpecs.push_back(rootSink);
529 }
530
531 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
532 extraSpecs.clear();
533
534 injectAODWriter(workflow, ctx);
535
536 // Select dangling outputs which are not of type AOD
537 std::vector<InputSpec> redirectedOutputsInputs;
538 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
539 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
540 continue;
541 }
542 // We forward to the output proxy all the inputs only if they are dangling
543 // or if the forwarding policy is "proxy".
544 if (!dec.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
545 continue;
546 }
547 // AODs are skipped in any case.
548 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
549 continue;
550 }
551 redirectedOutputsInputs.emplace_back(dec.outputsInputs[ii]);
552 }
553
// Route the redirected outputs to the configured forwarding destination:
// a file sink, a FairMQ proxy, or drop them (anything else is an error).
554 std::vector<InputSpec> unmatched;
555 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
556 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
557 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
558 if (unmatched.size() != redirectedOutputsInputs.size()) {
559 extraSpecs.push_back(fileSink);
560 }
561 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
562 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
563 extraSpecs.push_back(fairMQSink);
564 } else if (forwardingDestination != "drop") {
565 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
566 }
// Anything still unmatched is fed to a dummy sink so no output dangles.
567 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
568 std::vector<InputSpec> ignored = unmatched;
569 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
570 for (auto& ignoredInput : ignored) {
571 ignoredInput.lifetime = Lifetime::Sporadic;
572 }
573
574 // Use the new dummy sink when the AOD reader is there
575 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
576 if (aodReader.outputs.empty() == false) {
577 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
578 extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
579 } else {
580 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
581 std::string rateLimitingChannelConfigOutput;
582 if (rateLimitingIPCID != -1) {
583 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
584 }
585 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
586 }
587 }
588
589 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
590 extraSpecs.clear();
591}
592
594{
// Post-processes the workflow topology (per the LOGP below this is
// WorkflowHelpers::adjustTopology): renumbers FLP/DISTSUBTIMEFRAME/0 inputs
// on devices with Optional inputs, and demotes Timeframe outputs to Sporadic
// on devices whose non-timer inputs are all Sporadic.
// NOTE(review): the signature line (original 593) is not visible in this
// extract.
595 unsigned int distSTFCount = 0;
596 for (auto& spec : workflow) {
597 auto& inputs = spec.inputs;
598 bool allSporadic = true;
599 bool hasTimer = false;
600 bool hasSporadic = false;
601 bool hasOptionals = false;
602 for (auto& input : inputs) {
603 if (input.lifetime == Lifetime::Optional) {
604 hasOptionals = true;
605 }
606 }
607 for (auto& input : inputs) {
608 // Any InputSpec that is FLP/DISTSUBTIMEFRAME/0 will actually be replaced by one
609 // which looks like FLP/DISTSUBTIMEFRAME/<incremental number> for devices that
610 // have Optional inputs as well.
611 // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
612 // forwarded before actual RAWDATA arrives.
613 if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
614 !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
615 LOGP(error,
616 "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
617 "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
618 DataSpecUtils::describe(input), input.binding);
619 }
620 if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
621 // The first one remains unchanged, therefore we use the postincrement
622 DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
623 continue;
624 }
625 // Timers are sporadic only when they are not
626 // alone.
627 if (input.lifetime == Lifetime::Timer) {
628 hasTimer = true;
629 continue;
630 }
631 if (input.lifetime == Lifetime::Sporadic) {
632 hasSporadic = true;
633 } else {
634 allSporadic = false;
635 }
636 }
637
638 LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
639
640 // If they are not all sporadic (excluding timers)
641 // we leave things as they are.
642 if (allSporadic == false) {
643 continue;
644 }
645 // A timer alone is not sporadic.
646 if (hasSporadic == false) {
647 continue;
648 }
// All (non-timer) inputs are Sporadic: the outputs cannot be produced every
// timeframe either, so demote them. NOTE(review): original lines 649-651
// (presumably a comment) are missing from this extract.
652 for (auto& output : spec.outputs) {
653 if (output.lifetime == Lifetime::Timeframe) {
654 output.lifetime = Lifetime::Sporadic;
655 }
656 }
657 }
658
// If any DISTSUBTIMEFRAME input got renumbered above, the (single) producer
// of FLP/DISTSUBTIMEFRAME/0 must also provide subspecs 1..distSTFCount-1.
659 if (distSTFCount > 0) {
660 for (auto& spec : workflow) {
661 if (std::ranges::any_of(spec.outputs, [](auto const& output) { return DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0}); })) {
662 for (unsigned int i = 1; i < distSTFCount; ++i) {
663 spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
664 }
665 break;
666 }
667 }
668 }
669}
670
672{
// Injects the global AOD file sink (called from injectServiceDevices):
// analyzes which outputs are dangling, selects the AOD-typed ones that must
// be persisted, and appends the writer device with TFNumber/TFFilename
// inputs. NOTE(review): the signature line (original 671) and original line
// 674 are not visible in this extract.
673 auto& dec = ctx.services().get<DanglingEdgesContext>();
675 std::tie(dec.outputsInputs, dec.isDangling) = analyzeOutputs(workflow);
676
677 // create DataOutputDescriptor
678 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
679
680 // select outputs of type AOD which need to be saved
// An AOD output is saved when the output director has a descriptor for it
// or when nothing in the workflow consumes it (dangling).
681 dec.outputsInputsAOD.clear();
682 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
683 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
684 auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
685 if (ds.size() > 0 || dec.isDangling[ii]) {
686 dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
687 }
688 }
689 }
690
691 // file sink for any AOD output
692 if (dec.outputsInputsAOD.size() > 0) {
693 // add TFNumber and TFFilename as input to the writer
694 DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tfn", "TFN", "TFNumber"});
695 DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tff", "TFF", "TFFilename"});
696 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
697 workflow.push_back(fileSink);
698
// Mark the TFN output as consumed so it is no longer considered dangling.
// NOTE(review): assumes a TFN entry exists in outputsInputs; if find_if
// returned end(), the computed index would equal size() and the access
// below would be out of range — verify against analyzeOutputs.
699 auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
700 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
701 });
702 dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
703 }
704}
705
707 std::vector<DeviceConnectionEdge>& logicalEdges,
708 std::vector<OutputSpec>& outputs,
709 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
710{
711 // In case the workflow is empty, we do not have anything to do.
712 if (workflow.empty()) {
713 return;
714 }
715
716 // This is the state. Oif is the iterator I use for the searches.
717 std::vector<LogicalOutputInfo> availableOutputsInfo;
718 auto const& constOutputs = outputs; // const version of the outputs
719 // Forwards is a local cache to avoid adding forwards before time.
720 std::vector<LogicalOutputInfo> forwards;
721
722 // Notice that availableOutputsInfo MUST be updated first, since it relies on
723 // the size of outputs to be the one before the update.
724 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
725 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
726 for (size_t wi = 0; wi < workflow.size(); ++wi) {
727 auto& producer = workflow[wi];
728 if (producer.outputs.empty()) {
729 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
730 }
731 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
732
733 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
734 auto& out = producer.outputs[oi];
735 auto uniqueOutputId = outputs.size();
736 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
737 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
738 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
739 outputs.push_back(out);
740 }
741 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
742 }
743 };
744
745 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
746 auto input = workflow[ci].inputs[ii];
747 std::ostringstream str;
748 str << "No matching output found for "
749 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
750
751 for (auto& output : constOutputs) {
752 str << "-" << DataSpecUtils::describe(output) << "\n";
753 }
754
755 throw std::runtime_error(str.str());
756 };
757
758 // This is the outer loop
759 //
760 // Here we iterate over dataprocessor items in workflow and we consider them
761 // as consumer, since we are interested in their inputs.
762 // Notice also we need to search for all the matching inputs, since
763 // we could have more than one source that matches (e.g. in the
764 // case of a time merger).
765 // Once consumed, an output is not actually used anymore, however
766 // we append it as a forward.
767 // Finally, If a device has n-way pipelining, we need to create one node per
768 // parallel pipeline and add an edge for each.
769 enumerateAvailableOutputs();
770
771 std::vector<bool> matches(constOutputs.size());
772 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
773 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
774 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
775 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
776 forwards.clear();
777 for (size_t i = 0; i < constOutputs.size(); i++) {
778 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
779 if (matches[i]) {
780 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
781 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
782 DataSpecUtils::describe(constOutputs[i]).c_str());
783 }
784 }
785
786 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
787 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
788 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
789 continue;
790 }
791 auto* oif = &availableOutputsInfo[i];
792 if (oif->forward) {
793 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
794 }
795 auto producer = oif->specIndex;
796 auto uniqueOutputId = oif->outputGlobalIndex;
797 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
798 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
799 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
800 workflow[producer].name.c_str());
801 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
802 }
803 }
804 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
805 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
806 oif->enabled = false;
807 }
808 if (forwards.empty()) {
809 errorDueToMissingOutputFor(consumer, input);
810 }
811 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto& info) { return info.enabled == false; }), availableOutputsInfo.end());
812 for (auto& forward : forwards) {
813 availableOutputsInfo.push_back(forward);
814 }
815 }
816 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
817 }
818}
819
820std::vector<EdgeAction>
822 const std::vector<DeviceConnectionEdge>& edges,
823 const std::vector<size_t>& index)
824{
825 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
826
827 assert(edges.size() == index.size());
828 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
829 for (size_t i : index) {
830 auto& edge = edges[i];
831 auto& action = actions[i];
832 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
833 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
834 last = edge;
835 }
836 return actions;
837}
838
839std::vector<EdgeAction>
841 const std::vector<DeviceConnectionEdge>& edges,
842 const std::vector<size_t>& index)
843{
844 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
845
846 assert(edges.size() == index.size());
847 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
848 for (size_t i : index) {
849 auto& edge = edges[i];
850 auto& action = actions[i];
851 // Calculate which actions need to be taken for this edge.
852 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
853 action.requiresNewChannel =
854 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
855
856 last = edge;
857 }
858 return actions;
859}
860
861void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
862 std::vector<size_t>& outEdgeIndex,
863 const std::vector<DeviceConnectionEdge>& edges)
864{
865 inEdgeIndex.resize(edges.size());
866 outEdgeIndex.resize(edges.size());
867 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
868 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
869
870 // Two indexes, one to bind the outputs, the other
871 // one to connect the inputs. The
872 auto outSorter = [&edges](size_t i, size_t j) {
873 auto& a = edges[i];
874 auto& b = edges[j];
875 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
876 };
877 auto inSorter = [&edges](size_t i, size_t j) {
878 auto& a = edges[i];
879 auto& b = edges[j];
880 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
881 };
882
883 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
884 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
885}
886
888{
889 if (workflow.empty()) {
891 }
892 std::set<std::string> validNames;
893 std::vector<OutputSpec> availableOutputs;
894 std::vector<InputSpec> requiredInputs;
895
896 // An index many to one index to go from a given input to the
897 // associated spec
898 std::map<size_t, size_t> inputToSpec;
899 // A one to one index to go from a given output to the Spec emitting it
900 std::map<size_t, size_t> outputToSpec;
901
902 std::ostringstream ss;
903
904 for (auto& spec : workflow) {
905 if (spec.name.empty()) {
906 throw std::runtime_error("Invalid DataProcessorSpec name");
907 }
908 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
909 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
910 }
911 if (validNames.find(spec.name) != validNames.end()) {
912 throw std::runtime_error("Name " + spec.name + " is used twice.");
913 }
914 validNames.insert(spec.name);
915 for (auto& option : spec.options) {
916 if (option.defaultValue.type() != VariantType::Empty &&
917 option.type != option.defaultValue.type()) {
918 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
919 << ") for " << option.name << " in DataProcessorSpec of "
920 << spec.name;
921 throw std::runtime_error(ss.str());
922 }
923 }
924 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
925 InputSpec const& input = spec.inputs[ii];
926 if (DataSpecUtils::validate(input) == false) {
927 ss << "In spec " << spec.name << " input specification "
928 << ii << " requires binding, description and origin"
929 " to be fully specified (found "
930 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
931 throw std::runtime_error(ss.str());
932 }
933 }
934 }
936}
937
938using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
941 size_t id;
942};
943
944std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
945{
946 // compute total number of input/output
947 size_t totalInputs = 0;
948 size_t totalOutputs = 0;
949 for (auto& spec : workflow) {
950 totalInputs += spec.inputs.size();
951 totalOutputs += spec.outputs.size();
952 }
953
954 std::vector<DataMatcherId> inputs;
955 std::vector<DataMatcherId> outputs;
956 inputs.reserve(totalInputs);
957 outputs.reserve(totalOutputs);
958
959 std::vector<InputSpec> results;
960 std::vector<bool> isDangling;
961 results.reserve(totalOutputs);
962 isDangling.reserve(totalOutputs);
963
965 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
966 auto& spec = workflow[wi];
967 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
968 inputs.emplace_back(DataMatcherId{wi, ii});
969 }
970 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
971 outputs.emplace_back(DataMatcherId{wi, oi});
972 }
973 }
974
975 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
976 auto& output = outputs[oi];
977 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
978
979 // is dangling output?
980 bool matched = false;
981 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
982 auto& input = inputs[ii];
983 // Inputs of the same workflow cannot match outputs
984 if (output.workflowId == input.workflowId) {
985 continue;
986 }
987 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
988 if (DataSpecUtils::match(inputSpec, outputSpec)) {
989 matched = true;
990 break;
991 }
992 }
993
994 auto input = DataSpecUtils::matchingInput(outputSpec);
995 char buf[64];
996 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
997
998 // make sure that entries are unique
999 if (std::ranges::find(results, input) == results.end()) {
1000 results.emplace_back(input);
1001 isDangling.emplace_back(matched == false);
1002 }
1003 }
1004
1005 // make sure that results is unique
1006 return std::make_tuple(results, isDangling);
1007}
1008
1009std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1010{
1011
1012 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1013
1014 std::vector<InputSpec> results;
1015 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1016 if (isDangling[ii]) {
1017 results.emplace_back(outputsInputs[ii]);
1018 }
1019 }
1020
1021 return results;
1022}
1023
1024bool validateLifetime(std::ostream& errors,
1025 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1026 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1027{
1028 // In case the completion policy is consume-any, we do not need to check anything.
1029 if (consumerPolicies.completionPolicyName == "consume-any") {
1030 return true;
1031 }
1032 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1033 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1034 DataSpecUtils::describe(input).c_str(), consumer.name,
1035 DataSpecUtils::describe(output).c_str(), producer.name);
1036 return false;
1037 }
1038 return true;
1039}
1040
1041bool validateExpendable(std::ostream& errors,
1042 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1043 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1044{
1045 auto isExpendable = [](DataProcessorLabel const& label) {
1046 return label.value == "expendable";
1047 };
1048 auto isResilient = [](DataProcessorLabel const& label) {
1049 return label.value == "expendable" || label.value == "resilient";
1050 };
1051 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1052 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1053 if (producerExpendable && consumerCritical) {
1054 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1055 consumer.name,
1056 producer.name);
1057 return false;
1058 }
1059 return true;
1060}
1061
1062using Validator = std::function<bool(std::ostream& errors,
1063 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1064 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1065
1067 std::vector<DataProcessorPoliciesInfo> const& policies,
1068 std::vector<DeviceConnectionEdge> const& edges,
1069 std::vector<OutputSpec> const& outputs)
1070{
1071 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
1072 std::vector<Validator> defaultValidators = {validateExpendable};
1073 if (!disableLifetimeCheck) {
1074 defaultValidators.emplace_back(validateLifetime);
1075 }
1076 std::stringstream errors;
1077 // Iterate over all the edges.
1078 // Get the input lifetime and the output lifetime.
1079 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
1080 bool hasErrors = false;
1081 for (auto& edge : edges) {
1082 DataProcessorSpec const& producer = workflow[edge.producer];
1083 DataProcessorSpec const& consumer = workflow[edge.consumer];
1084 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
1085 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
1086 OutputSpec const& output = outputs[edge.outputGlobalIndex];
1087 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
1088 for (auto& validator : defaultValidators) {
1089 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
1090 }
1091 }
1092 if (hasErrors) {
1093 throw std::runtime_error(errors.str());
1094 }
1095}
1096
1097} // namespace o2::framework
uint32_t hash
std::ostringstream debug
std::unique_ptr< expressions::Node > node
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(Ts... Vs)
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
constexpr framework::ConcreteDataMatcher matcher()
Definition ASoA.h:380
Defining PrimaryVertex explicitly as messageable.
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:193
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
int layer
the associated layer in the sorting procedure
int index
the index in the actual storage of the nodes to be sorted topologically
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static void injectAODWriter(WorkflowSpec &workflow, ConfigContext const &ctx)
o2::mch::DsIndex ds
const std::string str