Project
Loading...
Searching...
No Matches
WorkflowHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
28#include "Framework/Signpost.h"
30
31#include "Framework/Variant.h"
32#include "Headers/DataHeader.h"
33#include <algorithm>
34#include <list>
35#include <set>
36#include <utility>
37#include <vector>
38#include <climits>
39#include <numeric>
40
41O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
42
43namespace o2::framework
44{
45std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
46{
47 out << "(" << info.index << ", " << info.layer << ")";
48 return out;
49}
50
51std::vector<TopoIndexInfo>
53 int const* edgeIn,
54 int const* edgeOut,
55 size_t byteStride,
56 size_t edgesCount)
57{
58 size_t stride = byteStride / sizeof(int);
59 using EdgeIndex = int;
60 // Create the index which will be returned.
61 std::vector<TopoIndexInfo> index(nodeCount);
62 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
63 index[wi] = {wi, 0};
64 }
65 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
66 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
67 remainingEdgesIndex[ei] = ei;
68 }
69
70 // Create a vector where at each position we have true
71 // if the vector has dependencies, false otherwise
72 std::vector<bool> nodeDeps(nodeCount, false);
73 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
74 nodeDeps[*(edgeOut + ei * stride)] = true;
75 }
76
77 // We start with all those which do not have any dependencies
78 // They are layer 0.
79 std::list<TopoIndexInfo> L;
80 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
81 if (nodeDeps[ii] == false) {
82 L.push_back({ii, 0});
83 }
84 }
85
86 // The final result.
87 std::vector<TopoIndexInfo> S;
88 // The set of vertices which can be reached by the current node
89 std::set<TopoIndexInfo> nextVertex;
90 // The set of edges which are not related to the current node.
91 std::vector<EdgeIndex> nextEdges;
92 while (!L.empty()) {
93 auto node = L.front();
94 S.push_back(node);
95 L.pop_front();
96 nextVertex.clear();
97 nextEdges.clear();
98
99 // After this, nextVertex will contain all the vertices
100 // which have the current node as incoming.
101 // nextEdges will contain all the edges which are not related
102 // to the current node.
103 for (auto const& ei : remainingEdgesIndex) {
104 if (*(edgeIn + ei * stride) == node.index) {
105 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
106 } else {
107 nextEdges.push_back(ei);
108 }
109 }
110 remainingEdgesIndex.swap(nextEdges);
111
112 // Of all the vertices which have node as incoming,
113 // check if there is any other incoming node.
114 std::set<TopoIndexInfo> hasPredecessors;
115 for (auto const& ei : remainingEdgesIndex) {
116 for (auto& m : nextVertex) {
117 if (m.index == *(edgeOut + ei * stride)) {
118 hasPredecessors.insert({m.index, m.layer});
119 }
120 }
121 }
122 std::vector<TopoIndexInfo> withPredecessor;
123 std::set_difference(nextVertex.begin(), nextVertex.end(),
124 hasPredecessors.begin(), hasPredecessors.end(),
125 std::back_inserter(withPredecessor));
126 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
127 }
128 return S;
129}
130
131// get the default value for condition-backend
133{
134 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
135 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
136 if (explicitBackend) {
137 return getenv("DPL_CONDITION_BACKEND");
138 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
139 return "http://o2-ccdb.internal";
140 } else {
141 return "http://alice-ccdb.cern.ch";
142 }
143}
144
// get the default value for condition query rate
// Returns the value of DPL_CONDITION_QUERY_RATE, or 0 when unset
// (0 means "fetch conditions only once", see the option help text).
// NOTE(review): the signature line was missing from the extracted source;
// restored as `int defaultConditionQueryRate()` from the integer returns.
int defaultConditionQueryRate()
{
  // Read the variable once instead of calling getenv twice.
  const char* rate = getenv("DPL_CONDITION_QUERY_RATE");
  return rate ? std::stoi(rate) : 0;
}
150
// get the default value for condition query rate multiplier
// Returns the value of DPL_CONDITION_QUERY_RATE_MULTIPLIER, or 1 when unset
// (1 means "check on every nominal check", see the option help text).
// NOTE(review): the signature line was missing from the extracted source;
// restored as `int defaultConditionQueryRateMultiplier()`.
int defaultConditionQueryRateMultiplier()
{
  // Read the variable once instead of calling getenv twice.
  const char* multiplier = getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER");
  return multiplier ? std::stoi(multiplier) : 1;
}
156
// WorkflowHelpers::injectServiceDevices — injects the internal DPL service
// devices (CCDB fetcher, AOD reader / spawner / index builder, timer/clock,
// dummy and file sinks) into the user-provided workflow.
// NOTE(review): the function signature line (original line 157) is missing
// from this extraction; from the body it takes the workflow (mutable) and a
// ConfigContext& named `ctx` — confirm against WorkflowHelpers.h.
{
  // IPC id used for timeframe rate limiting; -1 disables it.
  int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
  // Spec for the internal CCDB fetcher; outputs and algorithm are filled in
  // further down, only if some processor actually requests conditions.
  DataProcessorSpec ccdbBackend{
    .name = "internal-dpl-ccdb-backend",
    .outputs = {},
    .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
                {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
                {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
                {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
                {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
                {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
                {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
                {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
                {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
                {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
                {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
                {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
                {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
  };
  // Table-driven CCDB reader used in analysis (offline) deployments.
  DataProcessorSpec analysisCCDBBackend{
    .name = "internal-dpl-aod-ccdb",
    .inputs = {},
    .outputs = {},
    .algorithm = AlgorithmSpec::dummyAlgorithm(),
    .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
                {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
                {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
                {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
                {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
                {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
                {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
                {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
                {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
                {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
                {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
  // NOTE(review): the closing lines of the next three specs (original lines
  // 196, 200 and 204, presumably the algorithm argument plus "};") are
  // missing from this extraction.
  DataProcessorSpec transientStore{"internal-dpl-transient-store",
                                   {},
                                   {},
  DataProcessorSpec qaStore{"internal-dpl-qa-store",
                            {},
                            {},
  DataProcessorSpec timer{"internal-dpl-clock",
                          {},
                          {},

  // In case InputSpec of origin AOD are
  // requested but not available as part of the workflow,
  // we insert in the configuration something which
  // reads them from file.
  //
  // FIXME: source branch is DataOrigin, for the moment. We should
  // make it configurable via ConfigParamsOptions
  auto aodLifetime = Lifetime::Enumeration;

  // Reader device for AOD files; driven by an enumeration input whose
  // subspec is the compile-time hash of the device name.
  DataProcessorSpec aodReader{
    .name = "internal-dpl-aod-reader",
    .inputs = {InputSpec{"enumeration",
                         "DPL",
                         "ENUM",
                         static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
                         aodLifetime}},
    .algorithm = AlgorithmSpec::dummyAlgorithm(),
    .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
                ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
                ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
                ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
                ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
                ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
                ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
                ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
                ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
    .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};

  // Shared scratch-pad for the dangling-edge analysis, registered as a
  // service so later passes (e.g. injectAODWriter) can reuse it.
  ctx.services().registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(new DanglingEdgesContext));
  auto& dec = ctx.services().get<DanglingEdgesContext>();

  std::vector<InputSpec> requestedCCDBs;
  std::vector<OutputSpec> providedCCDBs;

  // First pass over the user workflow: normalise each processor and collect
  // what it requests/provides (AODs, DYNs, IDXs, CCDB objects, timers, ...).
  for (size_t wi = 0; wi < workflow.size(); ++wi) {
    auto& processor = workflow[wi];
    auto name = processor.name;
    uint32_t hash = runtime_hash(name.c_str());
    dec.outTskMap.push_back({hash, name});

    // A user processor with no inputs is turned into an enumeration-driven
    // source; internal-dpl-* devices are exempt.
    std::string prefix = "internal-dpl-";
    if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
      processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
      ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
      ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
      processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
      processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
      processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
    }
    bool hasTimeframeInputs = std::ranges::any_of(processor.inputs, [](auto const& input) { return input.lifetime == Lifetime::Timeframe; });
    bool hasTimeframeOutputs = std::ranges::any_of(processor.outputs, [](auto const& output) { return output.lifetime == Lifetime::Timeframe; });

    // A timeframeSink consumes timeframes without creating new
    // timeframe data.
    bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
    if (rateLimitingIPCID != -1) {
      // When rate limiting is on, every timeframe sink must emit a
      // DPL/SUMMARY message so consumption can be tracked; add it if absent.
      if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
        O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
        bool hasMatch = false;
        ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
        auto summaryOutput = std::ranges::find_if(processor.outputs, [&summaryMatcher](auto const& output) { return DataSpecUtils::match(output, summaryMatcher); });
        if (summaryOutput != processor.outputs.end()) {
          O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
                                 DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str());
          hasMatch = true;
        }

        if (!hasMatch) {
          O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
          processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
        }
      }
    }
    bool hasConditionOption = false;
    for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
      auto& input = processor.inputs[ii];
      bool hasProjectors = false;
      bool hasIndexRecords = false;
      bool hasCCDBURLs = false;
      // all three options are exclusive
      for (auto const& p : input.metadata) {
        if (p.name.compare("projectors") == 0) {
          hasProjectors = true;
          break;
        }
        if (p.name.compare("index-records") == 0) {
          hasIndexRecords = true;
          break;
        }
        if (p.name.starts_with("ccdb:")) {
          hasCCDBURLs = true;
          break;
        }
      }
      // Each special lifetime gets a matching output on the internal clock
      // device and/or the extra options it needs.
      switch (input.lifetime) {
        case Lifetime::Timer: {
          auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
          auto hasOption = std::ranges::any_of(processor.options, [&input](auto const& option) { return (option.name == "period-" + input.binding); });
          if (hasOption == false) {
            processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
          }
          timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
        } break;
        case Lifetime::Signal: {
          auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
          timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
        } break;
        case Lifetime::Enumeration: {
          auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
          timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
        } break;
        case Lifetime::Condition: {
          requestedCCDBs.emplace_back(input);
          // Add the CCDB options only once per processor.
          if ((hasConditionOption == false) && std::ranges::none_of(processor.options, [](auto const& option) { return (option.name.compare("condition-backend") == 0); })) {
            processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
            processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
            hasConditionOption = true;
          }
        } break;
        case Lifetime::OutOfBand: {
          auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
          auto hasOption = std::ranges::any_of(processor.options, [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
          if (hasOption == false) {
            processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
          }
          timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
        } break;
        case Lifetime::QA:
        case Lifetime::Transient:
        case Lifetime::Timeframe:
        case Lifetime::Optional:
          break;
      }
      // Route the input into the appropriate "requested" bucket.
      if (hasProjectors) {
        DataSpecUtils::updateInputList(dec.requestedDYNs, InputSpec{input});
      } else if (hasIndexRecords) {
        DataSpecUtils::updateInputList(dec.requestedIDXs, InputSpec{input});
      } else if (hasCCDBURLs) {
        DataSpecUtils::updateInputList(dec.requestedTIMs, InputSpec{input});
      } else if (DataSpecUtils::partialMatch(input, AODOrigins)) {
        DataSpecUtils::updateInputList(dec.requestedAODs, InputSpec{input});
      }
    }

    // Keep the clock outputs in a deterministic (subspec) order.
    std::ranges::stable_sort(timer.outputs, [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });

    for (auto& output : processor.outputs) {
      bool hasProjectors = false;
      bool hasIndexRecords = false;
      bool hasCCDBURLs = false;
      // all three options are exclusive
      for (auto const& p : output.metadata) {
        if (p.name.compare("projectors") == 0) {
          hasProjectors = true;
          break;
        }
        if (p.name.compare("index-records") == 0) {
          hasIndexRecords = true;
          break;
        }
        if (p.name.starts_with("ccdb:")) {
          hasCCDBURLs = true;
          break;
        }
      }
      if (hasProjectors) {
        dec.providedDYNs.emplace_back(output);
      } else if (hasCCDBURLs) {
        dec.providedTIMs.emplace_back(output);
      } else if (hasIndexRecords) {
        dec.providedIDXs.emplace_back(output);
      } else if (DataSpecUtils::partialMatch(output, AODOrigins)) {
        dec.providedAODs.emplace_back(output);
        // NOTE(review): original line 379 is missing from this extraction —
        // presumably an "} else if (...) {" opening the OutputObjHist branch
        // the next statements belong to; confirm against the original file.
        dec.providedOutputObjHist.emplace_back(output);
        auto it = std::ranges::find_if(dec.outObjHistMap, [&](auto&& x) { return x.id == hash; });
        if (it == dec.outObjHistMap.end()) {
          dec.outObjHistMap.push_back({hash, {output.binding.value}});
        } else {
          it->bindings.push_back(output.binding.value);
        }
      }

      if (output.lifetime == Lifetime::Condition) {
        providedCCDBs.push_back(output);
      }
    }
  }

  // Deterministic ordering for the set-difference style matching below.
  auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
  auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };

  DataProcessorSpec indexBuilder{
    "internal-dpl-aod-index-builder",
    {},
    {},
    AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
    {}};
  // Requested-but-not-provided IDXs become inputs of the index builder.
  std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
  std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
  dec.requestedIDXs | views::filter_not_matching(dec.providedIDXs) | sinks::append_to{dec.builderInputs};
  AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.builderInputs, dec.requestedAODs, dec.requestedDYNs, indexBuilder);

  // Same treatment for CCDB-table (TIM) requests, served by the analysis
  // CCDB backend.
  std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
  std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
  dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
  AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, analysisCCDBBackend);

  // Unsatisfied DYN requests are fed to the spawner.
  std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
  std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
  dec.requestedDYNs | views::filter_not_matching(dec.providedDYNs) | sinks::append_to{dec.spawnerInputs};

  DataProcessorSpec aodSpawner{
    "internal-dpl-aod-spawner",
    {},
    {},
    AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
    {}};
  AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, aodSpawner);

  // Whatever AODs are still unsatisfied come from the file reader.
  std::ranges::sort(dec.requestedAODs, inputSpecLessThan);
  std::ranges::sort(dec.providedAODs, outputSpecLessThan);
  AnalysisSupportHelpers::addMissingOutputsToReader(dec.providedAODs, dec.requestedAODs, aodReader);

  std::ranges::sort(requestedCCDBs, inputSpecLessThan);
  std::ranges::sort(providedCCDBs, outputSpecLessThan);
  AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);

  // Service devices are only appended if they actually have work to do,
  // i.e. if they acquired at least one output above.
  std::vector<DataProcessorSpec> extraSpecs;

  if (transientStore.outputs.empty() == false) {
    extraSpecs.push_back(transientStore);
  }
  if (qaStore.outputs.empty() == false) {
    extraSpecs.push_back(qaStore);
  }

  if (aodSpawner.outputs.empty() == false) {
    extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
  }

  if (indexBuilder.outputs.empty() == false) {
    extraSpecs.push_back(indexBuilder);
  }

  // add the Analysys CCDB backend which reads CCDB objects using a provided table
  // NOTE(review): original line 452 is missing from this extraction —
  // presumably the declaration of `deploymentMode` used just below
  // (static ... = DefaultsHelpers::deploymentMode()); confirm.
  if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
    if (analysisCCDBBackend.outputs.empty() == false) {
      extraSpecs.push_back(analysisCCDBBackend);
    }
  }

  // Look for a device already producing TFN/TFNumber (the timeframe clock).
  auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
    return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
      return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
    });
  });

  // add the reader
  if (aodReader.outputs.empty() == false) {
    if (tfnsource == workflow.end()) {
      // add normal reader
      aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
      aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
    } else {
      // AODs are being injected the tfnsource is the entry point, add error-handler reader
      // NOTE(review): original line 474 is missing from this extraction —
      // presumably the adaptStateful( wrapper for the lambda below
      // (matched by the closing "})};"); confirm against the original file.
      aodReader.algorithm = AlgorithmSpec{
        [](DeviceSpec const& spec) {
          LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
          for (auto const& output : spec.outputs) {
            LOGP(warn, "  {}", DataSpecUtils::describe(output.matcher));
          }
          LOGP(fatal, "Stopping.");
          // to ensure the output type for adaptStateful
          return adaptStateless([](DataAllocator&) {});
        })};
    }
    // Drive the reader from the internal clock via its enumeration input.
    auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
    timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
    extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
  }

  InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
  auto& dstf = std::get<ConcreteDataMatcher>(matcher.matcher);
  // Check if any of the provided outputs is a DISTSTF
  // Check if any of the requested inputs is for a 0xccdb message
  bool providesDISTSTF = std::ranges::any_of(workflow,
                                             [&matcher](auto const& dp) {
                                               return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](auto const& output) {
                                                 return DataSpecUtils::match(matcher, output);
                                               });
                                             });

  // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
  // we add to the first data processor which has no inputs (apart from
  // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
  bool requiresDISTSUBTIMEFRAME = std::ranges::any_of(workflow,
                                                      [&dstf](auto const& dp) {
                                                        return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](auto const& input) {
                                                          return DataSpecUtils::match(input, dstf);
                                                        });
                                                      });

  // We find the first device which has either just enumerations or
  // just timers, and we will add the DISTSUBTIMEFRAME to it.
  // Notice how we do so in a stable manner by sorting the devices
  // by name.
  int enumCandidate = -1;
  int timerCandidate = -1;
  for (auto wi = 0U; wi < workflow.size(); ++wi) {
    auto& dp = workflow[wi];
    if (dp.inputs.size() != 1) {
      continue;
    }
    auto lifetime = dp.inputs[0].lifetime;
    if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
      enumCandidate = wi;
    }
    if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
      timerCandidate = wi;
    }
  }

  // * If there are AOD outputs we use TFNumber as the CCDB clock
  // * If one device provides a DISTSTF we use that as the CCDB clock
  // * If one of the devices provides a timer we use that as the CCDB clock
  // * If none of the above apply, add to the first data processor
  //   which has no inputs apart from enumerations the responsibility
  //   to provide the DISTSUBTIMEFRAME.
  if (ccdbBackend.outputs.empty() == false) {
    if (aodReader.outputs.empty() == false) {
      // fetcher clock follows AOD source (TFNumber)
      ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
    } else if (providesDISTSTF) {
      // fetcher clock follows DSTF/ccdb source (DISTSUBTIMEFRAME)
      ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
    } else {
      if (enumCandidate != -1) {
        // add DSTF/ccdb source to the enumeration-driven source explicitly
        // fetcher clock is provided by enumeration-driven source (DISTSUBTIMEFRAME)
        DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
        ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
      } else if (timerCandidate != -1) {
        // fetcher clock is proived by timer source
        auto timer_dstf = DataSpecUtils::asConcreteDataMatcher(workflow[timerCandidate].outputs[0]);
        ccdbBackend.inputs.push_back(InputSpec{"tfn", timer_dstf, Lifetime::Timeframe});
      }
    }

    ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
    // Load the CCDB backend from the plugin
    ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
    extraSpecs.push_back(timePipeline(ccdbBackend, ctx.options().get<int64_t>("ccdb-fetchers")));
  } else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
    // add DSTF/ccdb source to the enumeration-driven source explicitly if it is required in the workflow
    DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
  }

  // add the timer
  if (timer.outputs.empty() == false) {
    extraSpecs.push_back(timer);
  }

  // This is to inject a file sink so that any dangling ATSK object is written
  // to a ROOT file.
  if (dec.providedOutputObjHist.empty() == false) {
    // NOTE(review): original line 574 is missing from this extraction —
    // presumably the construction of `rootSink` pushed just below; confirm.
    extraSpecs.push_back(rootSink);
  }

  workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
  extraSpecs.clear();

  injectAODWriter(workflow, ctx);

  // Select dangling outputs which are not of type AOD
  std::vector<InputSpec> redirectedOutputsInputs;
  for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
    if (ctx.options().get<std::string>("forwarding-policy") == "none") {
      continue;
    }
    // We forward to the output proxy all the inputs only if they are dangling
    // or if the forwarding policy is "proxy".
    if (!dec.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
      continue;
    }
    // AODs are skipped in any case.
    if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], AODOrigins)) {
      continue;
    }
    redirectedOutputsInputs.emplace_back(dec.outputsInputs[ii]);
  }

  std::vector<InputSpec> unmatched;
  auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
  // update tfnsource iterator (could be aod-reader)
  tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
    return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
      return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
    });
  });
  // Route the redirected outputs to a file sink, a FairMQ proxy, or drop them.
  if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
    auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
    if (unmatched.size() != redirectedOutputsInputs.size()) {
      extraSpecs.push_back(fileSink);
    }
  } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
    auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
    extraSpecs.push_back(fairMQSink);
  } else if (forwardingDestination != "drop") {
    throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
  }
  if ((unmatched.size() > 0) || (redirectedOutputsInputs.size() > 0) || (tfnsource != workflow.end())) {
    std::vector<InputSpec> ignored = unmatched;
    ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
    for (auto& ignoredInput : ignored) {
      ignoredInput.lifetime = Lifetime::Sporadic;
    }

    // Use the new dummy sink when the AOD reader is there
    O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
    if (tfnsource != workflow.end()) {
      DataSpecUtils::updateInputList(ignored, InputSpec{"tfn", "TFN", "TFNumber", 0, Lifetime::Sporadic});
      DataSpecUtils::updateInputList(ignored, InputSpec{"tff", "TFF", "TFFilename", 0, Lifetime::Sporadic});
    }

    if (tfnsource != workflow.end() && !tfnsource->name.starts_with("aod-producer-workflow")) { // any tfnsource except the aod-producer should use scheduled sink
      O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
      // if there is a tfnsource, make sure the sink gets TFN/TFF
      extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
    } else { // if there is no tfn source or if that source is aod-producer-workflow, out-of-band channel is used to propagate the number of consumed timeframes
      O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
      std::string rateLimitingChannelConfigOutput;
      if (rateLimitingIPCID != -1) {
        rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
      }
      extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
    }
  }

  workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
  extraSpecs.clear();
}
651
// WorkflowHelpers::adjustTopology — post-processing pass that rewrites
// DISTSUBTIMEFRAME subspecs for devices with Optional inputs and demotes
// Timeframe outputs to Sporadic for devices whose inputs are all sporadic.
// NOTE(review): the function signature line (original line 652) is missing
// from this extraction; from the body it takes the workflow (mutable) —
// confirm against WorkflowHelpers.h.
{
  unsigned int distSTFCount = 0;
  for (auto& spec : workflow) {
    auto& inputs = spec.inputs;
    bool allSporadic = true;
    bool hasTimer = false;
    bool hasSporadic = false;
    bool hasOptionals = false;
    // First scan: does this device declare any Optional input?
    for (auto& input : inputs) {
      if (input.lifetime == Lifetime::Optional) {
        hasOptionals = true;
      }
    }
    for (auto& input : inputs) {
      // Any InputSpec that is DPL/DISTSUBTIMEFRAME/0 will actually be replaced by one
      // which looks like DPL/DISTSUBTIMEFRAME/<incremental number> for devices that
      // have Optional inputs as well.
      // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
      // forwarded before actual RAWDATA arrives.
      if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
          !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
        LOGP(error,
             "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
             "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
             DataSpecUtils::describe(input), input.binding);
      }
      if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
        // The first one remains unchanged, therefore we use the postincrement
        DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
        continue;
      }
      // Timers are sporadic only when they are not
      // alone.
      if (input.lifetime == Lifetime::Timer) {
        hasTimer = true;
        continue;
      }
      if (input.lifetime == Lifetime::Sporadic) {
        hasSporadic = true;
      } else {
        allSporadic = false;
      }
    }

    LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);

    // If they are not all sporadic (excluding timers)
    // we leave things as they are.
    if (allSporadic == false) {
      continue;
    }
    // A timer alone is not sporadic.
    if (hasSporadic == false) {
      continue;
    }
    // NOTE(review): original lines 708-710 are missing from this extraction —
    // by position likely an explanatory comment for the demotion below; confirm.
    for (auto& output : spec.outputs) {
      if (output.lifetime == Lifetime::Timeframe) {
        output.lifetime = Lifetime::Sporadic;
      }
    }
  }

  // If any DISTSUBTIMEFRAME input subspec was rewritten above, the producer
  // of FLP/DISTSUBTIMEFRAME/0 must provide the renumbered copies 1..N-1 too.
  if (distSTFCount > 0) {
    for (auto& spec : workflow) {
      if (std::ranges::any_of(spec.outputs, [](auto const& output) { return DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0}); })) {
        for (unsigned int i = 1; i < distSTFCount; ++i) {
          spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
        }
        break;
      }
    }
  }
}
729
731{
732 auto& dec = ctx.services().get<DanglingEdgesContext>();
734 std::tie(dec.outputsInputs, dec.isDangling) = analyzeOutputs(workflow);
735
736 // create DataOutputDescriptor
737 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
738
739 // select outputs of type AOD which need to be saved
740 dec.outputsInputsAOD.clear();
741 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
742 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], AODOrigins)) {
743 auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
744 if (ds.size() > 0 || dec.isDangling[ii]) {
745 dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
746 }
747 }
748 }
749
750 // file sink for any AOD output
751 if (dec.outputsInputsAOD.size() > 0) {
752 // add TFNumber and TFFilename as input to the writer
753 DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tfn", "TFN", "TFNumber"});
754 DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tff", "TFF", "TFFilename"});
755 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
756 workflow.push_back(fileSink);
757
758 auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
759 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
760 });
761 dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
762
763 it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
764 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFF"));
765 });
766 dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
767 }
768}
769
771 std::vector<DeviceConnectionEdge>& logicalEdges,
772 std::vector<OutputSpec>& outputs,
773 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
774{
775 // In case the workflow is empty, we do not have anything to do.
776 if (workflow.empty()) {
777 return;
778 }
779
780 // This is the state. Oif is the iterator I use for the searches.
781 std::vector<LogicalOutputInfo> availableOutputsInfo;
782 auto const& constOutputs = outputs; // const version of the outputs
783 // Forwards is a local cache to avoid adding forwards before time.
784 std::vector<LogicalOutputInfo> forwards;
785
786 // Notice that availableOutputsInfo MUST be updated first, since it relies on
787 // the size of outputs to be the one before the update.
788 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
789 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
790 for (size_t wi = 0; wi < workflow.size(); ++wi) {
791 auto& producer = workflow[wi];
792 if (producer.outputs.empty()) {
793 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
794 }
795 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
796
797 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
798 auto& out = producer.outputs[oi];
799 auto uniqueOutputId = outputs.size();
800 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
801 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
802 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
803 outputs.push_back(out);
804 }
805 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
806 }
807 };
808
809 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
810 auto input = workflow[ci].inputs[ii];
811 std::ostringstream str;
812 str << "No matching output found for "
813 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
814
815 for (auto& output : constOutputs) {
816 str << "-" << DataSpecUtils::describe(output) << "\n";
817 }
818
819 throw std::runtime_error(str.str());
820 };
821
822 // This is the outer loop
823 //
824 // Here we iterate over dataprocessor items in workflow and we consider them
825 // as consumer, since we are interested in their inputs.
826 // Notice also we need to search for all the matching inputs, since
827 // we could have more than one source that matches (e.g. in the
828 // case of a time merger).
829 // Once consumed, an output is not actually used anymore, however
830 // we append it as a forward.
831 // Finally, If a device has n-way pipelining, we need to create one node per
832 // parallel pipeline and add an edge for each.
833 enumerateAvailableOutputs();
834
835 std::vector<bool> matches(constOutputs.size());
836 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
837 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
838 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
839 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
840 forwards.clear();
841 for (size_t i = 0; i < constOutputs.size(); i++) {
842 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
843 if (matches[i]) {
844 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
845 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
846 DataSpecUtils::describe(constOutputs[i]).c_str());
847 }
848 }
849
850 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
851 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
852 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
853 continue;
854 }
855 auto* oif = &availableOutputsInfo[i];
856 if (oif->forward) {
857 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
858 }
859 auto producer = oif->specIndex;
860 auto uniqueOutputId = oif->outputGlobalIndex;
861 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
862 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
863 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
864 workflow[producer].name.c_str());
865 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
866 }
867 }
868 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
869 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
870 oif->enabled = false;
871 }
872 if (forwards.empty()) {
873 errorDueToMissingOutputFor(consumer, input);
874 }
875 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto const& info) { return info.enabled == false; }), availableOutputsInfo.end());
876 std::ranges::copy(forwards, std::back_inserter(availableOutputsInfo));
877 }
878 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
879 }
880}
881
882std::vector<EdgeAction>
884 const std::vector<DeviceConnectionEdge>& edges,
885 const std::vector<size_t>& index)
886{
887 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
888
889 assert(edges.size() == index.size());
890 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
891 for (size_t i : index) {
892 auto& edge = edges[i];
893 auto& action = actions[i];
894 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
895 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
896 last = edge;
897 }
898 return actions;
899}
900
901std::vector<EdgeAction>
903 const std::vector<DeviceConnectionEdge>& edges,
904 const std::vector<size_t>& index)
905{
906 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
907
908 assert(edges.size() == index.size());
909 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
910 for (size_t i : index) {
911 auto& edge = edges[i];
912 auto& action = actions[i];
913 // Calculate which actions need to be taken for this edge.
914 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
915 action.requiresNewChannel =
916 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
917
918 last = edge;
919 }
920 return actions;
921}
922
923void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
924 std::vector<size_t>& outEdgeIndex,
925 const std::vector<DeviceConnectionEdge>& edges)
926{
927 inEdgeIndex.resize(edges.size());
928 outEdgeIndex.resize(edges.size());
929 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
930 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
931
932 // Two indexes, one to bind the outputs, the other
933 // one to connect the inputs. The
934 auto outSorter = [&edges](size_t i, size_t j) {
935 auto& a = edges[i];
936 auto& b = edges[j];
937 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
938 };
939 auto inSorter = [&edges](size_t i, size_t j) {
940 auto& a = edges[i];
941 auto& b = edges[j];
942 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
943 };
944
945 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
946 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
947}
948
950{
951 if (workflow.empty()) {
953 }
954 std::set<std::string> validNames;
955 // std::vector<OutputSpec> availableOutputs;
956 // std::vector<InputSpec> requiredInputs;
957
958 // An index many to one index to go from a given input to the
959 // associated spec
960 // std::map<size_t, size_t> inputToSpec;
961 // A one to one index to go from a given output to the Spec emitting it
962 // std::map<size_t, size_t> outputToSpec;
963
964 std::ostringstream ss;
965
966 for (auto& spec : workflow) {
967 if (spec.name.empty()) {
968 throw std::runtime_error("Invalid DataProcessorSpec name");
969 }
970 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
971 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
972 }
973 if (validNames.find(spec.name) != validNames.end()) {
974 throw std::runtime_error("Name " + spec.name + " is used twice.");
975 }
976 validNames.insert(spec.name);
977 for (auto& option : spec.options) {
978 if (option.defaultValue.type() != VariantType::Empty &&
979 option.type != option.defaultValue.type()) {
980 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
981 << ") for " << option.name << " in DataProcessorSpec of "
982 << spec.name;
983 throw std::runtime_error(ss.str());
984 }
985 }
986 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
987 InputSpec const& input = spec.inputs[ii];
988 if (DataSpecUtils::validate(input) == false) {
989 ss << "In spec " << spec.name << " input specification "
990 << ii << " requires binding, description and origin"
991 " to be fully specified (found "
992 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
993 throw std::runtime_error(ss.str());
994 }
995 }
996 }
998}
999
1000using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
1003 size_t id;
1004};
1005
1006std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
1007{
1008 // compute total number of input/output
1009 size_t totalInputs = 0;
1010 size_t totalOutputs = 0;
1011 for (auto& spec : workflow) {
1012 totalInputs += spec.inputs.size();
1013 totalOutputs += spec.outputs.size();
1014 }
1015
1016 std::vector<DataMatcherId> inputs;
1017 std::vector<DataMatcherId> outputs;
1018 inputs.reserve(totalInputs);
1019 outputs.reserve(totalOutputs);
1020
1021 std::vector<InputSpec> results;
1022 std::vector<bool> isDangling;
1023 results.reserve(totalOutputs);
1024 isDangling.reserve(totalOutputs);
1025
1027 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
1028 auto& spec = workflow[wi];
1029 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
1030 inputs.emplace_back(DataMatcherId{wi, ii});
1031 }
1032 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
1033 outputs.emplace_back(DataMatcherId{wi, oi});
1034 }
1035 }
1036
1037 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
1038 auto& output = outputs[oi];
1039 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
1040
1041 // is dangling output?
1042 bool matched = false;
1043 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
1044 auto& input = inputs[ii];
1045 // Inputs of the same workflow cannot match outputs
1046 if (output.workflowId == input.workflowId) {
1047 continue;
1048 }
1049 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
1050 if (DataSpecUtils::match(inputSpec, outputSpec)) {
1051 matched = true;
1052 break;
1053 }
1054 }
1055
1056 auto input = DataSpecUtils::matchingInput(outputSpec);
1057 char buf[64];
1058 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
1059
1060 // make sure that entries are unique
1061 if (std::ranges::find(results, input) == results.end()) {
1062 results.emplace_back(input);
1063 isDangling.emplace_back(matched == false);
1064 }
1065 }
1066
1067 // make sure that results is unique
1068 return std::make_tuple(results, isDangling);
1069}
1070
1071std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1072{
1073
1074 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1075
1076 std::vector<InputSpec> results;
1077 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1078 if (isDangling[ii]) {
1079 results.emplace_back(outputsInputs[ii]);
1080 }
1081 }
1082
1083 return results;
1084}
1085
1086bool validateLifetime(std::ostream& errors,
1087 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1088 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1089{
1090 // In case the completion policy is consume-any, we do not need to check anything.
1091 if (consumerPolicies.completionPolicyName == "consume-any") {
1092 return true;
1093 }
1094 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1095 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1096 DataSpecUtils::describe(input).c_str(), consumer.name,
1097 DataSpecUtils::describe(output).c_str(), producer.name);
1098 return false;
1099 }
1100 return true;
1101}
1102
1103bool validateExpendable(std::ostream& errors,
1104 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1105 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1106{
1107 auto isExpendable = [](DataProcessorLabel const& label) {
1108 return label.value == "expendable";
1109 };
1110 auto isResilient = [](DataProcessorLabel const& label) {
1111 return label.value == "expendable" || label.value == "resilient";
1112 };
1113 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1114 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1115 if (producerExpendable && consumerCritical) {
1116 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1117 consumer.name,
1118 producer.name);
1119 return false;
1120 }
1121 return true;
1122}
1123
// Signature shared by all per-edge validators (validateLifetime,
// validateExpendable): return false and append a message to `errors`
// when the producer/consumer pair is invalid for the given edge.
using Validator = std::function<bool(std::ostream& errors,
                                     DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
                                     DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1127
1129 std::vector<DataProcessorPoliciesInfo> const& policies,
1130 std::vector<DeviceConnectionEdge> const& edges,
1131 std::vector<OutputSpec> const& outputs)
1132{
1133 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
1134 std::vector<Validator> defaultValidators = {validateExpendable};
1135 if (!disableLifetimeCheck) {
1136 defaultValidators.emplace_back(validateLifetime);
1137 }
1138 std::stringstream errors;
1139 // Iterate over all the edges.
1140 // Get the input lifetime and the output lifetime.
1141 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
1142 bool hasErrors = false;
1143 for (auto const& edge : edges) {
1144 DataProcessorSpec const& producer = workflow[edge.producer];
1145 DataProcessorSpec const& consumer = workflow[edge.consumer];
1146 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
1147 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
1148 OutputSpec const& output = outputs[edge.outputGlobalIndex];
1149 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
1150 for (auto const& validator : defaultValidators) {
1151 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
1152 }
1153 }
1154 if (hasErrors) {
1155 throw std::runtime_error(errors.str());
1156 }
1157}
1158
1159} // namespace o2::framework
uint32_t hash
std::ostringstream debug
std::unique_ptr< expressions::Node > node
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:490
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:609
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:507
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:523
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:603
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(Ts... Vs)
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
constexpr framework::ConcreteDataMatcher matcher()
Definition ASoA.h:380
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:193
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static void injectAODWriter(WorkflowSpec &workflow, ConfigContext const &ctx)
o2::mch::DsIndex ds
const std::string str