Project
Loading...
Searching...
No Matches
WorkflowHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
30#include "Framework/Signpost.h"
32
33#include "Framework/Variant.h"
34#include "Headers/DataHeader.h"
35#include <algorithm>
36#include <list>
37#include <set>
38#include <utility>
39#include <vector>
40#include <climits>
41#include <numeric>
42
43O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
44
45namespace o2::framework
46{
47std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
48{
49 out << "(" << info.index << ", " << info.layer << ")";
50 return out;
51}
52
53std::vector<TopoIndexInfo>
55 int const* edgeIn,
56 int const* edgeOut,
57 size_t byteStride,
58 size_t edgesCount)
59{
60 size_t stride = byteStride / sizeof(int);
61 using EdgeIndex = int;
62 // Create the index which will be returned.
63 std::vector<TopoIndexInfo> index(nodeCount);
64 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
65 index[wi] = {wi, 0};
66 }
67 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
68 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
69 remainingEdgesIndex[ei] = ei;
70 }
71
72 // Create a vector where at each position we have true
73 // if the vector has dependencies, false otherwise
74 std::vector<bool> nodeDeps(nodeCount, false);
75 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
76 nodeDeps[*(edgeOut + ei * stride)] = true;
77 }
78
79 // We start with all those which do not have any dependencies
80 // They are layer 0.
81 std::list<TopoIndexInfo> L;
82 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
83 if (nodeDeps[ii] == false) {
84 L.push_back({ii, 0});
85 }
86 }
87
88 // The final result.
89 std::vector<TopoIndexInfo> S;
90 // The set of vertices which can be reached by the current node
91 std::set<TopoIndexInfo> nextVertex;
92 // The set of edges which are not related to the current node.
93 std::vector<EdgeIndex> nextEdges;
94 while (!L.empty()) {
95 auto node = L.front();
96 S.push_back(node);
97 L.pop_front();
98 nextVertex.clear();
99 nextEdges.clear();
100
101 // After this, nextVertex will contain all the vertices
102 // which have the current node as incoming.
103 // nextEdges will contain all the edges which are not related
104 // to the current node.
105 for (auto& ei : remainingEdgesIndex) {
106 if (*(edgeIn + ei * stride) == node.index) {
107 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
108 } else {
109 nextEdges.push_back(ei);
110 }
111 }
112 remainingEdgesIndex.swap(nextEdges);
113
114 // Of all the vertices which have node as incoming,
115 // check if there is any other incoming node.
116 std::set<TopoIndexInfo> hasPredecessors;
117 for (auto& ei : remainingEdgesIndex) {
118 for (auto& m : nextVertex) {
119 if (m.index == *(edgeOut + ei * stride)) {
120 hasPredecessors.insert({m.index, m.layer});
121 }
122 }
123 }
124 std::vector<TopoIndexInfo> withPredecessor;
125 std::set_difference(nextVertex.begin(), nextVertex.end(),
126 hasPredecessors.begin(), hasPredecessors.end(),
127 std::back_inserter(withPredecessor));
128 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
129 }
130 return S;
131}
132
133// get the default value for condition-backend
135{
136 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
137 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
138 if (explicitBackend) {
139 return getenv("DPL_CONDITION_BACKEND");
140 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
141 return "http://o2-ccdb.internal";
142 } else {
143 return "http://alice-ccdb.cern.ch";
144 }
145}
146
// get the default value for condition query rate
//
// Reads DPL_CONDITION_QUERY_RATE from the environment; defaults to 0
// (meaning: fetch conditions only once) when the variable is not set.
int defaultConditionQueryRate()
{
  // Fetch and parse in one step instead of the original's double getenv().
  if (char const* rate = getenv("DPL_CONDITION_QUERY_RATE")) {
    return std::stoi(rate);
  }
  return 0;
}
152
// get the default value for condition query rate multiplier
//
// Reads DPL_CONDITION_QUERY_RATE_MULTIPLIER from the environment;
// defaults to 1 (check on every nominal query) when the variable is not set.
int defaultConditionQueryRateMultiplier()
{
  // Fetch and parse in one step instead of the original's double getenv().
  if (char const* multiplier = getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) {
    return std::stoi(multiplier);
  }
  return 1;
}
158
160{
161 auto fakeCallback = AlgorithmSpec{[](InitContext& ic) {
162 LOG(info) << "This is not a real device, merely a placeholder for external inputs";
163 LOG(info) << "To be hidden / removed at some point.";
164 // mark this dummy process as ready-to-quit
165 ic.services().get<ControlService>().readyToQuit(QuitRequest::Me);
166
167 return [](ProcessingContext& pc) {
168 // this callback is never called since there is no expiring input
169 pc.services().get<RawDeviceService>().waitFor(2000);
170 };
171 }};
172
173 DataProcessorSpec ccdbBackend{
174 .name = "internal-dpl-ccdb-backend",
175 .outputs = {},
176 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
177 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
178 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
179 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
180 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
181 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
182 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
183 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
184 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
185 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
186 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
187 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
188 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
189 };
190 DataProcessorSpec analysisCCDBBackend{
191 .name = "internal-dpl-aod-ccdb",
192 .inputs = {},
193 .outputs = {},
194 .algorithm = AlgorithmSpec::dummyAlgorithm(),
195 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
196 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
197 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
198 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
199 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
200 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
201 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
202 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
203 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
204 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
205 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
206 DataProcessorSpec transientStore{"internal-dpl-transient-store",
207 {},
208 {},
210 DataProcessorSpec qaStore{"internal-dpl-qa-store",
211 {},
212 {},
214 DataProcessorSpec timer{"internal-dpl-clock",
215 {},
216 {},
218
219 // In case InputSpec of origin AOD are
220 // requested but not available as part of the workflow,
221 // we insert in the configuration something which
222 // reads them from file.
223 //
224 // FIXME: source branch is DataOrigin, for the moment. We should
225 // make it configurable via ConfigParamsOptions
226 auto aodLifetime = Lifetime::Enumeration;
227
228 DataProcessorSpec aodReader{
229 .name = "internal-dpl-aod-reader",
230 .inputs = {InputSpec{"enumeration",
231 "DPL",
232 "ENUM",
233 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
234 aodLifetime}},
235 .algorithm = AlgorithmSpec::dummyAlgorithm(),
236 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
237 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
238 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
239 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
240 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
241 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
242 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
243 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
244 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
245 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
246
247 // AOD reader can be rate limited
248 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
249 std::string rateLimitingChannelConfigInput;
250 std::string rateLimitingChannelConfigOutput;
251 bool internalRateLimiting = false;
252
253 // In case we have rate-limiting requested, any device without an input will get one on the special
254 // "DPL/RATE" message.
255 if (rateLimitingIPCID >= 0) {
256 rateLimitingChannelConfigInput = fmt::format("name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
257 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
258 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
259 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
260 internalRateLimiting = true;
261 aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}});
262 }
263
264 ctx.services().registerService(ServiceRegistryHelpers::handleForService<AnalysisContext>(new AnalysisContext));
265 auto& ac = ctx.services().get<AnalysisContext>();
266
267 std::vector<InputSpec> requestedCCDBs;
268 std::vector<OutputSpec> providedCCDBs;
269
270 for (size_t wi = 0; wi < workflow.size(); ++wi) {
271 auto& processor = workflow[wi];
272 auto name = processor.name;
273 auto hash = runtime_hash(name.c_str());
274 ac.outTskMap.push_back({hash, name});
275
276 std::string prefix = "internal-dpl-";
277 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
278 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
279 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
280 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
281 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
282 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
283 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
284 }
285 bool hasTimeframeInputs = false;
286 for (auto& input : processor.inputs) {
287 if (input.lifetime == Lifetime::Timeframe) {
288 hasTimeframeInputs = true;
289 break;
290 }
291 }
292 bool hasTimeframeOutputs = false;
293 for (auto& output : processor.outputs) {
294 if (output.lifetime == Lifetime::Timeframe) {
295 hasTimeframeOutputs = true;
296 break;
297 }
298 }
299 // A timeframeSink consumes timeframes without creating new
300 // timeframe data.
301 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
302 if (std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid")) != -1) {
303 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
304 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
305 uint32_t hash = runtime_hash(processor.name.c_str());
306 bool hasMatch = false;
307 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
308 for (auto& output : processor.outputs) {
309 if (DataSpecUtils::match(output, summaryMatcher)) {
310 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
311 DataSpecUtils::describe(output).c_str(), processor.name.c_str());
312 hasMatch = true;
313 break;
314 }
315 }
316 if (!hasMatch) {
317 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
318 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
319 }
320 }
321 }
322 bool hasConditionOption = false;
323 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
324 auto& input = processor.inputs[ii];
325 switch (input.lifetime) {
326 case Lifetime::Timer: {
327 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
328 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "period-" + input.binding); });
329 if (hasOption == false) {
330 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
331 }
332 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
333 } break;
334 case Lifetime::Signal: {
335 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
336 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
337 } break;
338 case Lifetime::Enumeration: {
339 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
340 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
341 } break;
342 case Lifetime::Condition: {
343 for (auto& option : processor.options) {
344 if (option.name == "condition-backend") {
345 hasConditionOption = true;
346 break;
347 }
348 }
349 if (hasConditionOption == false) {
350 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
351 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
352 hasConditionOption = true;
353 }
354 requestedCCDBs.emplace_back(input);
355 } break;
356 case Lifetime::OutOfBand: {
357 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
358 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
359 if (hasOption == false) {
360 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
361 }
362 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
363 } break;
364 case Lifetime::QA:
365 case Lifetime::Transient:
366 case Lifetime::Timeframe:
367 case Lifetime::Optional:
368 break;
369 }
370 if (DataSpecUtils::partialMatch(input, AODOrigins)) {
371 DataSpecUtils::updateInputList(ac.requestedAODs, InputSpec{input});
372 }
374 DataSpecUtils::updateInputList(ac.requestedDYNs, InputSpec{input});
375 }
377 DataSpecUtils::updateInputList(ac.requestedIDXs, InputSpec{input});
378 }
380 DataSpecUtils::updateInputList(ac.requestedTIMs, InputSpec{input});
381 }
382 }
383
384 std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
385
386 for (auto& output : processor.outputs) {
387 if (DataSpecUtils::partialMatch(output, AODOrigins)) {
388 ac.providedAODs.emplace_back(output);
390 ac.providedDYNs.emplace_back(output);
392 ac.providedTIMs.emplace_back(output);
394 ac.providedOutputObjHist.emplace_back(output);
395 auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; });
396 if (it == ac.outObjHistMap.end()) {
397 ac.outObjHistMap.push_back({hash, {output.binding.value}});
398 } else {
399 it->bindings.push_back(output.binding.value);
400 }
401 }
402 if (output.lifetime == Lifetime::Condition) {
403 providedCCDBs.push_back(output);
404 }
405 }
406 }
407
408 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
409 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
410 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
411 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
412 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
413 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
414
415 DataProcessorSpec indexBuilder{
416 "internal-dpl-aod-index-builder",
417 {},
418 {},
420 {}};
421 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder);
422
423 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
425 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
426 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend);
427 }
428
429 for (auto& input : ac.requestedDYNs) {
430 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
431 ac.spawnerInputs.emplace_back(input);
432 }
433 }
434
435 DataProcessorSpec aodSpawner{
436 "internal-dpl-aod-spawner",
437 {},
438 {},
440 {}};
441 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner);
442
443 AnalysisSupportHelpers::addMissingOutputsToReader(ac.providedAODs, ac.requestedAODs, aodReader);
444 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
445
446 std::vector<DataProcessorSpec> extraSpecs;
447
448 if (transientStore.outputs.empty() == false) {
449 extraSpecs.push_back(transientStore);
450 }
451 if (qaStore.outputs.empty() == false) {
452 extraSpecs.push_back(qaStore);
453 }
454
455 if (aodSpawner.outputs.empty() == false) {
456 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
457 }
458
459 if (indexBuilder.outputs.empty() == false) {
460 extraSpecs.push_back(indexBuilder);
461 }
462
463 // add the reader
464 if (aodReader.outputs.empty() == false) {
465 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
466 if (mctracks2aod == workflow.end()) {
467 // add normal reader
468 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx);
469 aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
470 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
471 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
472 } else {
473 // AODs are being injected on-the-fly, add dummy reader
474 auto algo = AlgorithmSpec{
476 [outputs = aodReader.outputs](DeviceSpec const&) {
477 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
478 for (auto const& output : outputs) {
479 LOGP(warn, " {}", DataSpecUtils::describe(output));
480 }
481 LOGP(fatal, "Stopping.");
482 // to ensure the output type for adaptStateful
483 return adaptStateless([](DataAllocator&) {});
484 })};
485 aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
486 }
487 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
488 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
489 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
490 }
491
492 ConcreteDataMatcher dstf{"FLP", "DISTSUBTIMEFRAME", 0xccdb};
493 if (ccdbBackend.outputs.empty() == false) {
494 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
495 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
496 bool providesDISTSTF = false;
497 // Check if any of the provided outputs is a DISTSTF
498 // Check if any of the requested inputs is for a 0xccdb message
499 for (auto& dp : workflow) {
500 for (auto& output : dp.outputs) {
501 if (DataSpecUtils::match(matcher, output)) {
502 providesDISTSTF = true;
504 break;
505 }
506 }
507 if (providesDISTSTF) {
508 break;
509 }
510 }
511 // * If there are AOD outputs we use TFNumber as the CCDB clock
512 // * If one device provides a DISTSTF we use that as the CCDB clock
513 // * If one of the devices provides a timer we use that as the CCDB clock
514 // * If none of the above apply add to the first data processor
515 // which has no inputs apart from enumerations the responsibility
516 // to provide the DISTSUBTIMEFRAME.
517 if (aodReader.outputs.empty() == false) {
518 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
519 } else if (providesDISTSTF) {
520 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
521 } else {
522 // We find the first device which has either just enumerations or
523 // just timers, and we add the DISTSUBTIMEFRAME to it.
524 // Notice how we do so in a stable manner by sorting the devices
525 // by name.
526 int enumCandidate = -1;
527 int timerCandidate = -1;
528 for (size_t wi = 0; wi < workflow.size(); wi++) {
529 auto& dp = workflow[wi];
530 if (dp.inputs.size() != 1) {
531 continue;
532 }
533 auto lifetime = dp.inputs[0].lifetime;
534 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
535 enumCandidate = wi;
536 }
537 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
538 timerCandidate = wi;
539 }
540 }
541 if (enumCandidate != -1) {
542 auto& dp = workflow[enumCandidate];
543 DataSpecUtils::updateOutputList(dp.outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
544 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
545 } else if (timerCandidate != -1) {
546 auto& dp = workflow[timerCandidate];
547 dstf = DataSpecUtils::asConcreteDataMatcher(dp.outputs[0]);
548 ccdbBackend.inputs.push_back(InputSpec{{"tfn"}, dstf, Lifetime::Timeframe});
549 }
550 }
551
552 // Load the CCDB backend from the plugin
553 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
554 extraSpecs.push_back(ccdbBackend);
555 } else {
556 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
557 // we add to the first data processor which has no inputs (apart from
558 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
559 bool requiresDISTSUBTIMEFRAME = false;
560 for (auto& dp : workflow) {
561 for (auto& input : dp.inputs) {
562 if (DataSpecUtils::match(input, dstf)) {
563 requiresDISTSUBTIMEFRAME = true;
564 break;
565 }
566 }
567 }
568 if (requiresDISTSUBTIMEFRAME) {
569 // We find the first device which has either just enumerations or
570 // just timers, and we add the DISTSUBTIMEFRAME to it.
571 // Notice how we do so in a stable manner by sorting the devices
572 // by name.
573 int enumCandidate = -1;
574 int timerCandidate = -1;
575 for (size_t wi = 0; wi < workflow.size(); wi++) {
576 auto& dp = workflow[wi];
577 if (dp.inputs.size() != 1) {
578 continue;
579 }
580 auto lifetime = dp.inputs[0].lifetime;
581 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
582 enumCandidate = wi;
583 }
584 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
585 timerCandidate = wi;
586 }
587 }
588 if (enumCandidate != -1) {
589 auto& dp = workflow[enumCandidate];
590 DataSpecUtils::updateOutputList(dp.outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
591 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
592 } else if (timerCandidate != -1) {
593 auto& dp = workflow[timerCandidate];
594 dstf = DataSpecUtils::asConcreteDataMatcher(dp.outputs[0]);
595 ccdbBackend.inputs.push_back(InputSpec{{"tfn"}, dstf, Lifetime::Timeframe});
596 }
597 }
598 }
599
600 // add the Analysys CCDB backend which reads CCDB objects using a provided
601 // table
602 if (analysisCCDBBackend.outputs.empty() == false) {
603 // add normal reader
604 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
605 analysisCCDBBackend.algorithm = algo;
606 extraSpecs.push_back(analysisCCDBBackend);
607 }
608
609 // add the timer
610 if (timer.outputs.empty() == false) {
611 extraSpecs.push_back(timer);
612 }
613
614 // This is to inject a file sink so that any dangling ATSK object is written
615 // to a ROOT file.
616 if (ac.providedOutputObjHist.empty() == false) {
618 extraSpecs.push_back(rootSink);
619 }
620
621 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
622 extraSpecs.clear();
623
625 auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
626 ac.isDangling = isDanglingTmp;
627 ac.outputsInputs = outputsInputsTmp;
628
629 // create DataOutputDescriptor
630 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
631
632 // select outputs of type AOD which need to be saved
633 // ATTENTION: if there are dangling outputs the getGlobalAODSink
634 // has to be created in any case!
635 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
636 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
637 auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]);
638 if (ds.size() > 0 || ac.isDangling[ii]) {
639 ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]);
640 }
641 }
642 }
643
644 // file sink for any AOD output
645 if (ac.outputsInputsAOD.size() > 0) {
646 // add TFNumber and TFFilename as input to the writer
647 ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
648 ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
649 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
650 extraSpecs.push_back(fileSink);
651
652 auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool {
653 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
654 });
655 size_t ii = std::distance(ac.outputsInputs.begin(), it);
656 ac.isDangling[ii] = false;
657 }
658
659 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
660 extraSpecs.clear();
661
662 // Select dangling outputs which are not of type AOD
663 std::vector<InputSpec> redirectedOutputsInputs;
664 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
665 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
666 continue;
667 }
668 // We forward to the output proxy all the inputs only if they are dangling
669 // or if the forwarding policy is "proxy".
670 if (!ac.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
671 continue;
672 }
673 // AODs are skipped in any case.
674 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
675 continue;
676 }
677 redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]);
678 }
679
680 std::vector<InputSpec> unmatched;
681 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
682 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
683 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
684 if (unmatched.size() != redirectedOutputsInputs.size()) {
685 extraSpecs.push_back(fileSink);
686 }
687 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
688 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
689 extraSpecs.push_back(fairMQSink);
690 } else if (forwardingDestination != "drop") {
691 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
692 }
693 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
694 std::vector<InputSpec> ignored = unmatched;
695 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
696 for (auto& ignoredInput : ignored) {
697 ignoredInput.lifetime = Lifetime::Sporadic;
698 }
699
700 // Use the new dummy sink when the AOD reader is there
701 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
702 if (aodReader.outputs.empty() == false) {
703 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
704 extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
705 } else {
706 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
707 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
708 }
709 }
710
711 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
712 extraSpecs.clear();
713}
714
716{
  // Hands out incremental DPL DISTSUBTIMEFRAME subSpecs: the counter is
  // shared across the whole workflow so each renumbered input gets a
  // unique subSpec (the matching outputs are appended at the bottom).
717 unsigned int distSTFCount = 0;
718 for (auto& spec : workflow) {
719 auto& inputs = spec.inputs;
720 bool allSporadic = true;
721 bool hasTimer = false;
722 bool hasSporadic = false;
723 bool hasOptionals = false;
  // First pass: detect whether this processor has any Optional input,
  // since that decides if its DISTSUBTIMEFRAME/0 input gets renumbered.
724 for (auto& input : inputs) {
725 if (input.lifetime == Lifetime::Optional) {
726 hasOptionals = true;
727 }
728 }
  // Second pass: classify the remaining inputs by lifetime and perform
  // the DISTSUBTIMEFRAME renumbering where needed.
729 for (auto& input : inputs) {
730 // Any InputSpec that is DPL/DISTSUBTIMEFRAME/0 will actually be replaced by one
731 // which looks like DPL/DISTSUBTIMEFRAME/<incremental number> for devices that
732 // have Optional inputs as well.
733 // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
734 // forwarded before actual RAWDATA arrives.
735 if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
736 !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
737 LOGP(error,
738 "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
739 "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
740 DataSpecUtils::describe(input), input.binding);
741 }
742 if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
743 // The first one remains unchanged, therefore we use the postincrement
744 DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
745 continue;
746 }
747 // Timers are sporadic only when they are not
748 // alone.
749 if (input.lifetime == Lifetime::Timer) {
750 hasTimer = true;
751 continue;
752 }
753 if (input.lifetime == Lifetime::Sporadic) {
754 hasSporadic = true;
755 } else {
756 allSporadic = false;
757 }
758 }
759
760 LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
761
762 // If they are not all sporadic (excluding timers)
763 // we leave things as they are.
764 if (allSporadic == false) {
765 continue;
766 }
767 // A timer alone is not sporadic.
768 if (hasSporadic == false) {
769 continue;
770 }
  // All (non-timer) inputs are Sporadic: demote every Timeframe output
  // of this processor to Sporadic as well, so downstream consumers do
  // not wait for data that may never come in a given timeslice.
774 for (auto& output : spec.outputs) {
775 if (output.lifetime == Lifetime::Timeframe) {
776 output.lifetime = Lifetime::Sporadic;
777 }
778 }
779 }
780
  // If any input was renumbered above, the processor which produces
  // FLP/DISTSUBTIMEFRAME/0 must also advertise the extra subSpecs
  // 1..distSTFCount-1 so the renumbered inputs can be matched.
781 if (distSTFCount > 0) {
782 bool found = false;
783 for (auto& spec : workflow) {
784 for (auto& output : spec.outputs) {
785 if (DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
786 found = true;
787 break;
788 }
789 }
790 if (found) {
791 for (unsigned int i = 1; i < distSTFCount; ++i) {
792 spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
793 }
794 break;
795 }
796 }
797 }
798}
799
801 std::vector<DeviceConnectionEdge>& logicalEdges,
802 std::vector<OutputSpec>& outputs,
803 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
804{
805 // In case the workflow is empty, we do not have anything to do.
806 if (workflow.empty()) {
807 return;
808 }
809
810 // This is the state. Oif is the iterator I use for the searches.
811 std::vector<LogicalOutputInfo> availableOutputsInfo;
812 auto const& constOutputs = outputs; // const version of the outputs
813 // Forwards is a local cache to avoid adding forwards before time.
814 std::vector<LogicalOutputInfo> forwards;
815
816 // Notice that availableOutputsInfo MUST be updated first, since it relies on
817 // the size of outputs to be the one before the update.
  // Flattens every producer's outputs into the global `outputs` vector,
  // recording for each one (producer index, global output index, not-a-forward)
  // in availableOutputsInfo.
818 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
819 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
820 for (size_t wi = 0; wi < workflow.size(); ++wi) {
821 auto& producer = workflow[wi];
822 if (producer.outputs.empty()) {
823 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
824 }
825 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
826
827 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
828 auto& out = producer.outputs[oi];
829 auto uniqueOutputId = outputs.size();
830 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
831 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
832 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
833 outputs.push_back(out);
834 }
835 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
836 }
837 };
838
  // Raises a runtime_error listing all known outputs when input `ii` of
  // workflow item `ci` cannot be satisfied by anything in the workflow.
839 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
840 auto input = workflow[ci].inputs[ii];
841 std::ostringstream str;
842 str << "No matching output found for "
843 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
844
845 for (auto& output : constOutputs) {
846 str << "-" << DataSpecUtils::describe(output) << "\n";
847 }
848
849 throw std::runtime_error(str.str());
850 };
851
852 // This is the outer loop
853 //
854 // Here we iterate over dataprocessor items in workflow and we consider them
855 // as consumer, since we are interested in their inputs.
856 // Notice also we need to search for all the matching inputs, since
857 // we could have more than one source that matches (e.g. in the
858 // case of a time merger).
859 // Once consumed, an output is not actually used anymore, however
860 // we append it as a forward.
861 // Finally, If a device has n-way pipelining, we need to create one node per
862 // parallel pipeline and add an edge for each.
863 enumerateAvailableOutputs();
864
  // Scratch bitmap, reused per input: matches[g] tells whether global
  // output g matches the input currently being processed.
865 std::vector<bool> matches(constOutputs.size());
866 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
867 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
868 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
869 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
870 forwards.clear();
  // Precompute the match bitmap once per input, so the scan over
  // availableOutputsInfo below is a cheap lookup.
871 for (size_t i = 0; i < constOutputs.size(); i++) {
872 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
873 if (matches[i]) {
874 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
875 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
876 DataSpecUtils::describe(constOutputs[i]).c_str());
877 }
878 }
879
880 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
881 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
882 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
883 continue;
884 }
885 auto* oif = &availableOutputsInfo[i];
886 if (oif->forward) {
887 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
888 }
889 auto producer = oif->specIndex;
890 auto uniqueOutputId = oif->outputGlobalIndex;
  // One edge per (consumer pipeline, producer pipeline) pair to
  // support n-way time pipelining on either side.
891 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
892 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
893 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
894 workflow[producer].name.c_str());
895 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
896 }
897 }
  // The consumer now re-offers this output as a forward for any
  // later consumer of the same data.
898 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
899 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
900 oif->enabled = false;
901 }
902 if (forwards.empty()) {
903 errorDueToMissingOutputFor(consumer, input);
904 }
  // Compact away the consumed entries, then re-add them as forwards
  // (done after the scan so forwards don't match their own input).
905 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto& info) { return info.enabled == false; }), availableOutputsInfo.end());
906 for (auto& forward : forwards) {
907 availableOutputsInfo.push_back(forward);
908 }
909 }
910 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
911 }
912}
913
914std::vector<EdgeAction>
916 const std::vector<DeviceConnectionEdge>& edges,
917 const std::vector<size_t>& index)
918{
  // Sentinel "previous edge" which cannot equal any real edge, so the
  // first edge visited always requires both a new device and a new channel.
919 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
920
921 assert(edges.size() == index.size());
922 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
  // Walk the edges in the order given by the `index` permutation
  // (presumably the producer-major order built by sortEdges — confirm at
  // call site): a change of producer or producer time slice starts a new
  // device; a change in any of the four keys starts a new channel.
923 for (size_t i : index) {
924 auto& edge = edges[i];
925 auto& action = actions[i];
926 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
927 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
928 last = edge;
929 }
930 return actions;
931}
932
933std::vector<EdgeAction>
935 const std::vector<DeviceConnectionEdge>& edges,
936 const std::vector<size_t>& index)
937{
  // Sentinel "previous edge" which cannot equal any real edge, so the
  // first edge visited always requires both a new device and a new channel.
938 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
939
940 assert(edges.size() == index.size());
941 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
  // Walk the edges in the order given by the `index` permutation
  // (presumably the consumer-major order built by sortEdges — confirm at
  // call site): a change of consumer or consumer time slice starts a new
  // device; a change in any of the four keys starts a new channel.
942 for (size_t i : index) {
943 auto& edge = edges[i];
944 auto& action = actions[i];
945 // Calculate which actions need to be taken for this edge.
946 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
947 action.requiresNewChannel =
948 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
949
950 last = edge;
951 }
952 return actions;
953}
954
955void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
956 std::vector<size_t>& outEdgeIndex,
957 const std::vector<DeviceConnectionEdge>& edges)
958{
959 inEdgeIndex.resize(edges.size());
960 outEdgeIndex.resize(edges.size());
961 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
962 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
963
964 // Two indexes, one to bind the outputs, the other
965 // one to connect the inputs. The
966 auto outSorter = [&edges](size_t i, size_t j) {
967 auto& a = edges[i];
968 auto& b = edges[j];
969 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
970 };
971 auto inSorter = [&edges](size_t i, size_t j) {
972 auto& a = edges[i];
973 auto& b = edges[j];
974 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
975 };
976
977 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
978 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
979}
980
982{
983 if (workflow.empty()) {
985 }
  // Names must be unique across the workflow and usable on a command line.
986 std::set<std::string> validNames;
987 std::vector<OutputSpec> availableOutputs;
988 std::vector<InputSpec> requiredInputs;
989
990 // An index many to one index to go from a given input to the
991 // associated spec
992 std::map<size_t, size_t> inputToSpec;
993 // A one to one index to go from a given output to the Spec emitting it
994 std::map<size_t, size_t> outputToSpec;
995
996 std::ostringstream ss;
997
997 for (auto& spec : workflow) {
  // Reject empty names, names containing shell/CSV-hostile characters,
  // and duplicated names.
999 if (spec.name.empty()) {
1000 throw std::runtime_error("Invalid DataProcessorSpec name");
1001 }
1002 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
1003 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
1004 }
1005 if (validNames.find(spec.name) != validNames.end()) {
1006 throw std::runtime_error("Name " + spec.name + " is used twice.");
1007 }
1008 validNames.insert(spec.name);
  // Each declared option's default value (when present) must agree with
  // the declared option type.
1009 for (auto& option : spec.options) {
1010 if (option.defaultValue.type() != VariantType::Empty &&
1011 option.type != option.defaultValue.type()) {
1012 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
1013 << ") for " << option.name << " in DataProcessorSpec of "
1014 << spec.name;
1015 throw std::runtime_error(ss.str());
1016 }
1017 }
  // Every input must be fully specified (binding, origin, description).
1018 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
1019 InputSpec const& input = spec.inputs[ii];
1020 if (DataSpecUtils::validate(input) == false) {
1021 ss << "In spec " << spec.name << " input specification "
1022 << ii << " requires binding, description and origin"
1023 " to be fully specified (found "
1024 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
1025 throw std::runtime_error(ss.str());
1026 }
1027 }
1028 }
1030}
1031
// A spec which can hold either side of a data relation: an input or an output.
1032using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
1035 size_t id;
1036};
1037
  // For every output in the workflow, build the InputSpec which would match
  // it and record whether the output is "dangling", i.e. not consumed by any
  // other processor in the workflow. Returns the (deduplicated) list of such
  // InputSpecs and a parallel vector of dangling flags.
1038std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
1039{
1040 // compute total number of input/output
1041 size_t totalInputs = 0;
1042 size_t totalOutputs = 0;
1043 for (auto& spec : workflow) {
1044 totalInputs += spec.inputs.size();
1045 totalOutputs += spec.outputs.size();
1046 }
1047
  // Flat (workflow index, slot index) handles for every input / output.
1048 std::vector<DataMatcherId> inputs;
1049 std::vector<DataMatcherId> outputs;
1050 inputs.reserve(totalInputs);
1051 outputs.reserve(totalOutputs);
1052
1053 std::vector<InputSpec> results;
1054 std::vector<bool> isDangling;
1055 results.reserve(totalOutputs);
1056 isDangling.reserve(totalOutputs);
1057
1059 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
1060 auto& spec = workflow[wi];
1061 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
1062 inputs.emplace_back(DataMatcherId{wi, ii});
1063 }
1064 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
1065 outputs.emplace_back(DataMatcherId{wi, oi});
1066 }
1067 }
1068
1069 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
1070 auto& output = outputs[oi];
1071 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
1072
1073 // is dangling output?
1074 bool matched = false;
1075 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
1076 auto& input = inputs[ii];
1077 // Inputs of the same workflow cannot match outputs
1078 if (output.workflowId == input.workflowId) {
1079 continue;
1080 }
1081 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
1082 if (DataSpecUtils::match(inputSpec, outputSpec)) {
1083 matched = true;
1084 break;
1085 }
1086 }
1087
  // Synthesize a binding name which is unique per (processor, output slot).
1088 auto input = DataSpecUtils::matchingInput(outputSpec);
1089 char buf[64];
1090 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
1091
1092 // make sure that entries are unique
1093 if (std::find(results.begin(), results.end(), input) == results.end()) {
1094 results.emplace_back(input);
1095 isDangling.emplace_back(matched == false);
1096 }
1097 }
1098
1099 // make sure that results is unique
1100 return std::make_tuple(results, isDangling);
1101}
1102
1103std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1104{
1105
1106 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1107
1108 std::vector<InputSpec> results;
1109 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1110 if (isDangling[ii]) {
1111 results.emplace_back(outputsInputs[ii]);
1112 }
1113 }
1114
1115 return results;
1116}
1117
1118bool validateLifetime(std::ostream& errors,
1119 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1120 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1121{
1122 // In case the completion policy is consume-any, we do not need to check anything.
1123 if (consumerPolicies.completionPolicyName == "consume-any") {
1124 return true;
1125 }
1126 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1127 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1128 DataSpecUtils::describe(input).c_str(), consumer.name,
1129 DataSpecUtils::describe(output).c_str(), producer.name);
1130 return false;
1131 }
1132 return true;
1133}
1134
1135bool validateExpendable(std::ostream& errors,
1136 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1137 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1138{
1139 auto isExpendable = [](DataProcessorLabel const& label) {
1140 return label.value == "expendable";
1141 };
1142 auto isResilient = [](DataProcessorLabel const& label) {
1143 return label.value == "expendable" || label.value == "resilient";
1144 };
1145 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1146 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1147 if (producerExpendable && consumerCritical) {
1148 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1149 consumer.name,
1150 producer.name);
1151 return false;
1152 }
1153 return true;
1154}
1155
// Signature shared by the per-edge validation callbacks (validateLifetime,
// validateExpendable): append any diagnostics to `errors` and return
// false when the producer/consumer pair is invalid.
1156using Validator = std::function<bool(std::ostream& errors,
1157 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1158 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1159
1161 std::vector<DataProcessorPoliciesInfo> const& policies,
1162 std::vector<DeviceConnectionEdge> const& edges,
1163 std::vector<OutputSpec> const& outputs)
1164{
  // Escape hatch: setting this environment variable to a non-zero value
  // disables the lifetime validation (the expendable check always runs).
1165 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
1166 std::vector<Validator> defaultValidators = {validateExpendable};
1167 if (!disableLifetimeCheck) {
1168 defaultValidators.emplace_back(validateLifetime);
1169 }
1170 std::stringstream errors;
1171 // Iterate over all the edges.
1172 // Get the input lifetime and the output lifetime.
1173 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
  // All edges are checked before throwing, so the exception message
  // accumulates every violation, not just the first one.
1174 bool hasErrors = false;
1175 for (auto& edge : edges) {
1176 DataProcessorSpec const& producer = workflow[edge.producer];
1177 DataProcessorSpec const& consumer = workflow[edge.consumer];
1178 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
1179 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
1180 OutputSpec const& output = outputs[edge.outputGlobalIndex];
1181 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
1182 for (auto& validator : defaultValidators) {
1183 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
1184 }
1185 }
1186 if (hasErrors) {
1187 throw std::runtime_error(errors.str());
1188 }
1189}
1190
1191} // namespace o2::framework
uint32_t hash
std::ostringstream debug
std::unique_ptr< expressions::Node > node
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(Ts... Vs)
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
Defining PrimaryVertex explicitly as messageable.
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
@ Me
Only quit this data processor.
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:193
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
int layer
the associated layer in the sorting procedure
int index
the index in the actual storage of the nodes to be sorted topologically
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static AlgorithmSpec aodSpawnerCallback(ConfigContext const &ctx)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
o2::mch::DsIndex ds
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str