Project
Loading...
Searching...
No Matches
WorkflowHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
28#include "Framework/Signpost.h"
30
31#include "Framework/Variant.h"
32#include "Headers/DataHeader.h"
33#include <algorithm>
34#include <list>
35#include <set>
36#include <utility>
37#include <vector>
38#include <climits>
39#include <numeric>
40
41O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
42
43namespace o2::framework
44{
45std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
46{
47 out << "(" << info.index << ", " << info.layer << ")";
48 return out;
49}
50
51std::vector<TopoIndexInfo>
53 int const* edgeIn,
54 int const* edgeOut,
55 size_t byteStride,
56 size_t edgesCount)
57{
58 size_t stride = byteStride / sizeof(int);
59 using EdgeIndex = int;
60 // Create the index which will be returned.
61 std::vector<TopoIndexInfo> index(nodeCount);
62 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
63 index[wi] = {wi, 0};
64 }
65 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
66 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
67 remainingEdgesIndex[ei] = ei;
68 }
69
70 // Create a vector where at each position we have true
71 // if the vector has dependencies, false otherwise
72 std::vector<bool> nodeDeps(nodeCount, false);
73 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
74 nodeDeps[*(edgeOut + ei * stride)] = true;
75 }
76
77 // We start with all those which do not have any dependencies
78 // They are layer 0.
79 std::list<TopoIndexInfo> L;
80 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
81 if (nodeDeps[ii] == false) {
82 L.push_back({ii, 0});
83 }
84 }
85
86 // The final result.
87 std::vector<TopoIndexInfo> S;
88 // The set of vertices which can be reached by the current node
89 std::set<TopoIndexInfo> nextVertex;
90 // The set of edges which are not related to the current node.
91 std::vector<EdgeIndex> nextEdges;
92 while (!L.empty()) {
93 auto node = L.front();
94 S.push_back(node);
95 L.pop_front();
96 nextVertex.clear();
97 nextEdges.clear();
98
99 // After this, nextVertex will contain all the vertices
100 // which have the current node as incoming.
101 // nextEdges will contain all the edges which are not related
102 // to the current node.
103 for (auto const& ei : remainingEdgesIndex) {
104 if (*(edgeIn + ei * stride) == node.index) {
105 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
106 } else {
107 nextEdges.push_back(ei);
108 }
109 }
110 remainingEdgesIndex.swap(nextEdges);
111
112 // Of all the vertices which have node as incoming,
113 // check if there is any other incoming node.
114 std::set<TopoIndexInfo> hasPredecessors;
115 for (auto const& ei : remainingEdgesIndex) {
116 for (auto& m : nextVertex) {
117 if (m.index == *(edgeOut + ei * stride)) {
118 hasPredecessors.insert({m.index, m.layer});
119 }
120 }
121 }
122 std::vector<TopoIndexInfo> withPredecessor;
123 std::set_difference(nextVertex.begin(), nextVertex.end(),
124 hasPredecessors.begin(), hasPredecessors.end(),
125 std::back_inserter(withPredecessor));
126 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
127 }
128 return S;
129}
130
131// get the default value for condition-backend
133{
134 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
135 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
136 if (explicitBackend) {
137 return getenv("DPL_CONDITION_BACKEND");
138 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
139 return "http://o2-ccdb.internal";
140 } else {
141 return "http://alice-ccdb.cern.ch";
142 }
143}
144
145// get the default value for condition query rate
147{
148 return getenv("DPL_CONDITION_QUERY_RATE") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE")) : 0;
149}
150
151// get the default value for condition query rate multiplier
153{
154 return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1;
155}
156
158{
159 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
160 DataProcessorSpec ccdbBackend{
161 .name = "internal-dpl-ccdb-backend",
162 .outputs = {},
163 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
164 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
165 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
166 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
167 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
168 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
169 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
170 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
171 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
172 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
173 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
174 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
175 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
176 };
177 DataProcessorSpec analysisCCDBBackend{
178 .name = "internal-dpl-aod-ccdb",
179 .inputs = {},
180 .outputs = {},
181 .algorithm = AlgorithmSpec::dummyAlgorithm(),
182 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
183 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
184 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
185 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
186 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
187 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
188 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
189 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
190 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
191 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
192 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
193 DataProcessorSpec transientStore{"internal-dpl-transient-store",
194 {},
195 {},
197 DataProcessorSpec qaStore{"internal-dpl-qa-store",
198 {},
199 {},
201 DataProcessorSpec timer{"internal-dpl-clock",
202 {},
203 {},
205
206 // In case InputSpec of origin AOD are
207 // requested but not available as part of the workflow,
208 // we insert in the configuration something which
209 // reads them from file.
210 //
211 // FIXME: source branch is DataOrigin, for the moment. We should
212 // make it configurable via ConfigParamsOptions
213 auto aodLifetime = Lifetime::Enumeration;
214
215 DataProcessorSpec aodReader{
216 .name = "internal-dpl-aod-reader",
217 .inputs = {InputSpec{"enumeration",
218 "DPL",
219 "ENUM",
220 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
221 aodLifetime}},
222 .algorithm = AlgorithmSpec::dummyAlgorithm(),
223 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
224 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
225 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
226 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
227 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
228 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
229 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
230 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
231 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
232 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
233
234 ctx.services().registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(new DanglingEdgesContext));
235 auto& dec = ctx.services().get<DanglingEdgesContext>();
236
237 std::vector<InputSpec> requestedCCDBs;
238 std::vector<OutputSpec> providedCCDBs;
239
240 for (size_t wi = 0; wi < workflow.size(); ++wi) {
241 auto& processor = workflow[wi];
242 auto name = processor.name;
243 uint32_t hash = runtime_hash(name.c_str());
244 dec.outTskMap.push_back({hash, name});
245
246 std::string prefix = "internal-dpl-";
247 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
248 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
249 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
250 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
251 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
252 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
253 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
254 }
255 bool hasTimeframeInputs = std::ranges::any_of(processor.inputs, [](auto const& input) { return input.lifetime == Lifetime::Timeframe; });
256 bool hasTimeframeOutputs = std::ranges::any_of(processor.outputs, [](auto const& output) { return output.lifetime == Lifetime::Timeframe; });
257
258 // A timeframeSink consumes timeframes without creating new
259 // timeframe data.
260 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
261 if (rateLimitingIPCID != -1) {
262 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
263 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
264 bool hasMatch = false;
265 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
266 auto summaryOutput = std::ranges::find_if(processor.outputs, [&summaryMatcher](auto const& output) { return DataSpecUtils::match(output, summaryMatcher); });
267 if (summaryOutput != processor.outputs.end()) {
268 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
269 DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str());
270 hasMatch = true;
271 }
272
273 if (!hasMatch) {
274 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
275 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
276 }
277 }
278 }
279 bool hasConditionOption = false;
280 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
281 auto& input = processor.inputs[ii];
282 bool hasProjectors = false;
283 bool hasIndexRecords = false;
284 bool hasCCDBURLs = false;
285 bool wasAOD = false;
286 // all three options are exclusive
287 for (auto const& p : input.metadata) {
288 if (p.name.starts_with("aod-origin-replaced")) {
289 wasAOD = true;
290 }
291 if (p.name.compare("projectors") == 0) {
292 hasProjectors = true;
293 break;
294 }
295 if (p.name.compare("index-records") == 0) {
296 hasIndexRecords = true;
297 break;
298 }
299 if (p.name.starts_with("ccdb:")) {
300 hasCCDBURLs = true;
301 break;
302 }
303 }
304 switch (input.lifetime) {
305 case Lifetime::Timer: {
306 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
307 auto hasOption = std::ranges::any_of(processor.options, [&input](auto const& option) { return (option.name == "period-" + input.binding); });
308 if (hasOption == false) {
309 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
310 }
311 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
312 } break;
313 case Lifetime::Signal: {
314 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
315 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
316 } break;
317 case Lifetime::Enumeration: {
318 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
319 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
320 } break;
321 case Lifetime::Condition: {
322 requestedCCDBs.emplace_back(input);
323 if ((hasConditionOption == false) && std::ranges::none_of(processor.options, [](auto const& option) { return (option.name.compare("condition-backend") == 0); })) {
324 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
325 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
326 hasConditionOption = true;
327 }
328 } break;
329 case Lifetime::OutOfBand: {
330 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
331 auto hasOption = std::ranges::any_of(processor.options, [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
332 if (hasOption == false) {
333 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
334 }
335 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
336 } break;
337 case Lifetime::QA:
338 case Lifetime::Transient:
339 case Lifetime::Timeframe:
340 case Lifetime::Optional:
341 break;
342 }
343 if (hasProjectors) {
344 DataSpecUtils::updateInputList(dec.requestedDYNs, InputSpec{input});
345 } else if (hasIndexRecords) {
346 DataSpecUtils::updateInputList(dec.requestedIDXs, InputSpec{input});
347 } else if (hasCCDBURLs) {
348 DataSpecUtils::updateInputList(dec.requestedTIMs, InputSpec{input});
349 } else if (DataSpecUtils::partialMatch(input, AODOrigins) || wasAOD) {
350 DataSpecUtils::updateInputList(dec.requestedAODs, InputSpec{input});
351 }
352 }
353
354 std::ranges::stable_sort(timer.outputs, [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
355
356 for (auto& output : processor.outputs) {
357 bool hasProjectors = false;
358 bool hasIndexRecords = false;
359 bool hasCCDBURLs = false;
360 bool wasAOD = false;
361 // all three options are exclusive
362 for (auto const& p : output.metadata) {
363 if (p.name.starts_with("aod-origin-replaced")) {
364 wasAOD = true;
365 }
366 if (p.name.compare("projectors") == 0) {
367 hasProjectors = true;
368 break;
369 }
370 if (p.name.compare("index-records") == 0) {
371 hasIndexRecords = true;
372 break;
373 }
374 if (p.name.starts_with("ccdb:")) {
375 hasCCDBURLs = true;
376 break;
377 }
378 }
379 if (hasProjectors) {
380 dec.providedDYNs.emplace_back(output);
381 } else if (hasCCDBURLs) {
382 dec.providedTIMs.emplace_back(output);
383 } else if (hasIndexRecords) {
384 dec.providedIDXs.emplace_back(output);
385 } else if (DataSpecUtils::partialMatch(output, AODOrigins) || wasAOD) {
386 dec.providedAODs.emplace_back(output);
388 dec.providedOutputObjHist.emplace_back(output);
389 auto it = std::ranges::find_if(dec.outObjHistMap, [&](auto&& x) { return x.id == hash; });
390 if (it == dec.outObjHistMap.end()) {
391 dec.outObjHistMap.push_back({hash, {output.binding.value}});
392 } else {
393 it->bindings.push_back(output.binding.value);
394 }
395 }
396
397 if (output.lifetime == Lifetime::Condition) {
398 providedCCDBs.push_back(output);
399 }
400 }
401 }
402
403 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
404 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
405
406 DataProcessorSpec indexBuilder{
407 "internal-dpl-aod-index-builder",
408 {},
409 {},
410 AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
411 {}};
412 std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
413 std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
414 dec.requestedIDXs | views::filter_not_matching(dec.providedIDXs) | sinks::append_to{dec.builderInputs};
415 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.builderInputs, dec.requestedAODs, dec.requestedDYNs, indexBuilder);
416
417 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
418 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
419 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
420 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, analysisCCDBBackend);
421
422 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
423 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
424 dec.requestedDYNs | views::filter_not_matching(dec.providedDYNs) | sinks::append_to{dec.spawnerInputs};
425
426 DataProcessorSpec aodSpawner{
427 "internal-dpl-aod-spawner",
428 {},
429 {},
430 AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
431 {}};
432 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, aodSpawner);
433
434 std::ranges::sort(dec.requestedAODs, inputSpecLessThan);
435 std::ranges::sort(dec.providedAODs, outputSpecLessThan);
436 AnalysisSupportHelpers::addMissingOutputsToReader(dec.providedAODs, dec.requestedAODs, aodReader);
437
438 std::ranges::sort(requestedCCDBs, inputSpecLessThan);
439 std::ranges::sort(providedCCDBs, outputSpecLessThan);
440 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
441
442 std::vector<DataProcessorSpec> extraSpecs;
443
444 if (transientStore.outputs.empty() == false) {
445 extraSpecs.push_back(transientStore);
446 }
447 if (qaStore.outputs.empty() == false) {
448 extraSpecs.push_back(qaStore);
449 }
450
451 if (aodSpawner.outputs.empty() == false) {
452 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
453 }
454
455 if (indexBuilder.outputs.empty() == false) {
456 extraSpecs.push_back(indexBuilder);
457 }
458
459 // add the Analysys CCDB backend which reads CCDB objects using a provided table
461 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
462 if (analysisCCDBBackend.outputs.empty() == false) {
463 extraSpecs.push_back(analysisCCDBBackend);
464 }
465 }
466
467 auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
468 return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
469 return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
470 });
471 });
472
473 // add the reader
474 if (aodReader.outputs.empty() == false) {
475 if (tfnsource == workflow.end()) {
476 // add normal reader
477 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
478 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
479 } else {
480 // AODs are being injected the tfnsource is the entry point, add error-handler reader
481 aodReader.algorithm = AlgorithmSpec{
483 [](DeviceSpec const& spec) {
484 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
485 for (auto const& output : spec.outputs) {
486 LOGP(warn, " {}", DataSpecUtils::describe(output.matcher));
487 }
488 LOGP(fatal, "Stopping.");
489 // to ensure the output type for adaptStateful
490 return adaptStateless([](DataAllocator&) {});
491 })};
492 }
493 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
494 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
495 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
496 }
497
498 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
499 auto& dstf = std::get<ConcreteDataMatcher>(matcher.matcher);
500 // Check if any of the provided outputs is a DISTSTF
501 // Check if any of the requested inputs is for a 0xccdb message
502 bool providesDISTSTF = std::ranges::any_of(workflow,
503 [&matcher](auto const& dp) {
504 return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](auto const& output) {
505 return DataSpecUtils::match(matcher, output);
506 });
507 });
508
509 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
510 // we add to the first data processor which has no inputs (apart from
511 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
512 bool requiresDISTSUBTIMEFRAME = std::ranges::any_of(workflow,
513 [&dstf](auto const& dp) {
514 return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](auto const& input) {
515 return DataSpecUtils::match(input, dstf);
516 });
517 });
518
519 // We find the first device which has either just enumerations or
520 // just timers, and we will add the DISTSUBTIMEFRAME to it.
521 // Notice how we do so in a stable manner by sorting the devices
522 // by name.
523 int enumCandidate = -1;
524 int timerCandidate = -1;
525 for (auto wi = 0U; wi < workflow.size(); ++wi) {
526 auto& dp = workflow[wi];
527 if (dp.inputs.size() != 1) {
528 continue;
529 }
530 auto lifetime = dp.inputs[0].lifetime;
531 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
532 enumCandidate = wi;
533 }
534 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
535 timerCandidate = wi;
536 }
537 }
538
539 // * If there are AOD outputs we use TFNumber as the CCDB clock
540 // * If one device provides a DISTSTF we use that as the CCDB clock
541 // * If one of the devices provides a timer we use that as the CCDB clock
542 // * If none of the above apply, add to the first data processor
543 // which has no inputs apart from enumerations the responsibility
544 // to provide the DISTSUBTIMEFRAME.
545 if (ccdbBackend.outputs.empty() == false) {
546 if (aodReader.outputs.empty() == false) {
547 // fetcher clock follows AOD source (TFNumber)
548 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
549 } else if (providesDISTSTF) {
550 // fetcher clock follows DSTF/ccdb source (DISTSUBTIMEFRAME)
551 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
552 } else {
553 if (enumCandidate != -1) {
554 // add DSTF/ccdb source to the enumeration-driven source explicitly
555 // fetcher clock is provided by enumeration-driven source (DISTSUBTIMEFRAME)
556 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
557 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
558 } else if (timerCandidate != -1) {
559 // fetcher clock is proived by timer source
560 auto timer_dstf = DataSpecUtils::asConcreteDataMatcher(workflow[timerCandidate].outputs[0]);
561 ccdbBackend.inputs.push_back(InputSpec{"tfn", timer_dstf, Lifetime::Timeframe});
562 }
563 }
564
565 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
566 // Load the CCDB backend from the plugin
567 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
568 extraSpecs.push_back(timePipeline(ccdbBackend, ctx.options().get<int64_t>("ccdb-fetchers")));
569 } else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
570 // add DSTF/ccdb source to the enumeration-driven source explicitly if it is required in the workflow
571 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
572 }
573
574 // add the timer
575 if (timer.outputs.empty() == false) {
576 extraSpecs.push_back(timer);
577 }
578
579 // This is to inject a file sink so that any dangling ATSK object is written
580 // to a ROOT file.
581 if (dec.providedOutputObjHist.empty() == false) {
583 extraSpecs.push_back(rootSink);
584 }
585
586 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
587 extraSpecs.clear();
588
589 injectAODWriter(workflow, ctx);
590
591 // Select dangling outputs which are not of type AOD
592 std::vector<InputSpec> redirectedOutputsInputs;
593 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
594 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
595 continue;
596 }
597 // We forward to the output proxy all the inputs only if they are dangling
598 // or if the forwarding policy is "proxy".
599 if (!dec.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
600 continue;
601 }
602 // AODs are skipped in any case.
603 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], AODOrigins)) {
604 continue;
605 }
606 redirectedOutputsInputs.emplace_back(dec.outputsInputs[ii]);
607 }
608
609 std::vector<InputSpec> unmatched;
610 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
611 // update tfnsource iterator (could be aod-reader)
612 tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
613 return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
614 return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
615 });
616 });
617 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
618 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
619 if (unmatched.size() != redirectedOutputsInputs.size()) {
620 extraSpecs.push_back(fileSink);
621 }
622 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
623 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
624 extraSpecs.push_back(fairMQSink);
625 } else if (forwardingDestination != "drop") {
626 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
627 }
628 if ((unmatched.size() > 0) || (redirectedOutputsInputs.size() > 0) || (tfnsource != workflow.end())) {
629 std::vector<InputSpec> ignored = unmatched;
630 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
631 for (auto& ignoredInput : ignored) {
632 ignoredInput.lifetime = Lifetime::Sporadic;
633 }
634
635 // Use the new dummy sink when the AOD reader is there
636 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
637 if (tfnsource != workflow.end()) {
638 DataSpecUtils::updateInputList(ignored, InputSpec{"tfn", "TFN", "TFNumber", 0, Lifetime::Sporadic});
639 DataSpecUtils::updateInputList(ignored, InputSpec{"tff", "TFF", "TFFilename", 0, Lifetime::Sporadic});
640 }
641
642 if (tfnsource != workflow.end() && !tfnsource->name.starts_with("aod-producer-workflow")) { // any tfnsource except the aod-producer should use scheduled sink
643 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
644 // if there is a tfnsource, make sure the sink gets TFN/TFF
645 extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
646 } else { // if there is no tfn source or if that source is aod-producer-workflow, out-of-band channel is used to propagate the number of consumed timeframes
647 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
648 std::string rateLimitingChannelConfigOutput;
649 if (rateLimitingIPCID != -1) {
650 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
651 }
652 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
653 }
654 }
655
656 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
657 extraSpecs.clear();
658}
659
661{
662 unsigned int distSTFCount = 0;
663 for (auto& spec : workflow) {
664 auto& inputs = spec.inputs;
665 bool allSporadic = true;
666 bool hasTimer = false;
667 bool hasSporadic = false;
668 bool hasOptionals = false;
669 for (auto& input : inputs) {
670 if (input.lifetime == Lifetime::Optional) {
671 hasOptionals = true;
672 }
673 }
674 for (auto& input : inputs) {
675 // Any InputSpec that is DPL/DISTSUBTIMEFRAME/0 will actually be replaced by one
676 // which looks like DPL/DISTSUBTIMEFRAME/<incremental number> for devices that
677 // have Optional inputs as well.
678 // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
679 // forwarded before actual RAWDATA arrives.
680 if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
681 !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
682 LOGP(error,
683 "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
684 "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
685 DataSpecUtils::describe(input), input.binding);
686 }
687 if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
688 // The first one remains unchanged, therefore we use the postincrement
689 DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
690 continue;
691 }
692 // Timers are sporadic only when they are not
693 // alone.
694 if (input.lifetime == Lifetime::Timer) {
695 hasTimer = true;
696 continue;
697 }
698 if (input.lifetime == Lifetime::Sporadic) {
699 hasSporadic = true;
700 } else {
701 allSporadic = false;
702 }
703 }
704
705 LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
706
707 // If they are not all sporadic (excluding timers)
708 // we leave things as they are.
709 if (allSporadic == false) {
710 continue;
711 }
712 // A timer alone is not sporadic.
713 if (hasSporadic == false) {
714 continue;
715 }
719 for (auto& output : spec.outputs) {
720 if (output.lifetime == Lifetime::Timeframe) {
721 output.lifetime = Lifetime::Sporadic;
722 }
723 }
724 }
725
726 if (distSTFCount > 0) {
727 for (auto& spec : workflow) {
728 if (std::ranges::any_of(spec.outputs, [](auto const& output) { return DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0}); })) {
729 for (unsigned int i = 1; i < distSTFCount; ++i) {
730 spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
731 }
732 break;
733 }
734 }
735 }
736}
737
739{
740 auto& dec = ctx.services().get<DanglingEdgesContext>();
742 std::tie(dec.outputsInputs, dec.isDangling) = analyzeOutputs(workflow);
743
744 // create DataOutputDescriptor
745 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
746
747 // select outputs of type AOD which need to be saved
748 dec.outputsInputsAOD.clear();
749 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
750 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], AODOrigins)) {
751 auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
752 if (ds.size() > 0 || dec.isDangling[ii]) {
753 dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
754 }
755 }
756 }
757
758 // file sink for any AOD output
759 if (dec.outputsInputsAOD.size() > 0) {
760 // add TFNumber and TFFilename as input to the writer
761 DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tfn", "TFN", "TFNumber"});
762 DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tff", "TFF", "TFFilename"});
763 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
764 workflow.push_back(fileSink);
765
766 auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
767 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
768 });
769 dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
770
771 it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
772 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFF"));
773 });
774 dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
775 }
776}
777
779 std::vector<DeviceConnectionEdge>& logicalEdges,
780 std::vector<OutputSpec>& outputs,
781 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
782{
783 // In case the workflow is empty, we do not have anything to do.
784 if (workflow.empty()) {
785 return;
786 }
787
788 // This is the state. Oif is the iterator I use for the searches.
789 std::vector<LogicalOutputInfo> availableOutputsInfo;
790 auto const& constOutputs = outputs; // const version of the outputs
791 // Forwards is a local cache to avoid adding forwards before time.
792 std::vector<LogicalOutputInfo> forwards;
793
794 // Notice that availableOutputsInfo MUST be updated first, since it relies on
795 // the size of outputs to be the one before the update.
796 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
797 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
798 for (size_t wi = 0; wi < workflow.size(); ++wi) {
799 auto& producer = workflow[wi];
800 if (producer.outputs.empty()) {
801 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
802 }
803 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
804
805 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
806 auto& out = producer.outputs[oi];
807 auto uniqueOutputId = outputs.size();
808 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
809 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
810 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
811 outputs.push_back(out);
812 }
813 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
814 }
815 };
816
817 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
818 auto input = workflow[ci].inputs[ii];
819 std::ostringstream str;
820 str << "No matching output found for "
821 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
822
823 for (auto& output : constOutputs) {
824 str << "-" << DataSpecUtils::describe(output) << "\n";
825 }
826
827 throw std::runtime_error(str.str());
828 };
829
830 // This is the outer loop
831 //
832 // Here we iterate over dataprocessor items in workflow and we consider them
833 // as consumer, since we are interested in their inputs.
834 // Notice also we need to search for all the matching inputs, since
835 // we could have more than one source that matches (e.g. in the
836 // case of a time merger).
837 // Once consumed, an output is not actually used anymore, however
838 // we append it as a forward.
839 // Finally, If a device has n-way pipelining, we need to create one node per
840 // parallel pipeline and add an edge for each.
841 enumerateAvailableOutputs();
842
843 std::vector<bool> matches(constOutputs.size());
844 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
845 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
846 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
847 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
848 forwards.clear();
849 for (size_t i = 0; i < constOutputs.size(); i++) {
850 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
851 if (matches[i]) {
852 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
853 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
854 DataSpecUtils::describe(constOutputs[i]).c_str());
855 }
856 }
857
858 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
859 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
860 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
861 continue;
862 }
863 auto* oif = &availableOutputsInfo[i];
864 if (oif->forward) {
865 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
866 }
867 auto producer = oif->specIndex;
868 auto uniqueOutputId = oif->outputGlobalIndex;
869 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
870 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
871 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
872 workflow[producer].name.c_str());
873 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
874 }
875 }
876 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
877 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
878 oif->enabled = false;
879 }
880 if (forwards.empty()) {
881 errorDueToMissingOutputFor(consumer, input);
882 }
883 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto const& info) { return info.enabled == false; }), availableOutputsInfo.end());
884 std::ranges::copy(forwards, std::back_inserter(availableOutputsInfo));
885 }
886 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
887 }
888}
889
890std::vector<EdgeAction>
892 const std::vector<DeviceConnectionEdge>& edges,
893 const std::vector<size_t>& index)
894{
895 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
896
897 assert(edges.size() == index.size());
898 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
899 for (size_t i : index) {
900 auto& edge = edges[i];
901 auto& action = actions[i];
902 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
903 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
904 last = edge;
905 }
906 return actions;
907}
908
909std::vector<EdgeAction>
911 const std::vector<DeviceConnectionEdge>& edges,
912 const std::vector<size_t>& index)
913{
914 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
915
916 assert(edges.size() == index.size());
917 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
918 for (size_t i : index) {
919 auto& edge = edges[i];
920 auto& action = actions[i];
921 // Calculate which actions need to be taken for this edge.
922 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
923 action.requiresNewChannel =
924 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
925
926 last = edge;
927 }
928 return actions;
929}
930
931void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
932 std::vector<size_t>& outEdgeIndex,
933 const std::vector<DeviceConnectionEdge>& edges)
934{
935 inEdgeIndex.resize(edges.size());
936 outEdgeIndex.resize(edges.size());
937 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
938 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
939
940 // Two indexes, one to bind the outputs, the other
941 // one to connect the inputs. The
942 auto outSorter = [&edges](size_t i, size_t j) {
943 auto& a = edges[i];
944 auto& b = edges[j];
945 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
946 };
947 auto inSorter = [&edges](size_t i, size_t j) {
948 auto& a = edges[i];
949 auto& b = edges[j];
950 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
951 };
952
953 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
954 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
955}
956
958{
959 if (workflow.empty()) {
961 }
962 std::set<std::string> validNames;
963 // std::vector<OutputSpec> availableOutputs;
964 // std::vector<InputSpec> requiredInputs;
965
966 // An index many to one index to go from a given input to the
967 // associated spec
968 // std::map<size_t, size_t> inputToSpec;
969 // A one to one index to go from a given output to the Spec emitting it
970 // std::map<size_t, size_t> outputToSpec;
971
972 std::ostringstream ss;
973
974 for (auto& spec : workflow) {
975 if (spec.name.empty()) {
976 throw std::runtime_error("Invalid DataProcessorSpec name");
977 }
978 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
979 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
980 }
981 if (validNames.find(spec.name) != validNames.end()) {
982 throw std::runtime_error("Name " + spec.name + " is used twice.");
983 }
984 validNames.insert(spec.name);
985 for (auto& option : spec.options) {
986 if (option.defaultValue.type() != VariantType::Empty &&
987 option.type != option.defaultValue.type()) {
988 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
989 << ") for " << option.name << " in DataProcessorSpec of "
990 << spec.name;
991 throw std::runtime_error(ss.str());
992 }
993 }
994 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
995 InputSpec const& input = spec.inputs[ii];
996 if (DataSpecUtils::validate(input) == false) {
997 ss << "In spec " << spec.name << " input specification "
998 << ii << " requires binding, description and origin"
999 " to be fully specified (found "
1000 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
1001 throw std::runtime_error(ss.str());
1002 }
1003 }
1004 }
1006}
1007
// A data spec which is either an InputSpec or an OutputSpec.
// NOTE(review): not referenced anywhere in the visible part of this file — confirm it is still needed.
1008using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
/// Position of an input or output spec inside a workflow: the index of the
/// DataProcessorSpec (workflowId) and the index of the spec within that
/// processor's inputs or outputs (id).
struct DataMatcherId {
  size_t workflowId;
  size_t id;
};
1013
1014std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
1015{
1016 // compute total number of input/output
1017 size_t totalInputs = 0;
1018 size_t totalOutputs = 0;
1019 for (auto& spec : workflow) {
1020 totalInputs += spec.inputs.size();
1021 totalOutputs += spec.outputs.size();
1022 }
1023
1024 std::vector<DataMatcherId> inputs;
1025 std::vector<DataMatcherId> outputs;
1026 inputs.reserve(totalInputs);
1027 outputs.reserve(totalOutputs);
1028
1029 std::vector<InputSpec> results;
1030 std::vector<bool> isDangling;
1031 results.reserve(totalOutputs);
1032 isDangling.reserve(totalOutputs);
1033
1035 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
1036 auto& spec = workflow[wi];
1037 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
1038 inputs.emplace_back(DataMatcherId{wi, ii});
1039 }
1040 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
1041 outputs.emplace_back(DataMatcherId{wi, oi});
1042 }
1043 }
1044
1045 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
1046 auto& output = outputs[oi];
1047 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
1048
1049 // is dangling output?
1050 bool matched = false;
1051 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
1052 auto& input = inputs[ii];
1053 // Inputs of the same workflow cannot match outputs
1054 if (output.workflowId == input.workflowId) {
1055 continue;
1056 }
1057 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
1058 if (DataSpecUtils::match(inputSpec, outputSpec)) {
1059 matched = true;
1060 break;
1061 }
1062 }
1063
1064 auto input = DataSpecUtils::matchingInput(outputSpec);
1065 char buf[64];
1066 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
1067
1068 // make sure that entries are unique
1069 if (std::ranges::find(results, input) == results.end()) {
1070 results.emplace_back(input);
1071 isDangling.emplace_back(matched == false);
1072 }
1073 }
1074
1075 // make sure that results is unique
1076 return std::make_tuple(results, isDangling);
1077}
1078
1079std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1080{
1081
1082 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1083
1084 std::vector<InputSpec> results;
1085 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1086 if (isDangling[ii]) {
1087 results.emplace_back(outputsInputs[ii]);
1088 }
1089 }
1090
1091 return results;
1092}
1093
1094bool validateLifetime(std::ostream& errors,
1095 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1096 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1097{
1098 // In case the completion policy is consume-any, we do not need to check anything.
1099 if (consumerPolicies.completionPolicyName == "consume-any") {
1100 return true;
1101 }
1102 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1103 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1104 DataSpecUtils::describe(input).c_str(), consumer.name,
1105 DataSpecUtils::describe(output).c_str(), producer.name);
1106 return false;
1107 }
1108 return true;
1109}
1110
1111bool validateExpendable(std::ostream& errors,
1112 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1113 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1114{
1115 auto isExpendable = [](DataProcessorLabel const& label) {
1116 return label.value == "expendable";
1117 };
1118 auto isResilient = [](DataProcessorLabel const& label) {
1119 return label.value == "expendable" || label.value == "resilient";
1120 };
1121 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1122 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1123 if (producerExpendable && consumerCritical) {
1124 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1125 consumer.name,
1126 producer.name);
1127 return false;
1128 }
1129 return true;
1130}
1131
1132using Validator = std::function<bool(std::ostream& errors,
1133 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1134 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1135
1137 std::vector<DataProcessorPoliciesInfo> const& policies,
1138 std::vector<DeviceConnectionEdge> const& edges,
1139 std::vector<OutputSpec> const& outputs)
1140{
1141 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
1142 std::vector<Validator> defaultValidators = {validateExpendable};
1143 if (!disableLifetimeCheck) {
1144 defaultValidators.emplace_back(validateLifetime);
1145 }
1146 std::stringstream errors;
1147 // Iterate over all the edges.
1148 // Get the input lifetime and the output lifetime.
1149 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
1150 bool hasErrors = false;
1151 for (auto const& edge : edges) {
1152 DataProcessorSpec const& producer = workflow[edge.producer];
1153 DataProcessorSpec const& consumer = workflow[edge.consumer];
1154 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
1155 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
1156 OutputSpec const& output = outputs[edge.outputGlobalIndex];
1157 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
1158 for (auto const& validator : defaultValidators) {
1159 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
1160 }
1161 }
1162 if (hasErrors) {
1163 throw std::runtime_error(errors.str());
1164 }
1165}
1166
1167} // namespace o2::framework
uint32_t hash
std::ostringstream debug
std::unique_ptr< expressions::Node > node
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:490
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:609
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:507
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:523
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:603
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(Ts... Vs)
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
constexpr framework::ConcreteDataMatcher matcher()
Definition ASoA.h:380
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:193
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static void injectAODWriter(WorkflowSpec &workflow, ConfigContext const &ctx)
o2::mch::DsIndex ds
const std::string str