Project
Loading...
Searching...
No Matches
WorkflowHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
28#include "Framework/Signpost.h"
30
31#include "Framework/Variant.h"
32#include "Headers/DataHeader.h"
33#include <algorithm>
34#include <list>
35#include <set>
36#include <utility>
37#include <vector>
38#include <climits>
39#include <numeric>
40
41O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
42
43namespace o2::framework
44{
45std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
46{
47 out << "(" << info.index << ", " << info.layer << ")";
48 return out;
49}
50
51std::vector<TopoIndexInfo>
53 int const* edgeIn,
54 int const* edgeOut,
55 size_t byteStride,
56 size_t edgesCount)
57{
58 size_t stride = byteStride / sizeof(int);
59 using EdgeIndex = int;
60 // Create the index which will be returned.
61 std::vector<TopoIndexInfo> index(nodeCount);
62 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
63 index[wi] = {wi, 0};
64 }
65 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
66 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
67 remainingEdgesIndex[ei] = ei;
68 }
69
70 // Create a vector where at each position we have true
71 // if the vector has dependencies, false otherwise
72 std::vector<bool> nodeDeps(nodeCount, false);
73 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
74 nodeDeps[*(edgeOut + ei * stride)] = true;
75 }
76
77 // We start with all those which do not have any dependencies
78 // They are layer 0.
79 std::list<TopoIndexInfo> L;
80 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
81 if (nodeDeps[ii] == false) {
82 L.push_back({ii, 0});
83 }
84 }
85
86 // The final result.
87 std::vector<TopoIndexInfo> S;
88 // The set of vertices which can be reached by the current node
89 std::set<TopoIndexInfo> nextVertex;
90 // The set of edges which are not related to the current node.
91 std::vector<EdgeIndex> nextEdges;
92 while (!L.empty()) {
93 auto node = L.front();
94 S.push_back(node);
95 L.pop_front();
96 nextVertex.clear();
97 nextEdges.clear();
98
99 // After this, nextVertex will contain all the vertices
100 // which have the current node as incoming.
101 // nextEdges will contain all the edges which are not related
102 // to the current node.
103 for (auto const& ei : remainingEdgesIndex) {
104 if (*(edgeIn + ei * stride) == node.index) {
105 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
106 } else {
107 nextEdges.push_back(ei);
108 }
109 }
110 remainingEdgesIndex.swap(nextEdges);
111
112 // Of all the vertices which have node as incoming,
113 // check if there is any other incoming node.
114 std::set<TopoIndexInfo> hasPredecessors;
115 for (auto const& ei : remainingEdgesIndex) {
116 for (auto& m : nextVertex) {
117 if (m.index == *(edgeOut + ei * stride)) {
118 hasPredecessors.insert({m.index, m.layer});
119 }
120 }
121 }
122 std::vector<TopoIndexInfo> withPredecessor;
123 std::set_difference(nextVertex.begin(), nextVertex.end(),
124 hasPredecessors.begin(), hasPredecessors.end(),
125 std::back_inserter(withPredecessor));
126 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
127 }
128 return S;
129}
130
131// get the default value for condition-backend
133{
134 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
135 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
136 if (explicitBackend) {
137 return getenv("DPL_CONDITION_BACKEND");
138 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
139 return "http://o2-ccdb.internal";
140 } else {
141 return "http://alice-ccdb.cern.ch";
142 }
143}
144
145// get the default value for condition query rate
147{
148 return getenv("DPL_CONDITION_QUERY_RATE") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE")) : 0;
149}
150
151// get the default value for condition query rate multiplier
153{
154 return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1;
155}
156
158{
159 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
160 DataProcessorSpec ccdbBackend{
161 .name = "internal-dpl-ccdb-backend",
162 .outputs = {},
163 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
164 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
165 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
166 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
167 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
168 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
169 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
170 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
171 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
172 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
173 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
174 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
175 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
176 };
177 DataProcessorSpec analysisCCDBBackend{
178 .name = "internal-dpl-aod-ccdb",
179 .inputs = {},
180 .outputs = {},
181 .algorithm = AlgorithmSpec::dummyAlgorithm(),
182 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
183 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
184 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
185 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
186 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
187 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
188 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
189 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
190 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
191 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
192 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
193 DataProcessorSpec transientStore{"internal-dpl-transient-store",
194 {},
195 {},
197 DataProcessorSpec qaStore{"internal-dpl-qa-store",
198 {},
199 {},
201 DataProcessorSpec timer{"internal-dpl-clock",
202 {},
203 {},
205
206 // In case InputSpec of origin AOD are
207 // requested but not available as part of the workflow,
208 // we insert in the configuration something which
209 // reads them from file.
210 //
211 // FIXME: source branch is DataOrigin, for the moment. We should
212 // make it configurable via ConfigParamsOptions
213 auto aodLifetime = Lifetime::Enumeration;
214
215 DataProcessorSpec aodReader{
216 .name = "internal-dpl-aod-reader",
217 .inputs = {InputSpec{"enumeration",
218 "DPL",
219 "ENUM",
220 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
221 aodLifetime}},
222 .algorithm = AlgorithmSpec::dummyAlgorithm(),
223 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
224 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
225 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
226 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
227 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
228 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
229 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
230 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
231 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
232 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
233
234 ctx.services().registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(new DanglingEdgesContext));
235 auto& dec = ctx.services().get<DanglingEdgesContext>();
236
237 std::vector<InputSpec> requestedCCDBs;
238 std::vector<OutputSpec> providedCCDBs;
239
240 for (size_t wi = 0; wi < workflow.size(); ++wi) {
241 auto& processor = workflow[wi];
242 auto name = processor.name;
243 uint32_t hash = runtime_hash(name.c_str());
244 dec.outTskMap.push_back({hash, name});
245
246 std::string prefix = "internal-dpl-";
247 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
248 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
249 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
250 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
251 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
252 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
253 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
254 }
255 bool hasTimeframeInputs = std::ranges::any_of(processor.inputs, [](auto const& input) { return input.lifetime == Lifetime::Timeframe; });
256 bool hasTimeframeOutputs = std::ranges::any_of(processor.outputs, [](auto const& output) { return output.lifetime == Lifetime::Timeframe; });
257
258 // A timeframeSink consumes timeframes without creating new
259 // timeframe data.
260 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
261 if (rateLimitingIPCID != -1) {
262 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
263 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
264 bool hasMatch = false;
265 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
266 auto summaryOutput = std::ranges::find_if(processor.outputs, [&summaryMatcher](auto const& output) { return DataSpecUtils::match(output, summaryMatcher); });
267 if (summaryOutput != processor.outputs.end()) {
268 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
269 DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str());
270 hasMatch = true;
271 }
272
273 if (!hasMatch) {
274 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
275 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
276 }
277 }
278 }
279 bool hasConditionOption = false;
280 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
281 auto& input = processor.inputs[ii];
282 bool hasProjectors = false;
283 bool hasIndexRecords = false;
284 bool hasCCDBURLs = false;
285 // all three options are exclusive
286 for (auto const& p : input.metadata) {
287 if (p.name.compare("projectors") == 0) {
288 hasProjectors = true;
289 break;
290 }
291 if (p.name.compare("index-records") == 0) {
292 hasIndexRecords = true;
293 break;
294 }
295 if (p.name.starts_with("ccdb:")) {
296 hasCCDBURLs = true;
297 break;
298 }
299 }
300 switch (input.lifetime) {
301 case Lifetime::Timer: {
302 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
303 auto hasOption = std::ranges::any_of(processor.options, [&input](auto const& option) { return (option.name == "period-" + input.binding); });
304 if (hasOption == false) {
305 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
306 }
307 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
308 } break;
309 case Lifetime::Signal: {
310 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
311 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
312 } break;
313 case Lifetime::Enumeration: {
314 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
315 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
316 } break;
317 case Lifetime::Condition: {
318 requestedCCDBs.emplace_back(input);
319 if ((hasConditionOption == false) && std::ranges::none_of(processor.options, [](auto const& option) { return (option.name.compare("condition-backend") == 0); })) {
320 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
321 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
322 hasConditionOption = true;
323 }
324 } break;
325 case Lifetime::OutOfBand: {
326 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
327 auto hasOption = std::ranges::any_of(processor.options, [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
328 if (hasOption == false) {
329 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
330 }
331 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
332 } break;
333 case Lifetime::QA:
334 case Lifetime::Transient:
335 case Lifetime::Timeframe:
336 case Lifetime::Optional:
337 break;
338 }
339 if (hasProjectors) {
340 DataSpecUtils::updateInputList(dec.requestedDYNs, InputSpec{input});
341 } else if (hasIndexRecords) {
342 DataSpecUtils::updateInputList(dec.requestedIDXs, InputSpec{input});
343 } else if (hasCCDBURLs) {
344 DataSpecUtils::updateInputList(dec.requestedTIMs, InputSpec{input});
345 } else if (DataSpecUtils::partialMatch(input, AODOrigins)) {
346 DataSpecUtils::updateInputList(dec.requestedAODs, InputSpec{input});
347 }
348 }
349
350 std::ranges::stable_sort(timer.outputs, [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
351
352 for (auto& output : processor.outputs) {
353 bool hasProjectors = false;
354 bool hasIndexRecords = false;
355 bool hasCCDBURLs = false;
356 // all three options are exclusive
357 for (auto const& p : output.metadata) {
358 if (p.name.compare("projectors") == 0) {
359 hasProjectors = true;
360 break;
361 }
362 if (p.name.compare("index-records") == 0) {
363 hasIndexRecords = true;
364 break;
365 }
366 if (p.name.starts_with("ccdb:")) {
367 hasCCDBURLs = true;
368 break;
369 }
370 }
371 if (hasProjectors) {
372 dec.providedDYNs.emplace_back(output);
373 } else if (hasCCDBURLs) {
374 dec.providedTIMs.emplace_back(output);
375 } else if (hasIndexRecords) {
376 dec.providedIDXs.emplace_back(output);
377 } else if (DataSpecUtils::partialMatch(output, AODOrigins)) {
378 dec.providedAODs.emplace_back(output);
380 dec.providedOutputObjHist.emplace_back(output);
381 auto it = std::ranges::find_if(dec.outObjHistMap, [&](auto&& x) { return x.id == hash; });
382 if (it == dec.outObjHistMap.end()) {
383 dec.outObjHistMap.push_back({hash, {output.binding.value}});
384 } else {
385 it->bindings.push_back(output.binding.value);
386 }
387 }
388
389 if (output.lifetime == Lifetime::Condition) {
390 providedCCDBs.push_back(output);
391 }
392 }
393 }
394
395 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
396 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
397
398 DataProcessorSpec indexBuilder{
399 "internal-dpl-aod-index-builder",
400 {},
401 {},
402 AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
403 {}};
404 std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
405 std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
406 dec.requestedIDXs | views::filter_not_matching(dec.providedIDXs) | sinks::append_to{dec.builderInputs};
407 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.builderInputs, dec.requestedAODs, dec.requestedDYNs, indexBuilder);
408
409 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
410 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
411 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
412 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, analysisCCDBBackend);
413
414 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
415 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
416 dec.requestedDYNs | views::filter_not_matching(dec.providedDYNs) | sinks::append_to{dec.spawnerInputs};
417
418 DataProcessorSpec aodSpawner{
419 "internal-dpl-aod-spawner",
420 {},
421 {},
422 AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
423 {}};
424 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, aodSpawner);
425
426 std::ranges::sort(dec.requestedAODs, inputSpecLessThan);
427 std::ranges::sort(dec.providedAODs, outputSpecLessThan);
428 AnalysisSupportHelpers::addMissingOutputsToReader(dec.providedAODs, dec.requestedAODs, aodReader);
429
430 std::ranges::sort(requestedCCDBs, inputSpecLessThan);
431 std::ranges::sort(providedCCDBs, outputSpecLessThan);
432 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
433
434 std::vector<DataProcessorSpec> extraSpecs;
435
436 if (transientStore.outputs.empty() == false) {
437 extraSpecs.push_back(transientStore);
438 }
439 if (qaStore.outputs.empty() == false) {
440 extraSpecs.push_back(qaStore);
441 }
442
443 if (aodSpawner.outputs.empty() == false) {
444 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
445 }
446
447 if (indexBuilder.outputs.empty() == false) {
448 extraSpecs.push_back(indexBuilder);
449 }
450
451 // add the Analysis CCDB backend which reads CCDB objects using a provided table
453 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
454 if (analysisCCDBBackend.outputs.empty() == false) {
455 extraSpecs.push_back(analysisCCDBBackend);
456 }
457 }
458
459 // add the reader
460 if (aodReader.outputs.empty() == false) {
461 auto mctracks2aod = std::ranges::find_if(workflow, [](auto const& x) { return x.name == "mctracks-to-aod"; });
462 if (mctracks2aod == workflow.end()) {
463 // add normal reader
464 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
465 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
466 } else {
467 // AODs are being injected on-the-fly, add error-handler reader
468 aodReader.algorithm = AlgorithmSpec{
470 [](DeviceSpec const& spec) {
471 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
472 for (auto const& output : spec.outputs) {
473 LOGP(warn, " {}", DataSpecUtils::describe(output.matcher));
474 }
475 LOGP(fatal, "Stopping.");
476 // to ensure the output type for adaptStateful
477 return adaptStateless([](DataAllocator&) {});
478 })};
479 }
480 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
481 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
482 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
483 }
484
485 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
486 auto& dstf = std::get<ConcreteDataMatcher>(matcher.matcher);
487 // Check if any of the provided outputs is a DISTSTF
488 // Check if any of the requested inputs is for a 0xccdb message
489 bool providesDISTSTF = std::ranges::any_of(workflow,
490 [&matcher](auto const& dp) {
491 return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](auto const& output) {
492 return DataSpecUtils::match(matcher, output);
493 });
494 });
495
496 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
497 // we add to the first data processor which has no inputs (apart from
498 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
499 bool requiresDISTSUBTIMEFRAME = std::ranges::any_of(workflow,
500 [&dstf](auto const& dp) {
501 return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](auto const& input) {
502 return DataSpecUtils::match(input, dstf);
503 });
504 });
505
506 // We find the first device which has either just enumerations or
507 // just timers, and we will add the DISTSUBTIMEFRAME to it.
508 // Notice how we do so in a stable manner by sorting the devices
509 // by name.
510 int enumCandidate = -1;
511 int timerCandidate = -1;
512 for (auto wi = 0U; wi < workflow.size(); ++wi) {
513 auto& dp = workflow[wi];
514 if (dp.inputs.size() != 1) {
515 continue;
516 }
517 auto lifetime = dp.inputs[0].lifetime;
518 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
519 enumCandidate = wi;
520 }
521 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
522 timerCandidate = wi;
523 }
524 }
525
526 // * If there are AOD outputs we use TFNumber as the CCDB clock
527 // * If one device provides a DISTSTF we use that as the CCDB clock
528 // * If one of the devices provides a timer we use that as the CCDB clock
529 // * If none of the above apply, add to the first data processor
530 // which has no inputs apart from enumerations the responsibility
531 // to provide the DISTSUBTIMEFRAME.
532 if (ccdbBackend.outputs.empty() == false) {
533 if (aodReader.outputs.empty() == false) {
534 // fetcher clock follows AOD source (TFNumber)
535 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
536 } else if (providesDISTSTF) {
537 // fetcher clock follows DSTF/ccdb source (DISTSUBTIMEFRAME)
538 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
539 } else {
540 if (enumCandidate != -1) {
541 // add DSTF/ccdb source to the enumeration-driven source explicitly
542 // fetcher clock is provided by enumeration-driven source (DISTSUBTIMEFRAME)
543 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
544 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
545 } else if (timerCandidate != -1) {
546 // fetcher clock is provided by timer source
547 auto timer_dstf = DataSpecUtils::asConcreteDataMatcher(workflow[timerCandidate].outputs[0]);
548 ccdbBackend.inputs.push_back(InputSpec{"tfn", timer_dstf, Lifetime::Timeframe});
549 }
550 }
551
552 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
553 // Load the CCDB backend from the plugin
554 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
555 extraSpecs.push_back(timePipeline(ccdbBackend, ctx.options().get<int64_t>("ccdb-fetchers")));
556 } else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
557 // add DSTF/ccdb source to the enumeration-driven source explicitly if it is required in the workflow
558 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
559 }
560
561 // add the timer
562 if (timer.outputs.empty() == false) {
563 extraSpecs.push_back(timer);
564 }
565
566 // This is to inject a file sink so that any dangling ATSK object is written
567 // to a ROOT file.
568 if (dec.providedOutputObjHist.empty() == false) {
570 extraSpecs.push_back(rootSink);
571 }
572
573 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
574 extraSpecs.clear();
575
576 injectAODWriter(workflow, ctx);
577
578 // Select dangling outputs which are not of type AOD
579 std::vector<InputSpec> redirectedOutputsInputs;
580 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
581 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
582 continue;
583 }
584 // We forward to the output proxy all the inputs only if they are dangling
585 // or if the forwarding policy is "all".
586 if (!dec.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
587 continue;
588 }
589 // AODs are skipped in any case.
590 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], AODOrigins)) {
591 continue;
592 }
593 redirectedOutputsInputs.emplace_back(dec.outputsInputs[ii]);
594 }
595
596 std::vector<InputSpec> unmatched;
597 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
598 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
599 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
600 if (unmatched.size() != redirectedOutputsInputs.size()) {
601 extraSpecs.push_back(fileSink);
602 }
603 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
604 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
605 extraSpecs.push_back(fairMQSink);
606 } else if (forwardingDestination != "drop") {
607 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
608 }
609 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
610 std::vector<InputSpec> ignored = unmatched;
611 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
612 for (auto& ignoredInput : ignored) {
613 ignoredInput.lifetime = Lifetime::Sporadic;
614 }
615
616 // Use the new dummy sink when the AOD reader is there
617 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
618 if (aodReader.outputs.empty() == false) {
619 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
620 extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
621 } else {
622 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
623 std::string rateLimitingChannelConfigOutput;
624 if (rateLimitingIPCID != -1) {
625 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
626 }
627 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
628 }
629 }
630
631 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
632 extraSpecs.clear();
633}
634
636{
637 unsigned int distSTFCount = 0;
638 for (auto& spec : workflow) {
639 auto& inputs = spec.inputs;
640 bool allSporadic = true;
641 bool hasTimer = false;
642 bool hasSporadic = false;
643 bool hasOptionals = false;
644 for (auto& input : inputs) {
645 if (input.lifetime == Lifetime::Optional) {
646 hasOptionals = true;
647 }
648 }
649 for (auto& input : inputs) {
650 // Any InputSpec that is DPL/DISTSUBTIMEFRAME/0 will actually be replaced by one
651 // which looks like DPL/DISTSUBTIMEFRAME/<incremental number> for devices that
652 // have Optional inputs as well.
653 // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
654 // forwarded before actual RAWDATA arrives.
655 if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
656 !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
657 LOGP(error,
658 "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
659 "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
660 DataSpecUtils::describe(input), input.binding);
661 }
662 if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
663 // The first one remains unchanged, therefore we use the postincrement
664 DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
665 continue;
666 }
667 // Timers are sporadic only when they are not
668 // alone.
669 if (input.lifetime == Lifetime::Timer) {
670 hasTimer = true;
671 continue;
672 }
673 if (input.lifetime == Lifetime::Sporadic) {
674 hasSporadic = true;
675 } else {
676 allSporadic = false;
677 }
678 }
679
680 LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
681
682 // If they are not all sporadic (excluding timers)
683 // we leave things as they are.
684 if (allSporadic == false) {
685 continue;
686 }
687 // A timer alone is not sporadic.
688 if (hasSporadic == false) {
689 continue;
690 }
694 for (auto& output : spec.outputs) {
695 if (output.lifetime == Lifetime::Timeframe) {
696 output.lifetime = Lifetime::Sporadic;
697 }
698 }
699 }
700
701 if (distSTFCount > 0) {
702 for (auto& spec : workflow) {
703 if (std::ranges::any_of(spec.outputs, [](auto const& output) { return DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0}); })) {
704 for (unsigned int i = 1; i < distSTFCount; ++i) {
705 spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
706 }
707 break;
708 }
709 }
710 }
711}
712
714{
715 auto& dec = ctx.services().get<DanglingEdgesContext>();
717 std::tie(dec.outputsInputs, dec.isDangling) = analyzeOutputs(workflow);
718
719 // create DataOutputDescriptor
720 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
721
722 // select outputs of type AOD which need to be saved
723 dec.outputsInputsAOD.clear();
724 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
725 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], AODOrigins)) {
726 auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
727 if (ds.size() > 0 || dec.isDangling[ii]) {
728 dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
729 }
730 }
731 }
732
733 // file sink for any AOD output
734 if (dec.outputsInputsAOD.size() > 0) {
735 // add TFNumber and TFFilename as input to the writer
736 DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tfn", "TFN", "TFNumber"});
737 DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tff", "TFF", "TFFilename"});
738 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
739 workflow.push_back(fileSink);
740
741 auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
742 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
743 });
744 dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
745 }
746}
747
749 std::vector<DeviceConnectionEdge>& logicalEdges,
750 std::vector<OutputSpec>& outputs,
751 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
752{
753 // In case the workflow is empty, we do not have anything to do.
754 if (workflow.empty()) {
755 return;
756 }
757
758 // This is the state. Oif is the iterator I use for the searches.
759 std::vector<LogicalOutputInfo> availableOutputsInfo;
760 auto const& constOutputs = outputs; // const version of the outputs
761 // Forwards is a local cache to avoid adding forwards before time.
762 std::vector<LogicalOutputInfo> forwards;
763
764 // Notice that availableOutputsInfo MUST be updated first, since it relies on
765 // the size of outputs to be the one before the update.
766 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
767 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
768 for (size_t wi = 0; wi < workflow.size(); ++wi) {
769 auto& producer = workflow[wi];
770 if (producer.outputs.empty()) {
771 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
772 }
773 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
774
775 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
776 auto& out = producer.outputs[oi];
777 auto uniqueOutputId = outputs.size();
778 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
779 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
780 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
781 outputs.push_back(out);
782 }
783 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
784 }
785 };
786
787 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
788 auto input = workflow[ci].inputs[ii];
789 std::ostringstream str;
790 str << "No matching output found for "
791 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
792
793 for (auto& output : constOutputs) {
794 str << "-" << DataSpecUtils::describe(output) << "\n";
795 }
796
797 throw std::runtime_error(str.str());
798 };
799
800 // This is the outer loop
801 //
802 // Here we iterate over dataprocessor items in workflow and we consider them
803 // as consumer, since we are interested in their inputs.
804 // Notice also we need to search for all the matching inputs, since
805 // we could have more than one source that matches (e.g. in the
806 // case of a time merger).
807 // Once consumed, an output is not actually used anymore, however
808 // we append it as a forward.
809 // Finally, If a device has n-way pipelining, we need to create one node per
810 // parallel pipeline and add an edge for each.
811 enumerateAvailableOutputs();
812
813 std::vector<bool> matches(constOutputs.size());
814 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
815 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
816 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
817 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
818 forwards.clear();
819 for (size_t i = 0; i < constOutputs.size(); i++) {
820 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
821 if (matches[i]) {
822 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
823 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
824 DataSpecUtils::describe(constOutputs[i]).c_str());
825 }
826 }
827
828 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
829 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
830 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
831 continue;
832 }
833 auto* oif = &availableOutputsInfo[i];
834 if (oif->forward) {
835 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
836 }
837 auto producer = oif->specIndex;
838 auto uniqueOutputId = oif->outputGlobalIndex;
839 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
840 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
841 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
842 workflow[producer].name.c_str());
843 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
844 }
845 }
846 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
847 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
848 oif->enabled = false;
849 }
850 if (forwards.empty()) {
851 errorDueToMissingOutputFor(consumer, input);
852 }
853 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto const& info) { return info.enabled == false; }), availableOutputsInfo.end());
854 std::ranges::copy(forwards, std::back_inserter(availableOutputsInfo));
855 }
856 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
857 }
858}
859
860std::vector<EdgeAction>
862 const std::vector<DeviceConnectionEdge>& edges,
863 const std::vector<size_t>& index)
864{
865 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
866
867 assert(edges.size() == index.size());
868 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
869 for (size_t i : index) {
870 auto& edge = edges[i];
871 auto& action = actions[i];
872 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
873 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
874 last = edge;
875 }
876 return actions;
877}
878
879std::vector<EdgeAction>
881 const std::vector<DeviceConnectionEdge>& edges,
882 const std::vector<size_t>& index)
883{
884 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
885
886 assert(edges.size() == index.size());
887 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
888 for (size_t i : index) {
889 auto& edge = edges[i];
890 auto& action = actions[i];
891 // Calculate which actions need to be taken for this edge.
892 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
893 action.requiresNewChannel =
894 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
895
896 last = edge;
897 }
898 return actions;
899}
900
901void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
902 std::vector<size_t>& outEdgeIndex,
903 const std::vector<DeviceConnectionEdge>& edges)
904{
905 inEdgeIndex.resize(edges.size());
906 outEdgeIndex.resize(edges.size());
907 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
908 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
909
910 // Two indexes, one to bind the outputs, the other
911 // one to connect the inputs. The
912 auto outSorter = [&edges](size_t i, size_t j) {
913 auto& a = edges[i];
914 auto& b = edges[j];
915 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
916 };
917 auto inSorter = [&edges](size_t i, size_t j) {
918 auto& a = edges[i];
919 auto& b = edges[j];
920 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
921 };
922
923 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
924 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
925}
926
928{
929 if (workflow.empty()) {
931 }
932 std::set<std::string> validNames;
933 // std::vector<OutputSpec> availableOutputs;
934 // std::vector<InputSpec> requiredInputs;
935
936 // An index many to one index to go from a given input to the
937 // associated spec
938 // std::map<size_t, size_t> inputToSpec;
939 // A one to one index to go from a given output to the Spec emitting it
940 // std::map<size_t, size_t> outputToSpec;
941
942 std::ostringstream ss;
943
944 for (auto& spec : workflow) {
945 if (spec.name.empty()) {
946 throw std::runtime_error("Invalid DataProcessorSpec name");
947 }
948 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
949 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
950 }
951 if (validNames.find(spec.name) != validNames.end()) {
952 throw std::runtime_error("Name " + spec.name + " is used twice.");
953 }
954 validNames.insert(spec.name);
955 for (auto& option : spec.options) {
956 if (option.defaultValue.type() != VariantType::Empty &&
957 option.type != option.defaultValue.type()) {
958 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
959 << ") for " << option.name << " in DataProcessorSpec of "
960 << spec.name;
961 throw std::runtime_error(ss.str());
962 }
963 }
964 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
965 InputSpec const& input = spec.inputs[ii];
966 if (DataSpecUtils::validate(input) == false) {
967 ss << "In spec " << spec.name << " input specification "
968 << ii << " requires binding, description and origin"
969 " to be fully specified (found "
970 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
971 throw std::runtime_error(ss.str());
972 }
973 }
974 }
976}
977
978using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
981 size_t id;
982};
983
984std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
985{
986 // compute total number of input/output
987 size_t totalInputs = 0;
988 size_t totalOutputs = 0;
989 for (auto& spec : workflow) {
990 totalInputs += spec.inputs.size();
991 totalOutputs += spec.outputs.size();
992 }
993
994 std::vector<DataMatcherId> inputs;
995 std::vector<DataMatcherId> outputs;
996 inputs.reserve(totalInputs);
997 outputs.reserve(totalOutputs);
998
999 std::vector<InputSpec> results;
1000 std::vector<bool> isDangling;
1001 results.reserve(totalOutputs);
1002 isDangling.reserve(totalOutputs);
1003
1005 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
1006 auto& spec = workflow[wi];
1007 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
1008 inputs.emplace_back(DataMatcherId{wi, ii});
1009 }
1010 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
1011 outputs.emplace_back(DataMatcherId{wi, oi});
1012 }
1013 }
1014
1015 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
1016 auto& output = outputs[oi];
1017 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
1018
1019 // is dangling output?
1020 bool matched = false;
1021 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
1022 auto& input = inputs[ii];
1023 // Inputs of the same workflow cannot match outputs
1024 if (output.workflowId == input.workflowId) {
1025 continue;
1026 }
1027 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
1028 if (DataSpecUtils::match(inputSpec, outputSpec)) {
1029 matched = true;
1030 break;
1031 }
1032 }
1033
1034 auto input = DataSpecUtils::matchingInput(outputSpec);
1035 char buf[64];
1036 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
1037
1038 // make sure that entries are unique
1039 if (std::ranges::find(results, input) == results.end()) {
1040 results.emplace_back(input);
1041 isDangling.emplace_back(matched == false);
1042 }
1043 }
1044
1045 // make sure that results is unique
1046 return std::make_tuple(results, isDangling);
1047}
1048
1049std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1050{
1051
1052 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1053
1054 std::vector<InputSpec> results;
1055 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1056 if (isDangling[ii]) {
1057 results.emplace_back(outputsInputs[ii]);
1058 }
1059 }
1060
1061 return results;
1062}
1063
1064bool validateLifetime(std::ostream& errors,
1065 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1066 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1067{
1068 // In case the completion policy is consume-any, we do not need to check anything.
1069 if (consumerPolicies.completionPolicyName == "consume-any") {
1070 return true;
1071 }
1072 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1073 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1074 DataSpecUtils::describe(input).c_str(), consumer.name,
1075 DataSpecUtils::describe(output).c_str(), producer.name);
1076 return false;
1077 }
1078 return true;
1079}
1080
1081bool validateExpendable(std::ostream& errors,
1082 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1083 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1084{
1085 auto isExpendable = [](DataProcessorLabel const& label) {
1086 return label.value == "expendable";
1087 };
1088 auto isResilient = [](DataProcessorLabel const& label) {
1089 return label.value == "expendable" || label.value == "resilient";
1090 };
1091 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1092 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1093 if (producerExpendable && consumerCritical) {
1094 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1095 consumer.name,
1096 producer.name);
1097 return false;
1098 }
1099 return true;
1100}
1101
1102using Validator = std::function<bool(std::ostream& errors,
1103 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1104 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1105
1107 std::vector<DataProcessorPoliciesInfo> const& policies,
1108 std::vector<DeviceConnectionEdge> const& edges,
1109 std::vector<OutputSpec> const& outputs)
1110{
1111 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
1112 std::vector<Validator> defaultValidators = {validateExpendable};
1113 if (!disableLifetimeCheck) {
1114 defaultValidators.emplace_back(validateLifetime);
1115 }
1116 std::stringstream errors;
1117 // Iterate over all the edges.
1118 // Get the input lifetime and the output lifetime.
1119 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
1120 bool hasErrors = false;
1121 for (auto const& edge : edges) {
1122 DataProcessorSpec const& producer = workflow[edge.producer];
1123 DataProcessorSpec const& consumer = workflow[edge.consumer];
1124 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
1125 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
1126 OutputSpec const& output = outputs[edge.outputGlobalIndex];
1127 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
1128 for (auto const& validator : defaultValidators) {
1129 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
1130 }
1131 }
1132 if (hasErrors) {
1133 throw std::runtime_error(errors.str());
1134 }
1135}
1136
1137} // namespace o2::framework
uint32_t hash
std::ostringstream debug
std::unique_ptr< expressions::Node > node
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(Ts... Vs)
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
constexpr framework::ConcreteDataMatcher matcher()
Definition ASoA.h:380
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:193
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static void injectAODWriter(WorkflowSpec &workflow, ConfigContext const &ctx)
o2::mch::DsIndex ds
const std::string str