WorkflowHelpers.cxx — source listing extracted from a Doxygen-generated documentation page.
(Original Doxygen line numbers are embedded in the lines below; several original lines were lost in extraction.)
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
29#include "Framework/Signpost.h"
31
32#include "Framework/Variant.h"
33#include "Headers/DataHeader.h"
34#include <algorithm>
35#include <list>
36#include <set>
37#include <utility>
38#include <vector>
39#include <climits>
40
41O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
42
43namespace o2::framework
44{
45std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
46{
47 out << "(" << info.index << ", " << info.layer << ")";
48 return out;
49}
50
// Topological sort (Kahn-style layering) over a DAG described by two parallel
// edge arrays: edgeIn[i] -> edgeOut[i], walked with a caller-supplied byte
// stride. Returns the nodes in dependency order, each tagged with the layer
// (distance from a root) at which it becomes ready.
// NOTE(review): the extraction dropped original line 52, which held the
// function name and the leading `size_t nodeCount` parameter used below.
51std::vector<TopoIndexInfo>
53 int const* edgeIn,
54 int const* edgeOut,
55 size_t byteStride,
56 size_t edgesCount)
57{
// Convert the byte stride into an element (int) stride for the pointer
// arithmetic below; assumes byteStride is a multiple of sizeof(int).
58 size_t stride = byteStride / sizeof(int);
59 using EdgeIndex = int;
60 // Create the index which will be returned.
61 std::vector<TopoIndexInfo> index(nodeCount);
62 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
63 index[wi] = {wi, 0};
64 }
// remainingEdgesIndex holds the indices of edges not yet consumed by the sort.
65 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
66 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
67 remainingEdgesIndex[ei] = ei;
68 }
69
70 // Create a vector where at each position we have true
71 // if the vector has dependencies, false otherwise
// (i.e. nodeDeps[n] is true when node n has at least one incoming edge).
72 std::vector<bool> nodeDeps(nodeCount, false);
73 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
74 nodeDeps[*(edgeOut + ei * stride)] = true;
75 }
76
77 // We start with all those which do not have any dependencies
78 // They are layer 0.
79 std::list<TopoIndexInfo> L;
80 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
81 if (nodeDeps[ii] == false) {
82 L.push_back({ii, 0});
83 }
84 }
85
86 // The final result.
87 std::vector<TopoIndexInfo> S;
88 // The set of vertices which can be reached by the current node
89 std::set<TopoIndexInfo> nextVertex;
90 // The set of edges which are not related to the current node.
91 std::vector<EdgeIndex> nextEdges;
// Standard worklist loop: pop a ready node, retire its outgoing edges, and
// queue any successor whose predecessors have all been retired.
92 while (!L.empty()) {
93 auto node = L.front();
94 S.push_back(node);
95 L.pop_front();
96 nextVertex.clear();
97 nextEdges.clear();
98
99 // After this, nextVertex will contain all the vertices
100 // which have the current node as incoming.
101 // nextEdges will contain all the edges which are not related
102 // to the current node.
103 for (auto& ei : remainingEdgesIndex) {
104 if (*(edgeIn + ei * stride) == node.index) {
105 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
106 } else {
107 nextEdges.push_back(ei);
108 }
109 }
// Edges leaving `node` are now retired.
110 remainingEdgesIndex.swap(nextEdges);
111
112 // Of all the vertices which have node as incoming,
113 // check if there is any other incoming node.
114 std::set<TopoIndexInfo> hasPredecessors;
115 for (auto& ei : remainingEdgesIndex) {
116 for (auto& m : nextVertex) {
117 if (m.index == *(edgeOut + ei * stride)) {
118 hasPredecessors.insert({m.index, m.layer});
119 }
120 }
121 }
// NOTE(review): despite its name, `withPredecessor` holds the vertices with
// NO remaining predecessors (nextVertex minus hasPredecessors); those are the
// newly-ready nodes appended to the worklist L.
122 std::vector<TopoIndexInfo> withPredecessor;
123 std::set_difference(nextVertex.begin(), nextVertex.end(),
124 hasPredecessors.begin(), hasPredecessors.end(),
125 std::back_inserter(withPredecessor));
126 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
127 }
// NOTE(review): if the input graph has a cycle, the cycle's nodes never enter
// L and are silently absent from S — presumably callers guard against this.
128 return S;
129}
130
131// get the default value for condition-backend
// Returns the CCDB URL to use when the user did not override it:
// the DPL_CONDITION_BACKEND environment variable wins; otherwise online
// deployments (DDS/ECS) use the in-cluster instance and everything else the
// public ALICE CCDB.
// NOTE(review): the extraction dropped original line 132, which held the
// function signature — presumably returning the URL as a string type.
133{
// Cached on first call: whether an explicit backend was set in the
// environment, and which deployment mode we are running under.
134 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
135 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
136 if (explicitBackend) {
// Re-reads the variable rather than caching its value at first call.
137 return getenv("DPL_CONDITION_BACKEND");
138 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
139 return "http://o2-ccdb.internal";
140 } else {
141 return "http://alice-ccdb.cern.ch";
142 }
143}
144
145// get the default value for condition query rate
// Returns DPL_CONDITION_QUERY_RATE parsed as an int, or 0 (meaning: fetch
// conditions only once) when the variable is unset.
// NOTE(review): the extraction dropped original line 146 (the signature).
// NOTE(review): std::stoi throws std::invalid_argument / std::out_of_range on
// a malformed value — confirm an uncaught throw is the intended failure mode.
147{
148 return getenv("DPL_CONDITION_QUERY_RATE") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE")) : 0;
149}
150
151// get the default value for condition query rate multiplier
// Returns DPL_CONDITION_QUERY_RATE_MULTIPLIER parsed as an int, defaulting
// to 1 (no scaling) when the variable is unset.
// NOTE(review): the extraction dropped original line 152 (the signature).
// NOTE(review): std::stoi throws on a malformed value — see the matching note
// on the query-rate getter above; confirm this is intended.
153{
154 return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1;
155}
156
158{
159 auto fakeCallback = AlgorithmSpec{[](InitContext& ic) {
160 LOG(info) << "This is not a real device, merely a placeholder for external inputs";
161 LOG(info) << "To be hidden / removed at some point.";
162 // mark this dummy process as ready-to-quit
163 ic.services().get<ControlService>().readyToQuit(QuitRequest::Me);
164
165 return [](ProcessingContext& pc) {
166 // this callback is never called since there is no expiring input
167 pc.services().get<RawDeviceService>().waitFor(2000);
168 };
169 }};
170
171 DataProcessorSpec ccdbBackend{
172 .name = "internal-dpl-ccdb-backend",
173 .outputs = {},
174 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
175 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
176 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
177 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
178 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
179 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks"}},
180 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
181 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
182 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
183 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
184 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
185 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
186 };
187 DataProcessorSpec transientStore{"internal-dpl-transient-store",
188 {},
189 {},
191 DataProcessorSpec qaStore{"internal-dpl-qa-store",
192 {},
193 {},
195 DataProcessorSpec timer{"internal-dpl-clock",
196 {},
197 {},
199
200 // In case InputSpec of origin AOD are
201 // requested but not available as part of the workflow,
202 // we insert in the configuration something which
203 // reads them from file.
204 //
205 // FIXME: source branch is DataOrigin, for the moment. We should
206 // make it configurable via ConfigParamsOptions
207 auto aodLifetime = Lifetime::Enumeration;
208
209 DataProcessorSpec aodReader{
210 .name = "internal-dpl-aod-reader",
211 .inputs = {InputSpec{"enumeration",
212 "DPL",
213 "ENUM",
214 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
215 aodLifetime}},
216 .algorithm = AlgorithmSpec::dummyAlgorithm(),
217 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
218 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
219 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
220 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
221 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
222 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
223 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
224 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
225 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
226 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
227
228 // AOD reader can be rate limited
229 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
230 std::string rateLimitingChannelConfigInput;
231 std::string rateLimitingChannelConfigOutput;
232 bool internalRateLimiting = false;
233
234 // In case we have rate-limiting requested, any device without an input will get one on the special
235 // "DPL/RATE" message.
236 if (rateLimitingIPCID >= 0) {
237 rateLimitingChannelConfigInput = fmt::format("name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
238 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
239 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
240 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
241 internalRateLimiting = true;
242 aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}});
243 }
244
245 ctx.services().registerService(ServiceRegistryHelpers::handleForService<AnalysisContext>(new AnalysisContext));
246 auto& ac = ctx.services().get<AnalysisContext>();
247
248 std::vector<InputSpec> requestedCCDBs;
249 std::vector<OutputSpec> providedCCDBs;
250
251 for (size_t wi = 0; wi < workflow.size(); ++wi) {
252 auto& processor = workflow[wi];
253 auto name = processor.name;
254 auto hash = runtime_hash(name.c_str());
255 ac.outTskMap.push_back({hash, name});
256
257 std::string prefix = "internal-dpl-";
258 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
259 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
260 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
261 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
262 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
263 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
264 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
265 }
266 bool hasTimeframeInputs = false;
267 for (auto& input : processor.inputs) {
268 if (input.lifetime == Lifetime::Timeframe) {
269 hasTimeframeInputs = true;
270 break;
271 }
272 }
273 bool hasTimeframeOutputs = false;
274 for (auto& output : processor.outputs) {
275 if (output.lifetime == Lifetime::Timeframe) {
276 hasTimeframeOutputs = true;
277 break;
278 }
279 }
280 // A timeframeSink consumes timeframes without creating new
281 // timeframe data.
282 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
283 if (std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid")) != -1) {
284 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
285 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
286 uint32_t hash = runtime_hash(processor.name.c_str());
287 bool hasMatch = false;
288 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
289 for (auto& output : processor.outputs) {
290 if (DataSpecUtils::match(output, summaryMatcher)) {
291 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
292 DataSpecUtils::describe(output).c_str(), processor.name.c_str());
293 hasMatch = true;
294 break;
295 }
296 }
297 if (!hasMatch) {
298 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
299 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
300 }
301 }
302 }
303 bool hasConditionOption = false;
304 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
305 auto& input = processor.inputs[ii];
306 switch (input.lifetime) {
307 case Lifetime::Timer: {
308 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
309 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "period-" + input.binding); });
310 if (hasOption == false) {
311 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
312 }
313 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
314 } break;
315 case Lifetime::Signal: {
316 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
317 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
318 } break;
319 case Lifetime::Enumeration: {
320 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
321 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
322 } break;
323 case Lifetime::Condition: {
324 for (auto& option : processor.options) {
325 if (option.name == "condition-backend") {
326 hasConditionOption = true;
327 break;
328 }
329 }
330 if (hasConditionOption == false) {
331 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
332 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
333 hasConditionOption = true;
334 }
335 requestedCCDBs.emplace_back(input);
336 } break;
337 case Lifetime::OutOfBand: {
338 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
339 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
340 if (hasOption == false) {
341 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
342 }
343 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
344 } break;
345 case Lifetime::QA:
346 case Lifetime::Transient:
347 case Lifetime::Timeframe:
348 case Lifetime::Optional:
349 break;
350 }
351 if (DataSpecUtils::partialMatch(input, AODOrigins)) {
352 DataSpecUtils::updateInputList(ac.requestedAODs, InputSpec{input});
353 }
355 DataSpecUtils::updateInputList(ac.requestedDYNs, InputSpec{input});
356 }
358 DataSpecUtils::updateInputList(ac.requestedIDXs, InputSpec{input});
359 }
360 }
361
362 std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
363
364 for (auto& output : processor.outputs) {
365 if (DataSpecUtils::partialMatch(output, AODOrigins)) {
366 ac.providedAODs.emplace_back(output);
368 ac.providedDYNs.emplace_back(output);
370 ac.providedOutputObjHist.emplace_back(output);
371 auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; });
372 if (it == ac.outObjHistMap.end()) {
373 ac.outObjHistMap.push_back({hash, {output.binding.value}});
374 } else {
375 it->bindings.push_back(output.binding.value);
376 }
377 }
378 if (output.lifetime == Lifetime::Condition) {
379 providedCCDBs.push_back(output);
380 }
381 }
382 }
383
384 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
385 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
386 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
387 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
388 for (auto& input : ac.requestedDYNs) {
389 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
390 ac.spawnerInputs.emplace_back(input);
391 }
392 }
393
394 DataProcessorSpec aodSpawner{
395 "internal-dpl-aod-spawner",
396 {},
397 {},
399 {}};
400
401 DataProcessorSpec indexBuilder{
402 "internal-dpl-aod-index-builder",
403 {},
404 {},
406 {}};
407
408 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder);
409 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner);
410
411 AnalysisSupportHelpers::addMissingOutputsToReader(ac.providedAODs, ac.requestedAODs, aodReader);
412 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
413
414 std::vector<DataProcessorSpec> extraSpecs;
415
416 if (transientStore.outputs.empty() == false) {
417 extraSpecs.push_back(transientStore);
418 }
419 if (qaStore.outputs.empty() == false) {
420 extraSpecs.push_back(qaStore);
421 }
422
423 if (aodSpawner.outputs.empty() == false) {
424 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
425 }
426
427 if (indexBuilder.outputs.empty() == false) {
428 extraSpecs.push_back(indexBuilder);
429 }
430
431 // add the reader
432 if (aodReader.outputs.empty() == false) {
433 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
434 if (mctracks2aod == workflow.end()) {
435 // add normal reader
436 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx);
437 if (internalRateLimiting) {
438 aodReader.algorithm = CommonDataProcessors::wrapWithRateLimiting(algo);
439 } else {
440 aodReader.algorithm = algo;
441 }
442 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
443 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
444 } else {
445 // AODs are being injected on-the-fly, add dummy reader
446 aodReader.algorithm = AlgorithmSpec{
448 [outputs = aodReader.outputs](DeviceSpec const&) {
449 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
450 for (auto const& output : outputs) {
451 LOGP(warn, " {}", DataSpecUtils::describe(output));
452 }
453 LOGP(fatal, "Stopping.");
454 // to ensure the output type for adaptStateful
455 return adaptStateless([](DataAllocator&) {});
456 })};
457 }
458 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
459 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
460 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
461 }
462
463 ConcreteDataMatcher dstf{"FLP", "DISTSUBTIMEFRAME", 0xccdb};
464 if (ccdbBackend.outputs.empty() == false) {
465 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
466 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
467 bool providesDISTSTF = false;
468 // Check if any of the provided outputs is a DISTSTF
469 // Check if any of the requested inputs is for a 0xccdb message
470 for (auto& dp : workflow) {
471 for (auto& output : dp.outputs) {
472 if (DataSpecUtils::match(matcher, output)) {
473 providesDISTSTF = true;
475 break;
476 }
477 }
478 if (providesDISTSTF) {
479 break;
480 }
481 }
482 // * If there are AOD outputs we use TFNumber as the CCDB clock
483 // * If one device provides a DISTSTF we use that as the CCDB clock
484 // * If one of the devices provides a timer we use that as the CCDB clock
485 // * If none of the above apply add to the first data processor
486 // which has no inputs apart from enumerations the responsibility
487 // to provide the DISTSUBTIMEFRAME.
488 if (aodReader.outputs.empty() == false) {
489 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
490 } else if (providesDISTSTF) {
491 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
492 } else {
493 // We find the first device which has either just enumerations or
494 // just timers, and we add the DISTSUBTIMEFRAME to it.
495 // Notice how we do so in a stable manner by sorting the devices
496 // by name.
497 int enumCandidate = -1;
498 int timerCandidate = -1;
499 for (size_t wi = 0; wi < workflow.size(); wi++) {
500 auto& dp = workflow[wi];
501 if (dp.inputs.size() != 1) {
502 continue;
503 }
504 auto lifetime = dp.inputs[0].lifetime;
505 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
506 enumCandidate = wi;
507 }
508 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
509 timerCandidate = wi;
510 }
511 }
512 if (enumCandidate != -1) {
513 auto& dp = workflow[enumCandidate];
514 DataSpecUtils::updateOutputList(dp.outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
515 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
516 } else if (timerCandidate != -1) {
517 auto& dp = workflow[timerCandidate];
518 dstf = DataSpecUtils::asConcreteDataMatcher(dp.outputs[0]);
519 ccdbBackend.inputs.push_back(InputSpec{{"tfn"}, dstf, Lifetime::Timeframe});
520 }
521 }
522
523 // Load the CCDB backend from the plugin
524 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
525 extraSpecs.push_back(ccdbBackend);
526 } else {
527 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
528 // we add to the first data processor which has no inputs (apart from
529 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
530 bool requiresDISTSUBTIMEFRAME = false;
531 for (auto& dp : workflow) {
532 for (auto& input : dp.inputs) {
533 if (DataSpecUtils::match(input, dstf)) {
534 requiresDISTSUBTIMEFRAME = true;
535 break;
536 }
537 }
538 }
539 if (requiresDISTSUBTIMEFRAME) {
540 // We find the first device which has either just enumerations or
541 // just timers, and we add the DISTSUBTIMEFRAME to it.
542 // Notice how we do so in a stable manner by sorting the devices
543 // by name.
544 int enumCandidate = -1;
545 int timerCandidate = -1;
546 for (size_t wi = 0; wi < workflow.size(); wi++) {
547 auto& dp = workflow[wi];
548 if (dp.inputs.size() != 1) {
549 continue;
550 }
551 auto lifetime = dp.inputs[0].lifetime;
552 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
553 enumCandidate = wi;
554 }
555 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
556 timerCandidate = wi;
557 }
558 }
559 if (enumCandidate != -1) {
560 auto& dp = workflow[enumCandidate];
561 DataSpecUtils::updateOutputList(dp.outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
562 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
563 } else if (timerCandidate != -1) {
564 auto& dp = workflow[timerCandidate];
565 dstf = DataSpecUtils::asConcreteDataMatcher(dp.outputs[0]);
566 ccdbBackend.inputs.push_back(InputSpec{{"tfn"}, dstf, Lifetime::Timeframe});
567 }
568 }
569 }
570
571 // add the timer
572 if (timer.outputs.empty() == false) {
573 extraSpecs.push_back(timer);
574 }
575
576 // This is to inject a file sink so that any dangling ATSK object is written
577 // to a ROOT file.
578 if (ac.providedOutputObjHist.empty() == false) {
580 extraSpecs.push_back(rootSink);
581 }
582
583 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
584 extraSpecs.clear();
585
587 auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
588 ac.isDangling = isDanglingTmp;
589 ac.outputsInputs = outputsInputsTmp;
590
591 // create DataOutputDescriptor
592 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
593
594 // select outputs of type AOD which need to be saved
595 // ATTENTION: if there are dangling outputs the getGlobalAODSink
596 // has to be created in any case!
597 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
598 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
599 auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]);
600 if (ds.size() > 0 || ac.isDangling[ii]) {
601 ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]);
602 }
603 }
604 }
605
606 // file sink for any AOD output
607 if (ac.outputsInputsAOD.size() > 0) {
608 // add TFNumber and TFFilename as input to the writer
609 ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
610 ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
611 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
612 extraSpecs.push_back(fileSink);
613
614 auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool {
615 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
616 });
617 size_t ii = std::distance(ac.outputsInputs.begin(), it);
618 ac.isDangling[ii] = false;
619 }
620
621 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
622 extraSpecs.clear();
623
624 // Select dangling outputs which are not of type AOD
625 std::vector<InputSpec> redirectedOutputsInputs;
626 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
627 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
628 continue;
629 }
630 // We forward to the output proxy all the inputs only if they are dangling
631 // or if the forwarding policy is "proxy".
632 if (!ac.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
633 continue;
634 }
635 // AODs are skipped in any case.
636 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
637 continue;
638 }
639 redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]);
640 }
641
642 std::vector<InputSpec> unmatched;
643 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
644 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
645 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
646 if (unmatched.size() != redirectedOutputsInputs.size()) {
647 extraSpecs.push_back(fileSink);
648 }
649 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
650 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
651 extraSpecs.push_back(fairMQSink);
652 } else if (forwardingDestination != "drop") {
653 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
654 }
655 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
656 std::vector<InputSpec> ignored = unmatched;
657 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
658 for (auto& ignoredInput : ignored) {
659 ignoredInput.lifetime = Lifetime::Sporadic;
660 }
661
662 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
663 }
664
665 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
666 extraSpecs.clear();
667}
668
// Post-processes the workflow topology in place:
//  * for devices that also have Optional inputs, renumbers each extra
//    FLP/DISTSUBTIMEFRAME/0 input to an incremental subspec, to avoid the
//    race where DISTSUBTIMEFRAME/0 is forwarded before the actual RAWDATA;
//  * demotes Timeframe outputs to Sporadic on devices whose non-timer inputs
//    are all Sporadic;
//  * makes the producer of DISTSUBTIMEFRAME/0 also emit the renumbered
//    subspecs 1..distSTFCount-1 so the new inputs stay satisfied.
// NOTE(review): the extraction dropped original line 669 (the function
// signature; the body mutates a `workflow` collection of specs) and original
// lines 725-727 (between the two `continue`s and the output loop below).
670{
671 unsigned int distSTFCount = 0;
672 for (auto& spec : workflow) {
673 auto& inputs = spec.inputs;
674 bool allSporadic = true;
675 bool hasTimer = false;
676 bool hasSporadic = false;
677 bool hasOptionals = false;
// First pass: detect whether this device has any Optional input at all.
678 for (auto& input : inputs) {
679 if (input.lifetime == Lifetime::Optional) {
680 hasOptionals = true;
681 }
682 }
683 for (auto& input : inputs) {
684 // Any InputSpec that is DPL/DISTSUBTIMEFRAME/0 will actually be replaced by one
685 // which looks like DPL/DISTSUBTIMEFRAME/<incremental number> for devices that
686 // have Optional inputs as well.
687 // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
688 // forwarded before actual RAWDATA arrives.
689 if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
690 !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
// NOTE(review): this only logs an error; the offending input is left as-is.
691 LOGP(error,
692 "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
693 "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
694 DataSpecUtils::describe(input), input.binding);
695 }
696 if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
697 // The first one remains unchanged, therefore we use the postincrement
698 DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
699 continue;
700 }
701 // Timers are sporadic only when they are not
702 // alone.
703 if (input.lifetime == Lifetime::Timer) {
704 hasTimer = true;
705 continue;
706 }
707 if (input.lifetime == Lifetime::Sporadic) {
708 hasSporadic = true;
709 } else {
710 allSporadic = false;
711 }
712 }
713
714 LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
715
716 // If they are not all sporadic (excluding timers)
717 // we leave things as they are.
718 if (allSporadic == false) {
719 continue;
720 }
721 // A timer alone is not sporadic.
722 if (hasSporadic == false) {
723 continue;
724 }
// A device fed only by Sporadic (plus timer) inputs cannot guarantee a
// Timeframe output per timeframe, so its Timeframe outputs become Sporadic.
728 for (auto& output : spec.outputs) {
729 if (output.lifetime == Lifetime::Timeframe) {
730 output.lifetime = Lifetime::Sporadic;
731 }
732 }
733 }
734
// If any input was renumbered above, the (first) producer of
// DISTSUBTIMEFRAME/0 must also provide subspecs 1..distSTFCount-1.
735 if (distSTFCount > 0) {
736 bool found = false;
737 for (auto& spec : workflow) {
738 for (auto& output : spec.outputs) {
739 if (DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
740 found = true;
741 break;
742 }
743 }
744 if (found) {
745 for (unsigned int i = 1; i < distSTFCount; ++i) {
746 spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
747 }
748 break;
749 }
750 }
751 }
752}
753
// NOTE(review): the opening line of this definition
// (`void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow,` — file line 754)
// is not visible in this rendering; the parameter lines below belong to it.
//
// Builds the logical dataflow graph of the workflow: for every consumer input it
// finds all matching producer outputs and records one DeviceConnectionEdge per
// (consumer timeslice, producer timeslice) pair. Consumed outputs are re-inserted
// as "forwards" so that downstream consumers can still bind to them; forwarding
// relationships are reported through @a forwardedInputsInfo. Throws
// std::runtime_error when an input has no matching output at all.
 755 std::vector<DeviceConnectionEdge>& logicalEdges,
 756 std::vector<OutputSpec>& outputs,
 757 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
 758{
 759 // In case the workflow is empty, we do not have anything to do.
 760 if (workflow.empty()) {
 761 return;
 762 }
 763
 764 // This is the state. Oif is the iterator I use for the searches.
 765 std::vector<LogicalOutputInfo> availableOutputsInfo;
 766 auto const& constOutputs = outputs; // const version of the outputs
 767 // Forwards is a local cache to avoid adding forwards before time.
 768 std::vector<LogicalOutputInfo> forwards;
 769
 770 // Notice that availableOutputsInfo MUST be updated first, since it relies on
 771 // the size of outputs to be the one before the update.
 772 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
 773 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
 774 for (size_t wi = 0; wi < workflow.size(); ++wi) {
 775 auto& producer = workflow[wi];
 776 if (producer.outputs.empty()) {
 777 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
 778 }
 779 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
 780
 781 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
 782 auto& out = producer.outputs[oi];
// uniqueOutputId is the global index of this output in the flat `outputs`
// vector, assigned in enumeration order across all producers.
 783 auto uniqueOutputId = outputs.size();
 784 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
 785 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
 786 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
 787 outputs.push_back(out);
 788 }
 789 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
 790 }
 791 };
 792
// Diagnostic helper: throws with a message listing every known output as a
// candidate, so the user can see why input `ii` of consumer `ci` failed to bind.
 793 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
 794 auto input = workflow[ci].inputs[ii];
 795 std::ostringstream str;
 796 str << "No matching output found for "
 797 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
 798
 799 for (auto& output : constOutputs) {
 800 str << "-" << DataSpecUtils::describe(output) << "\n";
 801 }
 802
 803 throw std::runtime_error(str.str());
 804 };
 805
 806 // This is the outer loop
 807 //
 808 // Here we iterate over dataprocessor items in workflow and we consider them
 809 // as consumer, since we are interested in their inputs.
 810 // Notice also we need to search for all the matching inputs, since
 811 // we could have more than one source that matches (e.g. in the
 812 // case of a time merger).
 813 // Once consumed, an output is not actually used anymore, however
 814 // we append it as a forward.
 815 // Finally, If a device has n-way pipelining, we need to create one node per
 816 // parallel pipeline and add an edge for each.
 817 enumerateAvailableOutputs();
 818
// matches[g] caches whether global output g matches the current input, so the
// (potentially repeated) DataSpecUtils::match is evaluated once per input.
 819 std::vector<bool> matches(constOutputs.size());
 820 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
 821 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
 822 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
 823 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
 824 forwards.clear();
 825 for (size_t i = 0; i < constOutputs.size(); i++) {
 826 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
 827 if (matches[i]) {
 828 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
 829 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
 830 DataSpecUtils::describe(constOutputs[i]).c_str());
 831 }
 832 }
 833
// Walk the currently-available outputs and create edges for every match.
 834 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
 835 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
 836 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
 837 continue;
 838 }
 839 auto* oif = &availableOutputsInfo[i];
 840 if (oif->forward) {
 841 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
 842 }
 843 auto producer = oif->specIndex;
 844 auto uniqueOutputId = oif->outputGlobalIndex;
// One edge per (consumer timeslice, producer timeslice) pair to support
// n-way time pipelining on either side.
 845 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
 846 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
 847 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
 848 workflow[producer].name.c_str());
 849 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
 850 }
 851 }
// The consumer now re-exposes this output as a forward for later consumers.
 852 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
 853 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
 854 oif->enabled = false;
 855 }
// No forward was produced => nothing matched this input: hard error.
 856 if (forwards.empty()) {
 857 errorDueToMissingOutputFor(consumer, input);
 858 }
// Drop consumed entries, then publish the forwards collected in this pass.
 859 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto& info) { return info.enabled == false; }), availableOutputsInfo.end());
 860 for (auto& forward : forwards) {
 861 availableOutputsInfo.push_back(forward);
 862 }
 863 }
 864 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
 865 }
 866}
867
 868std::vector<EdgeAction>
// NOTE(review): the signature line itself
// (`WorkflowHelpers::computeOutEdgeActions(` — file line 869) is not visible in
// this rendering.
//
// For each edge — visited in the order given by @a index, which is expected to
// be producer-sorted (see sortEdges) — decide whether materialising it requires
// a new device and/or a new channel. A new device is needed whenever the
// (producer, producerTimeIndex) pair changes with respect to the previously
// visited edge; a new channel whenever any of the four grouping fields changes.
 870 const std::vector<DeviceConnectionEdge>& edges,
 871 const std::vector<size_t>& index)
 872{
// Sentinel "previous edge" that cannot equal any real edge, so the first
// visited edge always triggers both actions.
 873 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
 874
 875 assert(edges.size() == index.size());
 876 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
 877 for (size_t i : index) {
 878 auto& edge = edges[i];
 879 auto& action = actions[i];
 880 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
 881 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
 882 last = edge;
 883 }
 884 return actions;
 885}
886
 887std::vector<EdgeAction>
// NOTE(review): the signature line itself
// (`WorkflowHelpers::computeInEdgeActions(` — file line 888) is not visible in
// this rendering.
//
// Consumer-side counterpart of computeOutEdgeActions: @a index is expected to
// be consumer-sorted (see sortEdges). A new device is needed whenever the
// (consumer, timeIndex) pair changes with respect to the previously visited
// edge; a new channel whenever any of the four grouping fields changes.
 889 const std::vector<DeviceConnectionEdge>& edges,
 890 const std::vector<size_t>& index)
 891{
// Sentinel "previous edge" so the first visited edge triggers both actions.
 892 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
 893
 894 assert(edges.size() == index.size());
 895 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
 896 for (size_t i : index) {
 897 auto& edge = edges[i];
 898 auto& action = actions[i];
 899 // Calculate which actions need to be taken for this edge.
 900 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
 901 action.requiresNewChannel =
 902 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
 903
 904 last = edge;
 905 }
 906 return actions;
 907}
908
909void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
910 std::vector<size_t>& outEdgeIndex,
911 const std::vector<DeviceConnectionEdge>& edges)
912{
913 inEdgeIndex.resize(edges.size());
914 outEdgeIndex.resize(edges.size());
915 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
916 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
917
918 // Two indexes, one to bind the outputs, the other
919 // one to connect the inputs. The
920 auto outSorter = [&edges](size_t i, size_t j) {
921 auto& a = edges[i];
922 auto& b = edges[j];
923 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
924 };
925 auto inSorter = [&edges](size_t i, size_t j) {
926 auto& a = edges[i];
927 auto& b = edges[j];
928 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
929 };
930
931 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
932 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
933}
934
// NOTE(review): the signature line of WorkflowHelpers::verifyWorkflow
// (file line 935) is not visible in this rendering, nor are the two return
// statements at file lines 938 and 982 — presumably returning the "empty"
// and "valid" WorkflowParsingState values respectively; confirm against the
// original source.
//
// Validates a workflow specification: every DataProcessorSpec must have a
// non-empty, unique name free of shell/CSV metacharacters, every option's
// default value must match its declared type, and every InputSpec must be
// fully specified. Violations are reported by throwing std::runtime_error.
 936{
 937 if (workflow.empty()) {
 939 }
 940 std::set<std::string> validNames;
 941 std::vector<OutputSpec> availableOutputs;
 942 std::vector<InputSpec> requiredInputs;
 943
 944 // An index many to one index to go from a given input to the
 945 // associated spec
 946 std::map<size_t, size_t> inputToSpec;
 947 // A one to one index to go from a given output to the Spec emitting it
 948 std::map<size_t, size_t> outputToSpec;
 949
 950 std::ostringstream ss;
 951
 952 for (auto& spec : workflow) {
 953 if (spec.name.empty()) {
 954 throw std::runtime_error("Invalid DataProcessorSpec name");
 955 }
// These characters would break command-line / channel-configuration parsing.
 956 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
 957 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
 958 }
 959 if (validNames.find(spec.name) != validNames.end()) {
 960 throw std::runtime_error("Name " + spec.name + " is used twice.");
 961 }
 962 validNames.insert(spec.name);
// An Empty default means "no default provided" and is always accepted.
 963 for (auto& option : spec.options) {
 964 if (option.defaultValue.type() != VariantType::Empty &&
 965 option.type != option.defaultValue.type()) {
 966 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
 967 << ") for " << option.name << " in DataProcessorSpec of "
 968 << spec.name;
 969 throw std::runtime_error(ss.str());
 970 }
 971 }
 972 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
 973 InputSpec const& input = spec.inputs[ii];
 974 if (DataSpecUtils::validate(input) == false) {
 975 ss << "In spec " << spec.name << " input specification "
 976 << ii << " requires binding, description and origin"
 977 " to be fully specified";
 978 throw std::runtime_error(ss.str());
 979 }
 980 }
 981 }
 983}
984
// Either an input or an output specification, stored uniformly.
 985using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
// NOTE(review): the opening of `struct DataMatcherId` and its first member
// (presumably `size_t workflowId;`, judging by the usage in analyzeOutputs
// below — file lines 986-987) are not visible in this rendering.
// `id` is the index of the input/output within its owning DataProcessorSpec.
 988 size_t id;
 989};
990
// For every output in the workflow, build the InputSpec which would consume it
// and record whether the output is "dangling", i.e. not matched by any input of
// a *different* data processor. Returns the (deduplicated) list of such
// InputSpecs alongside a parallel vector of dangling flags.
 991std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
 992{
 993 // compute total number of input/output
 994 size_t totalInputs = 0;
 995 size_t totalOutputs = 0;
 996 for (auto& spec : workflow) {
 997 totalInputs += spec.inputs.size();
 998 totalOutputs += spec.outputs.size();
 999 }
 1000
// Flat (workflowId, index) handles for every input and output in the workflow.
 1001 std::vector<DataMatcherId> inputs;
 1002 std::vector<DataMatcherId> outputs;
 1003 inputs.reserve(totalInputs);
 1004 outputs.reserve(totalOutputs);
 1005
 1006 std::vector<InputSpec> results;
 1007 std::vector<bool> isDangling;
 1008 results.reserve(totalOutputs);
 1009 isDangling.reserve(totalOutputs);
 1010
// NOTE(review): file line 1011 is not visible in this rendering; from context
// it is most likely a comment introducing the enumeration loop — confirm
// against the original source.
 1012 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
 1013 auto& spec = workflow[wi];
 1014 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
 1015 inputs.emplace_back(DataMatcherId{wi, ii});
 1016 }
 1017 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
 1018 outputs.emplace_back(DataMatcherId{wi, oi});
 1019 }
 1020 }
 1021
 1022 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
 1023 auto& output = outputs[oi];
 1024 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
 1025
 1026 // is dangling output?
 1027 bool matched = false;
 1028 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
 1029 auto& input = inputs[ii];
 1030 // Inputs of the same workflow cannot match outputs
 1031 if (output.workflowId == input.workflowId) {
 1032 continue;
 1033 }
 1034 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
 1035 if (DataSpecUtils::match(inputSpec, outputSpec)) {
 1036 matched = true;
 1037 break;
 1038 }
 1039 }
 1040
// Synthesize the InputSpec which would consume this output; the binding name
// encodes the producing processor and its output index.
 1041 auto input = DataSpecUtils::matchingInput(outputSpec);
 1042 char buf[64];
 1043 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
 1044
 1045 // make sure that entries are unique
// NOTE(review): this linear find makes deduplication O(n^2) in the number of
// outputs, and when duplicates exist only the first occurrence's dangling
// status is kept — presumably acceptable for typical workflow sizes; verify
// if this ever becomes a hot spot.
 1046 if (std::find(results.begin(), results.end(), input) == results.end()) {
 1047 results.emplace_back(input);
 1048 isDangling.emplace_back(matched == false);
 1049 }
 1050 }
 1051
 1052 // make sure that results is unique
 1053 return std::make_tuple(results, isDangling);
 1054}
1055
1056std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1057{
1058
1059 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1060
1061 std::vector<InputSpec> results;
1062 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1063 if (isDangling[ii]) {
1064 results.emplace_back(outputsInputs[ii]);
1065 }
1066 }
1067
1068 return results;
1069}
1070
1071bool validateLifetime(std::ostream& errors,
1072 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1073 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1074{
1075 // In case the completion policy is consume-any, we do not need to check anything.
1076 if (consumerPolicies.completionPolicyName == "consume-any") {
1077 return true;
1078 }
1079 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1080 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1081 DataSpecUtils::describe(input).c_str(), consumer.name,
1082 DataSpecUtils::describe(output).c_str(), producer.name);
1083 return false;
1084 }
1085 return true;
1086}
1087
1088bool validateExpendable(std::ostream& errors,
1089 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1090 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1091{
1092 auto isExpendable = [](DataProcessorLabel const& label) {
1093 return label.value == "expendable";
1094 };
1095 auto isResilient = [](DataProcessorLabel const& label) {
1096 return label.value == "expendable" || label.value == "resilient";
1097 };
1098 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1099 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1100 if (producerExpendable && consumerCritical) {
1101 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1102 consumer.name,
1103 producer.name);
1104 return false;
1105 }
1106 return true;
1107}
1108
// Signature shared by the per-edge validation callbacks (validateLifetime,
// validateExpendable): given the two endpoints of an edge and their policies,
// return true when the edge is valid, appending diagnostics to `errors`
// otherwise.
 1109using Validator = std::function<bool(std::ostream& errors,
 1110 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
 1111 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1112
// NOTE(review): the opening line of this definition
// (`void WorkflowHelpers::validateEdges(WorkflowSpec const& workflow,` — file
// line 1113) is not visible in this rendering; the parameter lines below
// belong to it.
//
// Runs every registered Validator over every edge of the workflow graph,
// collecting all diagnostics, and throws a single std::runtime_error listing
// them if any edge failed. The lifetime check can be disabled through the
// DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES environment
// variable.
 1114 std::vector<DataProcessorPoliciesInfo> const& policies,
 1115 std::vector<DeviceConnectionEdge> const& edges,
 1116 std::vector<OutputSpec> const& outputs)
 1117{
// Environment is read once per process (function-local static).
 1118 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
 1119 std::vector<Validator> defaultValidators = {validateExpendable};
 1120 if (!disableLifetimeCheck) {
 1121 defaultValidators.emplace_back(validateLifetime);
 1122 }
 1123 std::stringstream errors;
 1124 // Iterate over all the edges.
 1125 // Get the input lifetime and the output lifetime.
 1126 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
 1127 bool hasErrors = false;
 1128 for (auto& edge : edges) {
 1129 DataProcessorSpec const& producer = workflow[edge.producer];
 1130 DataProcessorSpec const& consumer = workflow[edge.consumer];
 1131 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
 1132 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
 1133 OutputSpec const& output = outputs[edge.outputGlobalIndex];
 1134 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
// All validators run on every edge so the thrown message aggregates every
// problem instead of stopping at the first one.
 1135 for (auto& validator : defaultValidators) {
 1136 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
 1137 }
 1138 }
 1139 if (hasErrors) {
 1140 throw std::runtime_error(errors.str());
 1141 }
 1142}
1143
1144} // namespace o2::framework
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:571
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:565
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(char const *str)
std::ostringstream debug
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
@ Me
Only quit this data processor.
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:191
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
int layer
the associated layer in the sorting procedure
int index
the index in the actual storage of the nodes to be sorted topologically
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
o2::mch::DsIndex ds
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str