Project
Loading...
Searching...
No Matches
WorkflowHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
28#include "Framework/Signpost.h"
30
31#include "Framework/Variant.h"
32#include "Headers/DataHeader.h"
33#include <algorithm>
34#include <list>
35#include <set>
36#include <utility>
37#include <vector>
38#include <climits>
39#include <numeric>
40
41O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
42
43namespace o2::framework
44{
45std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
46{
47 out << "(" << info.index << ", " << info.layer << ")";
48 return out;
49}
50
// Layered topological sort (Kahn-style) of a DAG. Nodes are the integers
// [0, nodeCount); edges are given as two parallel arrays edgeIn/edgeOut,
// walked with byteStride bytes between consecutive entries. Returns the
// nodes in dependency order, each tagged with the layer (distance from a
// root) at which it becomes schedulable.
// NOTE(review): the declaration line carrying the function name and the
// first parameter (nodeCount) is missing from this extraction.
51std::vector<TopoIndexInfo>
53  int const* edgeIn,
54  int const* edgeOut,
55  size_t byteStride,
56  size_t edgesCount)
57{
  // Convert the byte stride to an element stride for int-typed pointer math.
58  size_t stride = byteStride / sizeof(int);
59  using EdgeIndex = int;
60  // Create the index which will be returned.
61  std::vector<TopoIndexInfo> index(nodeCount);
62  for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
63    index[wi] = {wi, 0};
64  }
  // All edges start out unprocessed; we track them by position.
65  std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
66  for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
67    remainingEdgesIndex[ei] = ei;
68  }
69
70  // Create a vector where at each position we have true
71  // if the vector has dependencies, false otherwise
72  std::vector<bool> nodeDeps(nodeCount, false);
73  for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
74    nodeDeps[*(edgeOut + ei * stride)] = true;
75  }
76
77  // We start with all those which do not have any dependencies
78  // They are layer 0.
79  std::list<TopoIndexInfo> L;
80  for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
81    if (nodeDeps[ii] == false) {
82      L.push_back({ii, 0});
83    }
84  }
85
86  // The final result.
87  std::vector<TopoIndexInfo> S;
88  // The set of vertices which can be reached by the current node
89  std::set<TopoIndexInfo> nextVertex;
90  // The set of edges which are not related to the current node.
91  std::vector<EdgeIndex> nextEdges;
92  while (!L.empty()) {
93    auto node = L.front();
94    S.push_back(node);
95    L.pop_front();
96    nextVertex.clear();
97    nextEdges.clear();
98
99    // After this, nextVertex will contain all the vertices
100    // which have the current node as incoming.
101    // nextEdges will contain all the edges which are not related
102    // to the current node.
103    for (auto& ei : remainingEdgesIndex) {
104      if (*(edgeIn + ei * stride) == node.index) {
105        nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
106      } else {
107        nextEdges.push_back(ei);
108      }
109    }
110    remainingEdgesIndex.swap(nextEdges);
111
112    // Of all the vertices which have node as incoming,
113    // check if there is any other incoming node.
114    std::set<TopoIndexInfo> hasPredecessors;
115    for (auto& ei : remainingEdgesIndex) {
116      for (auto& m : nextVertex) {
117        if (m.index == *(edgeOut + ei * stride)) {
118          hasPredecessors.insert({m.index, m.layer});
119        }
120      }
121    }
    // Only successors with no remaining predecessors become schedulable
    // now; the rest will be enqueued once their last incoming edge is
    // consumed. Note: set_difference requires both sets sorted, which
    // std::set guarantees.
122    std::vector<TopoIndexInfo> withPredecessor;
123    std::set_difference(nextVertex.begin(), nextVertex.end(),
124                        hasPredecessors.begin(), hasPredecessors.end(),
125                        std::back_inserter(withPredecessor));
126    std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
127  }
  // Nodes of a cycle (if any) never enter L, so they are simply absent
  // from S rather than causing an infinite loop.
128  return S;
129}
130
131// get the default value for condition-backend
133{
134 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
135 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
136 if (explicitBackend) {
137 return getenv("DPL_CONDITION_BACKEND");
138 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
139 return "http://o2-ccdb.internal";
140 } else {
141 return "http://alice-ccdb.cern.ch";
142 }
143}
144
// get the default value for condition query rate
//
// Number of timeframes between CCDB validity checks; 0 (the default) means
// conditions are fetched only once. Overridable via the
// DPL_CONDITION_QUERY_RATE environment variable.
//
// NOTE(review): the declaration line was lost in the extraction; the
// signature is reconstructed from the call sites, where the result is used
// as the default of a VariantType::Int option. Also avoids calling getenv
// twice for the same variable.
int defaultConditionQueryRate()
{
  auto const* rate = getenv("DPL_CONDITION_QUERY_RATE");
  return rate ? std::stoi(rate) : 0;
}
150
// get the default value for condition query rate multiplier
//
// Multiplier applied to the nominal condition-check cadence; defaults to 1
// (check at every nominal opportunity). Overridable via the
// DPL_CONDITION_QUERY_RATE_MULTIPLIER environment variable.
//
// NOTE(review): the declaration line was lost in the extraction; the
// signature is reconstructed from the call sites, where the result is used
// as the default of a VariantType::Int option. Also avoids calling getenv
// twice for the same variable.
int defaultConditionQueryRateMultiplier()
{
  auto const* mult = getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER");
  return mult ? std::stoi(mult) : 1;
}
156
158{
159 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
160 DataProcessorSpec ccdbBackend{
161 .name = "internal-dpl-ccdb-backend",
162 .outputs = {},
163 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
164 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
165 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
166 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
167 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
168 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
169 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
170 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
171 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
172 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
173 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
174 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
175 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
176 };
177 DataProcessorSpec analysisCCDBBackend{
178 .name = "internal-dpl-aod-ccdb",
179 .inputs = {},
180 .outputs = {},
181 .algorithm = AlgorithmSpec::dummyAlgorithm(),
182 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
183 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
184 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
185 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
186 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
187 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
188 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
189 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
190 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
191 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
192 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
193 DataProcessorSpec transientStore{"internal-dpl-transient-store",
194 {},
195 {},
197 DataProcessorSpec qaStore{"internal-dpl-qa-store",
198 {},
199 {},
201 DataProcessorSpec timer{"internal-dpl-clock",
202 {},
203 {},
205
206 // In case InputSpec of origin AOD are
207 // requested but not available as part of the workflow,
208 // we insert in the configuration something which
209 // reads them from file.
210 //
211 // FIXME: source branch is DataOrigin, for the moment. We should
212 // make it configurable via ConfigParamsOptions
213 auto aodLifetime = Lifetime::Enumeration;
214
215 DataProcessorSpec aodReader{
216 .name = "internal-dpl-aod-reader",
217 .inputs = {InputSpec{"enumeration",
218 "DPL",
219 "ENUM",
220 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
221 aodLifetime}},
222 .algorithm = AlgorithmSpec::dummyAlgorithm(),
223 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
224 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
225 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
226 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
227 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
228 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
229 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
230 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
231 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
232 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
233
234 ctx.services().registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(new DanglingEdgesContext));
235 auto& dec = ctx.services().get<DanglingEdgesContext>();
236
237 std::vector<InputSpec> requestedCCDBs;
238 std::vector<OutputSpec> providedCCDBs;
239
240 for (size_t wi = 0; wi < workflow.size(); ++wi) {
241 auto& processor = workflow[wi];
242 auto name = processor.name;
243 auto hash = runtime_hash(name.c_str());
244 dec.outTskMap.push_back({hash, name});
245
246 std::string prefix = "internal-dpl-";
247 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
248 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
249 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
250 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
251 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
252 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
253 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
254 }
255 bool hasTimeframeInputs = std::any_of(processor.inputs.begin(), processor.inputs.end(), [](auto const& input) { return input.lifetime == Lifetime::Timeframe; });
256 bool hasTimeframeOutputs = std::any_of(processor.outputs.begin(), processor.outputs.end(), [](auto const& output) { return output.lifetime == Lifetime::Timeframe; });
257
258 // A timeframeSink consumes timeframes without creating new
259 // timeframe data.
260 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
261 if (rateLimitingIPCID != -1) {
262 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
263 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
264 uint32_t hash = runtime_hash(processor.name.c_str());
265 bool hasMatch = false;
266 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
267 auto summaryOutput = std::find_if(processor.outputs.begin(), processor.outputs.end(), [&summaryMatcher](auto const& output) { return DataSpecUtils::match(output, summaryMatcher); });
268 if (summaryOutput != processor.outputs.end()) {
269 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
270 DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str());
271 hasMatch = true;
272 }
273
274 if (!hasMatch) {
275 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
276 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
277 }
278 }
279 }
280 bool hasConditionOption = false;
281 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
282 auto& input = processor.inputs[ii];
283 switch (input.lifetime) {
284 case Lifetime::Timer: {
285 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
286 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "period-" + input.binding); });
287 if (hasOption == false) {
288 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
289 }
290 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
291 } break;
292 case Lifetime::Signal: {
293 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
294 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
295 } break;
296 case Lifetime::Enumeration: {
297 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
298 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
299 } break;
300 case Lifetime::Condition: {
301 requestedCCDBs.emplace_back(input);
302 if ((hasConditionOption == false) && std::none_of(processor.options.begin(), processor.options.end(), [](auto const& option) { return (option.name.compare("condition-backend") == 0); })) {
303 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
304 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
305 hasConditionOption = true;
306 }
307 } break;
308 case Lifetime::OutOfBand: {
309 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
310 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
311 if (hasOption == false) {
312 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
313 }
314 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
315 } break;
316 case Lifetime::QA:
317 case Lifetime::Transient:
318 case Lifetime::Timeframe:
319 case Lifetime::Optional:
320 break;
321 }
322 if (DataSpecUtils::partialMatch(input, AODOrigins)) {
323 DataSpecUtils::updateInputList(dec.requestedAODs, InputSpec{input});
324 }
326 DataSpecUtils::updateInputList(dec.requestedDYNs, InputSpec{input});
327 }
329 DataSpecUtils::updateInputList(dec.requestedIDXs, InputSpec{input});
330 }
332 DataSpecUtils::updateInputList(dec.requestedTIMs, InputSpec{input});
333 }
334 }
335
336 std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
337
338 for (auto& output : processor.outputs) {
339 if (DataSpecUtils::partialMatch(output, AODOrigins)) {
340 dec.providedAODs.emplace_back(output);
342 dec.providedDYNs.emplace_back(output);
344 dec.providedTIMs.emplace_back(output);
346 dec.providedOutputObjHist.emplace_back(output);
347 auto it = std::find_if(dec.outObjHistMap.begin(), dec.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; });
348 if (it == dec.outObjHistMap.end()) {
349 dec.outObjHistMap.push_back({hash, {output.binding.value}});
350 } else {
351 it->bindings.push_back(output.binding.value);
352 }
353 }
354 if (output.lifetime == Lifetime::Condition) {
355 providedCCDBs.push_back(output);
356 }
357 }
358 }
359
360 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
361 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
362 std::sort(dec.requestedDYNs.begin(), dec.requestedDYNs.end(), inputSpecLessThan);
363 std::sort(dec.requestedTIMs.begin(), dec.requestedTIMs.end(), inputSpecLessThan);
364 std::sort(dec.providedDYNs.begin(), dec.providedDYNs.end(), outputSpecLessThan);
365 std::sort(dec.providedTIMs.begin(), dec.providedTIMs.end(), outputSpecLessThan);
366
367 DataProcessorSpec indexBuilder{
368 "internal-dpl-aod-index-builder",
369 {},
370 {},
371 AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
372 {}};
373 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, indexBuilder);
374
375 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
377 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
378 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedTIMs, analysisCCDBBackend);
379 }
380
381 dec.requestedDYNs | views::filter_not_matching(dec.providedDYNs) | sinks::append_to{dec.spawnerInputs};
382
383 DataProcessorSpec aodSpawner{
384 "internal-dpl-aod-spawner",
385 {},
386 {},
387 AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology
388 {}};
389 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, aodSpawner);
390 AnalysisSupportHelpers::addMissingOutputsToReader(dec.providedAODs, dec.requestedAODs, aodReader);
391
392 std::sort(requestedCCDBs.begin(), requestedCCDBs.end(), inputSpecLessThan);
393 std::sort(providedCCDBs.begin(), providedCCDBs.end(), outputSpecLessThan);
394 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
395
396 std::vector<DataProcessorSpec> extraSpecs;
397
398 if (transientStore.outputs.empty() == false) {
399 extraSpecs.push_back(transientStore);
400 }
401 if (qaStore.outputs.empty() == false) {
402 extraSpecs.push_back(qaStore);
403 }
404
405 if (aodSpawner.outputs.empty() == false) {
406 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
407 }
408
409 if (indexBuilder.outputs.empty() == false) {
410 extraSpecs.push_back(indexBuilder);
411 }
412
413 // add the reader
414 if (aodReader.outputs.empty() == false) {
415 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
416 if (mctracks2aod == workflow.end()) {
417 // add normal reader
418 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
419 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
420 } else {
421 // AODs are being injected on-the-fly, add error-handler reader
422 aodReader.algorithm = AlgorithmSpec{
424 [outputs = aodReader.outputs](DeviceSpec const&) {
425 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
426 for (auto const& output : outputs) {
427 LOGP(warn, " {}", DataSpecUtils::describe(output));
428 }
429 LOGP(fatal, "Stopping.");
430 // to ensure the output type for adaptStateful
431 return adaptStateless([](DataAllocator&) {});
432 })};
433 }
434 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
435 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
436 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
437 }
438
439 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
440 auto& dstf = std::get<ConcreteDataMatcher>(matcher.matcher);
441 // Check if any of the provided outputs is a DISTSTF
442 // Check if any of the requested inputs is for a 0xccdb message
443 bool providesDISTSTF = std::any_of(workflow.begin(), workflow.end(),
444 [&matcher](auto const& dp) {
445 return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](auto const& output) {
446 return DataSpecUtils::match(matcher, output);
447 });
448 });
449
450 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
451 // we add to the first data processor which has no inputs (apart from
452 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
453 bool requiresDISTSUBTIMEFRAME = std::any_of(workflow.begin(), workflow.end(),
454 [&dstf](auto const& dp) {
455 return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](auto const& input) {
456 return DataSpecUtils::match(input, dstf);
457 });
458 });
459
460 // We find the first device which has either just enumerations or
461 // just timers, and we will add the DISTSUBTIMEFRAME to it.
462 // Notice how we do so in a stable manner by sorting the devices
463 // by name.
464 int enumCandidate = -1;
465 int timerCandidate = -1;
466 for (auto wi = 0U; wi < workflow.size(); ++wi) {
467 auto& dp = workflow[wi];
468 if (dp.inputs.size() != 1) {
469 continue;
470 }
471 auto lifetime = dp.inputs[0].lifetime;
472 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
473 enumCandidate = wi;
474 }
475 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
476 timerCandidate = wi;
477 }
478 }
479
480 // * If there are AOD outputs we use TFNumber as the CCDB clock
481 // * If one device provides a DISTSTF we use that as the CCDB clock
482 // * If one of the devices provides a timer we use that as the CCDB clock
483 // * If none of the above apply, add to the first data processor
484 // which has no inputs apart from enumerations the responsibility
485 // to provide the DISTSUBTIMEFRAME.
486 if (ccdbBackend.outputs.empty() == false) {
487 if (aodReader.outputs.empty() == false) {
488 // fetcher clock follows AOD source (TFNumber)
489 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
490 } else if (providesDISTSTF) {
491 // fetcher clock follows DSTF/ccdb source (DISTSUBTIMEFRAME)
492 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
493 } else {
494 if (enumCandidate != -1) {
495 // add DSTF/ccdb source to the enumeration-driven source explicitly
496 // fetcher clock is provided by enumeration-driven source (DISTSUBTIMEFRAME)
497 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
498 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
499 } else if (timerCandidate != -1) {
500 // fetcher clock is proived by timer source
501 auto timer_dstf = DataSpecUtils::asConcreteDataMatcher(workflow[timerCandidate].outputs[0]);
502 ccdbBackend.inputs.push_back(InputSpec{"tfn", timer_dstf, Lifetime::Timeframe});
503 }
504 }
505
506 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
507 // Load the CCDB backend from the plugin
508 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
509 extraSpecs.push_back(ccdbBackend);
510 } else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
511 // add DSTF/ccdb source to the enumeration-driven source explicitly if it is required in the workflow
512 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
513 }
514
515 // add the Analysys CCDB backend which reads CCDB objects using a provided table
516 if (analysisCCDBBackend.outputs.empty() == false) {
517 extraSpecs.push_back(analysisCCDBBackend);
518 }
519
520 // add the timer
521 if (timer.outputs.empty() == false) {
522 extraSpecs.push_back(timer);
523 }
524
525 // This is to inject a file sink so that any dangling ATSK object is written
526 // to a ROOT file.
527 if (dec.providedOutputObjHist.empty() == false) {
529 extraSpecs.push_back(rootSink);
530 }
531
532 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
533 extraSpecs.clear();
534
536 auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
537 dec.isDangling = isDanglingTmp;
538 dec.outputsInputs = outputsInputsTmp;
539
540 // create DataOutputDescriptor
541 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
542
543 // select outputs of type AOD which need to be saved
544 // ATTENTION: if there are dangling outputs the getGlobalAODSink
545 // has to be created in any case!
546 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
547 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
548 auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
549 if (ds.size() > 0 || dec.isDangling[ii]) {
550 dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
551 }
552 }
553 }
554
555 // file sink for any AOD output
556 if (dec.outputsInputsAOD.size() > 0) {
557 // add TFNumber and TFFilename as input to the writer
558 dec.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
559 dec.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
560 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
561 extraSpecs.push_back(fileSink);
562
563 auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec& spec) -> bool {
564 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
565 });
566 size_t ii = std::distance(dec.outputsInputs.begin(), it);
567 dec.isDangling[ii] = false;
568 }
569
570 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
571 extraSpecs.clear();
572
573 // Select dangling outputs which are not of type AOD
574 std::vector<InputSpec> redirectedOutputsInputs;
575 for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
576 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
577 continue;
578 }
579 // We forward to the output proxy all the inputs only if they are dangling
580 // or if the forwarding policy is "proxy".
581 if (!dec.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
582 continue;
583 }
584 // AODs are skipped in any case.
585 if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
586 continue;
587 }
588 redirectedOutputsInputs.emplace_back(dec.outputsInputs[ii]);
589 }
590
591 std::vector<InputSpec> unmatched;
592 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
593 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
594 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
595 if (unmatched.size() != redirectedOutputsInputs.size()) {
596 extraSpecs.push_back(fileSink);
597 }
598 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
599 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
600 extraSpecs.push_back(fairMQSink);
601 } else if (forwardingDestination != "drop") {
602 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
603 }
604 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
605 std::vector<InputSpec> ignored = unmatched;
606 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
607 for (auto& ignoredInput : ignored) {
608 ignoredInput.lifetime = Lifetime::Sporadic;
609 }
610
611 // Use the new dummy sink when the AOD reader is there
612 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
613 if (aodReader.outputs.empty() == false) {
614 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
615 extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
616 } else {
617 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
618 std::string rateLimitingChannelConfigOutput;
619 if (rateLimitingIPCID != -1) {
620 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
621 }
622 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
623 }
624 }
625
626 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
627 extraSpecs.clear();
628}
629
// NOTE(review): the declaration line of this function is missing from this
// extraction. The body below adjusts input/output lifetimes of the given
// workflow in place, and renumbers user-provided FLP/DISTSUBTIMEFRAME/0
// inputs for devices that also have Optional inputs.
631{
  // How many FLP/DISTSUBTIMEFRAME inputs were renumbered (subSpec 0..N-1).
632  unsigned int distSTFCount = 0;
633  for (auto& spec : workflow) {
634    auto& inputs = spec.inputs;
    // Classify the inputs of this device in a first pass (Optional) and a
    // second pass (everything else).
635    bool allSporadic = true;
636    bool hasTimer = false;
637    bool hasSporadic = false;
638    bool hasOptionals = false;
639    for (auto& input : inputs) {
640      if (input.lifetime == Lifetime::Optional) {
641        hasOptionals = true;
642      }
643    }
644    for (auto& input : inputs) {
645      // Any InputSpec that is DPL/DISTSUBTIMEFRAME/0 will actually be replaced by one
646      // which looks like DPL/DISTSUBTIMEFRAME/<incremental number> for devices that
647      // have Optional inputs as well.
648      // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
649      // forwarded before actual RAWDATA arrives.
650      if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
651          !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
652        LOGP(error,
653             "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
654             "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
655             DataSpecUtils::describe(input), input.binding);
656      }
657      if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
658        // The first one remains unchanged, therefore we use the postincrement
659        DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
660        continue;
661      }
662      // Timers are sporadic only when they are not
663      // alone.
664      if (input.lifetime == Lifetime::Timer) {
665        hasTimer = true;
666        continue;
667      }
668      if (input.lifetime == Lifetime::Sporadic) {
669        hasSporadic = true;
670      } else {
671        allSporadic = false;
672      }
673    }
674
675    LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
676
677    // If they are not all sporadic (excluding timers)
678    // we leave things as they are.
679    if (allSporadic == false) {
680      continue;
681    }
682    // A timer alone is not sporadic.
683    if (hasSporadic == false) {
684      continue;
685    }
    // All non-timer inputs are sporadic: demote this device's Timeframe
    // outputs to Sporadic as well, so consumers do not expect them on
    // every timeframe.
    // NOTE(review): original lines 686-688 are missing from this extraction.
689    for (auto& output : spec.outputs) {
690      if (output.lifetime == Lifetime::Timeframe) {
691        output.lifetime = Lifetime::Sporadic;
692      }
693    }
694  }
695
  // If some DISTSUBTIMEFRAME inputs were renumbered above, the (single)
  // device producing FLP/DISTSUBTIMEFRAME/0 must also provide the extra
  // subSpecs 1..distSTFCount-1 that the renumbered inputs now expect.
696  if (distSTFCount > 0) {
697    bool found = false;
698    for (auto& spec : workflow) {
699      for (auto& output : spec.outputs) {
700        if (DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
701          found = true;
702          break;
703        }
704      }
705      if (found) {
706        for (unsigned int i = 1; i < distSTFCount; ++i) {
707          spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
708        }
709        break;
710      }
711    }
712  }
713}
714
716 std::vector<DeviceConnectionEdge>& logicalEdges,
717 std::vector<OutputSpec>& outputs,
718 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
719{
720 // In case the workflow is empty, we do not have anything to do.
721 if (workflow.empty()) {
722 return;
723 }
724
725 // This is the state. Oif is the iterator I use for the searches.
726 std::vector<LogicalOutputInfo> availableOutputsInfo;
727 auto const& constOutputs = outputs; // const version of the outputs
728 // Forwards is a local cache to avoid adding forwards before time.
729 std::vector<LogicalOutputInfo> forwards;
730
731 // Notice that availableOutputsInfo MUST be updated first, since it relies on
732 // the size of outputs to be the one before the update.
733 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
734 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
735 for (size_t wi = 0; wi < workflow.size(); ++wi) {
736 auto& producer = workflow[wi];
737 if (producer.outputs.empty()) {
738 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
739 }
740 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
741
742 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
743 auto& out = producer.outputs[oi];
744 auto uniqueOutputId = outputs.size();
745 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
746 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
747 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
748 outputs.push_back(out);
749 }
750 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
751 }
752 };
753
754 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
755 auto input = workflow[ci].inputs[ii];
756 std::ostringstream str;
757 str << "No matching output found for "
758 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
759
760 for (auto& output : constOutputs) {
761 str << "-" << DataSpecUtils::describe(output) << "\n";
762 }
763
764 throw std::runtime_error(str.str());
765 };
766
767 // This is the outer loop
768 //
769 // Here we iterate over dataprocessor items in workflow and we consider them
770 // as consumer, since we are interested in their inputs.
771 // Notice also we need to search for all the matching inputs, since
772 // we could have more than one source that matches (e.g. in the
773 // case of a time merger).
774 // Once consumed, an output is not actually used anymore, however
775 // we append it as a forward.
776 // Finally, If a device has n-way pipelining, we need to create one node per
777 // parallel pipeline and add an edge for each.
778 enumerateAvailableOutputs();
779
780 std::vector<bool> matches(constOutputs.size());
781 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
782 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
783 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
784 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
785 forwards.clear();
786 for (size_t i = 0; i < constOutputs.size(); i++) {
787 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
788 if (matches[i]) {
789 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
790 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
791 DataSpecUtils::describe(constOutputs[i]).c_str());
792 }
793 }
794
795 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
796 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
797 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
798 continue;
799 }
800 auto* oif = &availableOutputsInfo[i];
801 if (oif->forward) {
802 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
803 }
804 auto producer = oif->specIndex;
805 auto uniqueOutputId = oif->outputGlobalIndex;
806 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
807 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
808 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
809 workflow[producer].name.c_str());
810 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
811 }
812 }
813 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
814 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
815 oif->enabled = false;
816 }
817 if (forwards.empty()) {
818 errorDueToMissingOutputFor(consumer, input);
819 }
820 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto& info) { return info.enabled == false; }), availableOutputsInfo.end());
821 for (auto& forward : forwards) {
822 availableOutputsInfo.push_back(forward);
823 }
824 }
825 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
826 }
827}
828
829std::vector<EdgeAction>
831 const std::vector<DeviceConnectionEdge>& edges,
832 const std::vector<size_t>& index)
833{
834 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
835
836 assert(edges.size() == index.size());
837 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
838 for (size_t i : index) {
839 auto& edge = edges[i];
840 auto& action = actions[i];
841 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
842 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
843 last = edge;
844 }
845 return actions;
846}
847
848std::vector<EdgeAction>
850 const std::vector<DeviceConnectionEdge>& edges,
851 const std::vector<size_t>& index)
852{
853 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
854
855 assert(edges.size() == index.size());
856 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
857 for (size_t i : index) {
858 auto& edge = edges[i];
859 auto& action = actions[i];
860 // Calculate which actions need to be taken for this edge.
861 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
862 action.requiresNewChannel =
863 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
864
865 last = edge;
866 }
867 return actions;
868}
869
870void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
871 std::vector<size_t>& outEdgeIndex,
872 const std::vector<DeviceConnectionEdge>& edges)
873{
874 inEdgeIndex.resize(edges.size());
875 outEdgeIndex.resize(edges.size());
876 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
877 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
878
879 // Two indexes, one to bind the outputs, the other
880 // one to connect the inputs. The
881 auto outSorter = [&edges](size_t i, size_t j) {
882 auto& a = edges[i];
883 auto& b = edges[j];
884 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
885 };
886 auto inSorter = [&edges](size_t i, size_t j) {
887 auto& a = edges[i];
888 auto& b = edges[j];
889 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
890 };
891
892 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
893 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
894}
895
897{
898 if (workflow.empty()) {
900 }
901 std::set<std::string> validNames;
902 std::vector<OutputSpec> availableOutputs;
903 std::vector<InputSpec> requiredInputs;
904
905 // An index many to one index to go from a given input to the
906 // associated spec
907 std::map<size_t, size_t> inputToSpec;
908 // A one to one index to go from a given output to the Spec emitting it
909 std::map<size_t, size_t> outputToSpec;
910
911 std::ostringstream ss;
912
913 for (auto& spec : workflow) {
914 if (spec.name.empty()) {
915 throw std::runtime_error("Invalid DataProcessorSpec name");
916 }
917 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
918 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
919 }
920 if (validNames.find(spec.name) != validNames.end()) {
921 throw std::runtime_error("Name " + spec.name + " is used twice.");
922 }
923 validNames.insert(spec.name);
924 for (auto& option : spec.options) {
925 if (option.defaultValue.type() != VariantType::Empty &&
926 option.type != option.defaultValue.type()) {
927 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
928 << ") for " << option.name << " in DataProcessorSpec of "
929 << spec.name;
930 throw std::runtime_error(ss.str());
931 }
932 }
933 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
934 InputSpec const& input = spec.inputs[ii];
935 if (DataSpecUtils::validate(input) == false) {
936 ss << "In spec " << spec.name << " input specification "
937 << ii << " requires binding, description and origin"
938 " to be fully specified (found "
939 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
940 throw std::runtime_error(ss.str());
941 }
942 }
943 }
945}
946
947using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
950 size_t id;
951};
952
953std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
954{
955 // compute total number of input/output
956 size_t totalInputs = 0;
957 size_t totalOutputs = 0;
958 for (auto& spec : workflow) {
959 totalInputs += spec.inputs.size();
960 totalOutputs += spec.outputs.size();
961 }
962
963 std::vector<DataMatcherId> inputs;
964 std::vector<DataMatcherId> outputs;
965 inputs.reserve(totalInputs);
966 outputs.reserve(totalOutputs);
967
968 std::vector<InputSpec> results;
969 std::vector<bool> isDangling;
970 results.reserve(totalOutputs);
971 isDangling.reserve(totalOutputs);
972
974 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
975 auto& spec = workflow[wi];
976 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
977 inputs.emplace_back(DataMatcherId{wi, ii});
978 }
979 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
980 outputs.emplace_back(DataMatcherId{wi, oi});
981 }
982 }
983
984 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
985 auto& output = outputs[oi];
986 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
987
988 // is dangling output?
989 bool matched = false;
990 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
991 auto& input = inputs[ii];
992 // Inputs of the same workflow cannot match outputs
993 if (output.workflowId == input.workflowId) {
994 continue;
995 }
996 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
997 if (DataSpecUtils::match(inputSpec, outputSpec)) {
998 matched = true;
999 break;
1000 }
1001 }
1002
1003 auto input = DataSpecUtils::matchingInput(outputSpec);
1004 char buf[64];
1005 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
1006
1007 // make sure that entries are unique
1008 if (std::find(results.begin(), results.end(), input) == results.end()) {
1009 results.emplace_back(input);
1010 isDangling.emplace_back(matched == false);
1011 }
1012 }
1013
1014 // make sure that results is unique
1015 return std::make_tuple(results, isDangling);
1016}
1017
1018std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1019{
1020
1021 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1022
1023 std::vector<InputSpec> results;
1024 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1025 if (isDangling[ii]) {
1026 results.emplace_back(outputsInputs[ii]);
1027 }
1028 }
1029
1030 return results;
1031}
1032
1033bool validateLifetime(std::ostream& errors,
1034 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1035 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1036{
1037 // In case the completion policy is consume-any, we do not need to check anything.
1038 if (consumerPolicies.completionPolicyName == "consume-any") {
1039 return true;
1040 }
1041 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1042 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1043 DataSpecUtils::describe(input).c_str(), consumer.name,
1044 DataSpecUtils::describe(output).c_str(), producer.name);
1045 return false;
1046 }
1047 return true;
1048}
1049
1050bool validateExpendable(std::ostream& errors,
1051 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1052 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1053{
1054 auto isExpendable = [](DataProcessorLabel const& label) {
1055 return label.value == "expendable";
1056 };
1057 auto isResilient = [](DataProcessorLabel const& label) {
1058 return label.value == "expendable" || label.value == "resilient";
1059 };
1060 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1061 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1062 if (producerExpendable && consumerCritical) {
1063 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1064 consumer.name,
1065 producer.name);
1066 return false;
1067 }
1068 return true;
1069}
1070
1071using Validator = std::function<bool(std::ostream& errors,
1072 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1073 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1074
1076 std::vector<DataProcessorPoliciesInfo> const& policies,
1077 std::vector<DeviceConnectionEdge> const& edges,
1078 std::vector<OutputSpec> const& outputs)
1079{
1080 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
1081 std::vector<Validator> defaultValidators = {validateExpendable};
1082 if (!disableLifetimeCheck) {
1083 defaultValidators.emplace_back(validateLifetime);
1084 }
1085 std::stringstream errors;
1086 // Iterate over all the edges.
1087 // Get the input lifetime and the output lifetime.
1088 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
1089 bool hasErrors = false;
1090 for (auto& edge : edges) {
1091 DataProcessorSpec const& producer = workflow[edge.producer];
1092 DataProcessorSpec const& consumer = workflow[edge.consumer];
1093 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
1094 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
1095 OutputSpec const& output = outputs[edge.outputGlobalIndex];
1096 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
1097 for (auto& validator : defaultValidators) {
1098 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
1099 }
1100 }
1101 if (hasErrors) {
1102 throw std::runtime_error(errors.str());
1103 }
1104}
1105
1106} // namespace o2::framework
uint32_t hash
std::ostringstream debug
std::unique_ptr< expressions::Node > node
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(Ts... Vs)
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
Defining PrimaryVertex explicitly as messageable.
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:193
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
int layer
the associated layer in the sorting procedure
int index
the index in the actual storage of the nodes to be sorted topologically
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
o2::mch::DsIndex ds
const std::string str