WorkflowHelpers.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
28#include "Framework/Signpost.h"
30
31#include "Framework/Variant.h"
32#include "Headers/DataHeader.h"
33#include <algorithm>
34#include <list>
35#include <set>
36#include <utility>
37#include <vector>
38#include <climits>
39#include <numeric>
40
41O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
42
43namespace o2::framework
44{
45std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
46{
47 out << "(" << info.index << ", " << info.layer << ")";
48 return out;
49}
50
51std::vector<TopoIndexInfo>
52WorkflowHelpers::topologicalSort(size_t nodeCount,
53 int const* edgeIn,
54 int const* edgeOut,
55 size_t byteStride,
56 size_t edgesCount)
57{
58 size_t stride = byteStride / sizeof(int);
59 using EdgeIndex = int;
60 // Create the index which will be returned.
61 std::vector<TopoIndexInfo> index(nodeCount);
62 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
63 index[wi] = {wi, 0};
64 }
65 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
66 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
67 remainingEdgesIndex[ei] = ei;
68 }
69
70 // Create a vector where at each position we have true
71  // if the node has dependencies, false otherwise
72 std::vector<bool> nodeDeps(nodeCount, false);
73 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
74 nodeDeps[*(edgeOut + ei * stride)] = true;
75 }
76
77 // We start with all those which do not have any dependencies
78 // They are layer 0.
79 std::list<TopoIndexInfo> L;
80 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
81 if (nodeDeps[ii] == false) {
82 L.push_back({ii, 0});
83 }
84 }
85
86 // The final result.
87 std::vector<TopoIndexInfo> S;
88 // The set of vertices which can be reached by the current node
89 std::set<TopoIndexInfo> nextVertex;
90 // The set of edges which are not related to the current node.
91 std::vector<EdgeIndex> nextEdges;
92 while (!L.empty()) {
93 auto node = L.front();
94 S.push_back(node);
95 L.pop_front();
96 nextVertex.clear();
97 nextEdges.clear();
98
99 // After this, nextVertex will contain all the vertices
100 // which have the current node as incoming.
101 // nextEdges will contain all the edges which are not related
102 // to the current node.
103 for (auto& ei : remainingEdgesIndex) {
104 if (*(edgeIn + ei * stride) == node.index) {
105 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
106 } else {
107 nextEdges.push_back(ei);
108 }
109 }
110 remainingEdgesIndex.swap(nextEdges);
111
112    // Of all the vertices which have the current node as incoming,
113    // check whether any of them also has another incoming edge.
114 std::set<TopoIndexInfo> hasPredecessors;
115 for (auto& ei : remainingEdgesIndex) {
116 for (auto& m : nextVertex) {
117 if (m.index == *(edgeOut + ei * stride)) {
118 hasPredecessors.insert({m.index, m.layer});
119 }
120 }
121 }
122 std::vector<TopoIndexInfo> withPredecessor;
123 std::set_difference(nextVertex.begin(), nextVertex.end(),
124 hasPredecessors.begin(), hasPredecessors.end(),
125 std::back_inserter(withPredecessor));
126 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
127 }
128 return S;
129}
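
Usage sketch (an addition, not part of the original source): topologicalSort takes the edges as two parallel integer arrays plus a byte stride, so a small DAG can be fed to it straight from std::vector<int> storage. The include assumes the private WorkflowHelpers.h header, which lives next to this file, is reachable.

#include "WorkflowHelpers.h"
#include <iostream>
#include <vector>

int main()
{
  using o2::framework::WorkflowHelpers;
  // Diamond-shaped DAG: 0 -> 1, 0 -> 2, 1 -> 3, 2 -> 3.
  std::vector<int> edgeIn{0, 0, 1, 2};
  std::vector<int> edgeOut{1, 2, 3, 3};
  // byteStride is sizeof(int) because the edge arrays are densely packed.
  auto sorted = WorkflowHelpers::topologicalSort(4, edgeIn.data(), edgeOut.data(),
                                                 sizeof(int), edgeIn.size());
  for (auto const& info : sorted) {
    std::cout << "node " << info.index << " -> layer " << info.layer << "\n";
  }
  // Expected layers: node 0 in layer 0, nodes 1 and 2 in layer 1, node 3 in layer 2.
}
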
130
131// get the default value for condition-backend
132std::string defaultConditionBackend()
133{
134 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
135 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
136 if (explicitBackend) {
137 return getenv("DPL_CONDITION_BACKEND");
138 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
139 return "http://o2-ccdb.internal";
140 } else {
141 return "http://alice-ccdb.cern.ch";
142 }
143}
144
145// get the default value for condition query rate
146int defaultConditionQueryRate()
147{
148 return getenv("DPL_CONDITION_QUERY_RATE") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE")) : 0;
149}
150
151// get the default value for condition query rate multiplier
152int defaultConditionQueryRateMultiplier()
153{
154 return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1;
155}
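
The three defaults above are environment-driven, so they can be changed per job without touching code. A minimal standalone sketch of the same precedence rules follows; it re-implements the getenv logic for illustration only and does not call the helpers above (setenv is POSIX).

#include <cstdlib>
#include <iostream>
#include <string>

int main()
{
  // DPL_CONDITION_BACKEND always wins; otherwise online deployments use
  // http://o2-ccdb.internal and everything else falls back to http://alice-ccdb.cern.ch.
  setenv("DPL_CONDITION_BACKEND", "http://localhost:8080", 1);
  setenv("DPL_CONDITION_QUERY_RATE", "10", 1); // re-check CCDB validity every 10 TFs
  std::string backend = getenv("DPL_CONDITION_BACKEND") ? getenv("DPL_CONDITION_BACKEND")
                                                        : "http://alice-ccdb.cern.ch";
  int queryRate = getenv("DPL_CONDITION_QUERY_RATE") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE")) : 0;
  std::cout << backend << " " << queryRate << "\n"; // prints: http://localhost:8080 10
}
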
156
157void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx)
158{
159 DataProcessorSpec ccdbBackend{
160 .name = "internal-dpl-ccdb-backend",
161 .outputs = {},
162 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
163 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
164 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
165 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
166 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
167 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
168 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
169 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
170 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
171 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
172 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
173 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
174 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
175 };
176 DataProcessorSpec analysisCCDBBackend{
177 .name = "internal-dpl-aod-ccdb",
178 .inputs = {},
179 .outputs = {},
180 .algorithm = AlgorithmSpec::dummyAlgorithm(),
181 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
182 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
183 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
184 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
185 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
186 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
187 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
188 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
189 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
190 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
191 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
192  DataProcessorSpec transientStore{"internal-dpl-transient-store",
193    {},
194    {},
195    AlgorithmSpec::dummyAlgorithm()};
196  DataProcessorSpec qaStore{"internal-dpl-qa-store",
197    {},
198    {},
199    AlgorithmSpec::dummyAlgorithm()};
200  DataProcessorSpec timer{"internal-dpl-clock",
201    {},
202    {},
203    AlgorithmSpec::dummyAlgorithm()};
204
205  // In case InputSpecs of origin AOD are
206  // requested but not available as part of the workflow,
207  // we insert into the configuration something which
208  // reads them from file.
209 //
210 // FIXME: source branch is DataOrigin, for the moment. We should
211 // make it configurable via ConfigParamsOptions
212 auto aodLifetime = Lifetime::Enumeration;
213
214 DataProcessorSpec aodReader{
215 .name = "internal-dpl-aod-reader",
216 .inputs = {InputSpec{"enumeration",
217 "DPL",
218 "ENUM",
219 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
220 aodLifetime}},
221 .algorithm = AlgorithmSpec::dummyAlgorithm(),
222 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
223 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
224 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
225 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
226 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
227 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
228 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
229 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
230 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
231 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
232
233 // AOD reader can be rate limited
234 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
235 std::string rateLimitingChannelConfigInput;
236 std::string rateLimitingChannelConfigOutput;
237 bool internalRateLimiting = false;
238
239 // In case we have rate-limiting requested, any device without an input will get one on the special
240 // "DPL/RATE" message.
241 if (rateLimitingIPCID >= 0) {
242 rateLimitingChannelConfigInput = fmt::format("name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
243 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
244 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
245 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
246 internalRateLimiting = true;
247 aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}});
248 }
249
250 ctx.services().registerService(ServiceRegistryHelpers::handleForService<AnalysisContext>(new AnalysisContext));
251 auto& ac = ctx.services().get<AnalysisContext>();
252
253 std::vector<InputSpec> requestedCCDBs;
254 std::vector<OutputSpec> providedCCDBs;
255
256 for (size_t wi = 0; wi < workflow.size(); ++wi) {
257 auto& processor = workflow[wi];
258 auto name = processor.name;
259 auto hash = runtime_hash(name.c_str());
260 ac.outTskMap.push_back({hash, name});
261
262 std::string prefix = "internal-dpl-";
263 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
264 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
265 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
266 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
267 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
268 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
269 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
270 }
271 bool hasTimeframeInputs = std::any_of(processor.inputs.begin(), processor.inputs.end(), [](auto const& input) { return input.lifetime == Lifetime::Timeframe; });
272 bool hasTimeframeOutputs = std::any_of(processor.outputs.begin(), processor.outputs.end(), [](auto const& output) { return output.lifetime == Lifetime::Timeframe; });
273
274 // A timeframeSink consumes timeframes without creating new
275 // timeframe data.
276 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
277 if (std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid")) != -1) {
278 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
279 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
280 uint32_t hash = runtime_hash(processor.name.c_str());
281 bool hasMatch = false;
282 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
283 auto summaryOutput = std::find_if(processor.outputs.begin(), processor.outputs.end(), [&summaryMatcher](auto const& output) { return DataSpecUtils::match(output, summaryMatcher); });
284 if (summaryOutput != processor.outputs.end()) {
285 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
286 DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str());
287 hasMatch = true;
288 }
289
290 if (!hasMatch) {
291 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
292 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
293 }
294 }
295 }
296 bool hasConditionOption = false;
297 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
298 auto& input = processor.inputs[ii];
299 switch (input.lifetime) {
300 case Lifetime::Timer: {
301 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
302 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "period-" + input.binding); });
303 if (hasOption == false) {
304 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
305 }
306 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
307 } break;
308 case Lifetime::Signal: {
309 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
310 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
311 } break;
312 case Lifetime::Enumeration: {
313 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
314 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
315 } break;
316 case Lifetime::Condition: {
317 requestedCCDBs.emplace_back(input);
318 if ((hasConditionOption == false) && std::none_of(processor.options.begin(), processor.options.end(), [](auto const& option) { return (option.name.compare("condition-backend") == 0); })) {
319 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
320 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
321 hasConditionOption = true;
322 }
323 } break;
324 case Lifetime::OutOfBand: {
325 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
326 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
327 if (hasOption == false) {
328 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
329 }
330 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
331 } break;
332 case Lifetime::QA:
333 case Lifetime::Transient:
334 case Lifetime::Timeframe:
335 case Lifetime::Optional:
336 break;
337 }
338 if (DataSpecUtils::partialMatch(input, AODOrigins)) {
339 DataSpecUtils::updateInputList(ac.requestedAODs, InputSpec{input});
340 }
341      if (DataSpecUtils::partialMatch(input, header::DataOrigin{"DYN"})) {
342        DataSpecUtils::updateInputList(ac.requestedDYNs, InputSpec{input});
343 }
344      if (DataSpecUtils::partialMatch(input, header::DataOrigin{"IDX"})) {
345        DataSpecUtils::updateInputList(ac.requestedIDXs, InputSpec{input});
346 }
348 DataSpecUtils::updateInputList(ac.requestedTIMs, InputSpec{input});
349 }
350 }
351
352 std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
353
354 for (auto& output : processor.outputs) {
355 if (DataSpecUtils::partialMatch(output, AODOrigins)) {
356 ac.providedAODs.emplace_back(output);
358 ac.providedDYNs.emplace_back(output);
360 ac.providedTIMs.emplace_back(output);
362 ac.providedOutputObjHist.emplace_back(output);
363 auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; });
364 if (it == ac.outObjHistMap.end()) {
365 ac.outObjHistMap.push_back({hash, {output.binding.value}});
366 } else {
367 it->bindings.push_back(output.binding.value);
368 }
369 }
370 if (output.lifetime == Lifetime::Condition) {
371 providedCCDBs.push_back(output);
372 }
373 }
374 }
375
376 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
377 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
378 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
379 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
380 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
381 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
382
383 DataProcessorSpec indexBuilder{
384 "internal-dpl-aod-index-builder",
385 {},
386 {},
387 PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx), // readers::AODReaderHelpers::indexBuilderCallback(ctx),
388 {}};
389 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder);
390
391 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
392  auto deploymentMode = DefaultsHelpers::deploymentMode();
393  if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
394 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend);
395 }
396
397 ac.requestedDYNs | views::filter_not_matching(ac.providedDYNs) | sinks::append_to{ac.spawnerInputs};
398
399 DataProcessorSpec aodSpawner{
400 "internal-dpl-aod-spawner",
401 {},
402 {},
403 PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx), // readers::AODReaderHelpers::aodSpawnerCallback(ctx),
404 {}};
405 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner);
406
407 AnalysisSupportHelpers::addMissingOutputsToReader(ac.providedAODs, ac.requestedAODs, aodReader);
408
409 std::sort(requestedCCDBs.begin(), requestedCCDBs.end(), inputSpecLessThan);
410 std::sort(providedCCDBs.begin(), providedCCDBs.end(), outputSpecLessThan);
411 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
412
413 std::vector<DataProcessorSpec> extraSpecs;
414
415 if (transientStore.outputs.empty() == false) {
416 extraSpecs.push_back(transientStore);
417 }
418 if (qaStore.outputs.empty() == false) {
419 extraSpecs.push_back(qaStore);
420 }
421
422 if (aodSpawner.outputs.empty() == false) {
423 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
424 }
425
426 if (indexBuilder.outputs.empty() == false) {
427 extraSpecs.push_back(indexBuilder);
428 }
429
430 // add the reader
431 if (aodReader.outputs.empty() == false) {
432 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
433 if (mctracks2aod == workflow.end()) {
434 // add normal reader
435 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx);
436 aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
437 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
438 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
439 } else {
440 // AODs are being injected on-the-fly, add dummy reader
441 auto algo = AlgorithmSpec{
442      adaptStateful(
443        [outputs = aodReader.outputs](DeviceSpec const&) {
444 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
445 for (auto const& output : outputs) {
446 LOGP(warn, " {}", DataSpecUtils::describe(output));
447 }
448 LOGP(fatal, "Stopping.");
449 // to ensure the output type for adaptStateful
450 return adaptStateless([](DataAllocator&) {});
451 })};
452 aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
453 }
454 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
455 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
456 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
457 }
458
459 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
460 auto& dstf = std::get<ConcreteDataMatcher>(matcher.matcher);
461 // Check if any of the provided outputs is a DISTSTF
462 // Check if any of the requested inputs is for a 0xccdb message
463 bool providesDISTSTF = std::any_of(workflow.begin(), workflow.end(),
464 [&matcher](auto const& dp) {
465 return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](auto const& output) {
466 return DataSpecUtils::match(matcher, output);
467 });
468 });
469
470 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
471 // we add to the first data processor which has no inputs (apart from
472 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
473 bool requiresDISTSUBTIMEFRAME = std::any_of(workflow.begin(), workflow.end(),
474 [&dstf](auto const& dp) {
475 return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](auto const& input) {
476 return DataSpecUtils::match(input, dstf);
477 });
478 });
479
480 // We find the first device which has either just enumerations or
481 // just timers, and we will add the DISTSUBTIMEFRAME to it.
482 // Notice how we do so in a stable manner by sorting the devices
483 // by name.
484 int enumCandidate = -1;
485 int timerCandidate = -1;
486 for (auto wi = 0U; wi < workflow.size(); ++wi) {
487 auto& dp = workflow[wi];
488 if (dp.inputs.size() != 1) {
489 continue;
490 }
491 auto lifetime = dp.inputs[0].lifetime;
492 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
493 enumCandidate = wi;
494 }
495 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
496 timerCandidate = wi;
497 }
498 }
499
500 // * If there are AOD outputs we use TFNumber as the CCDB clock
501 // * If one device provides a DISTSTF we use that as the CCDB clock
502 // * If one of the devices provides a timer we use that as the CCDB clock
503 // * If none of the above apply, add to the first data processor
504 // which has no inputs apart from enumerations the responsibility
505 // to provide the DISTSUBTIMEFRAME.
506 if (ccdbBackend.outputs.empty() == false) {
507 if (aodReader.outputs.empty() == false) {
508 // fetcher clock follows AOD source (TFNumber)
509 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
510 } else if (providesDISTSTF) {
511 // fetcher clock follows DSTF/ccdb source (DISTSUBTIMEFRAME)
512 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
513 } else {
514 if (enumCandidate != -1) {
515 // add DSTF/ccdb source to the enumeration-driven source explicitly
516 // fetcher clock is provided by enumeration-driven source (DISTSUBTIMEFRAME)
517 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
518 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
519 } else if (timerCandidate != -1) {
520        // fetcher clock is provided by the timer source
521 auto timer_dstf = DataSpecUtils::asConcreteDataMatcher(workflow[timerCandidate].outputs[0]);
522 ccdbBackend.inputs.push_back(InputSpec{"tfn", timer_dstf, Lifetime::Timeframe});
523 }
524 }
525
526 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
527 // Load the CCDB backend from the plugin
528 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
529 extraSpecs.push_back(ccdbBackend);
530 } else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
531 // add DSTF/ccdb source to the enumeration-driven source explicitly if it is required in the workflow
532 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
533 }
534
535  // add the Analysis CCDB backend which reads CCDB objects using a provided table
536 if (analysisCCDBBackend.outputs.empty() == false) {
537 // add normal reader
538 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
539 analysisCCDBBackend.algorithm = algo;
540 extraSpecs.push_back(analysisCCDBBackend);
541 }
542
543 // add the timer
544 if (timer.outputs.empty() == false) {
545 extraSpecs.push_back(timer);
546 }
547
548 // This is to inject a file sink so that any dangling ATSK object is written
549 // to a ROOT file.
550 if (ac.providedOutputObjHist.empty() == false) {
551    auto rootSink = AnalysisSupportHelpers::getOutputObjHistSink(ctx);
552    extraSpecs.push_back(rootSink);
553 }
554
555 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
556 extraSpecs.clear();
557
559 auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
560 ac.isDangling = isDanglingTmp;
561 ac.outputsInputs = outputsInputsTmp;
562
563 // create DataOutputDescriptor
564 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
565
566 // select outputs of type AOD which need to be saved
567 // ATTENTION: if there are dangling outputs the getGlobalAODSink
568 // has to be created in any case!
569 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
570 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
571 auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]);
572 if (ds.size() > 0 || ac.isDangling[ii]) {
573 ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]);
574 }
575 }
576 }
577
578 // file sink for any AOD output
579 if (ac.outputsInputsAOD.size() > 0) {
580 // add TFNumber and TFFilename as input to the writer
581 ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
582 ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
583 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
584 extraSpecs.push_back(fileSink);
585
586 auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool {
587 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
588 });
589 size_t ii = std::distance(ac.outputsInputs.begin(), it);
590 ac.isDangling[ii] = false;
591 }
592
593 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
594 extraSpecs.clear();
595
596 // Select dangling outputs which are not of type AOD
597 std::vector<InputSpec> redirectedOutputsInputs;
598 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
599 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
600 continue;
601 }
602 // We forward to the output proxy all the inputs only if they are dangling
603 // or if the forwarding policy is "proxy".
604 if (!ac.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
605 continue;
606 }
607 // AODs are skipped in any case.
608 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
609 continue;
610 }
611 redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]);
612 }
613
614 std::vector<InputSpec> unmatched;
615 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
616 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
617 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
618 if (unmatched.size() != redirectedOutputsInputs.size()) {
619 extraSpecs.push_back(fileSink);
620 }
621 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
622 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
623 extraSpecs.push_back(fairMQSink);
624 } else if (forwardingDestination != "drop") {
625 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
626 }
627 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
628 std::vector<InputSpec> ignored = unmatched;
629 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
630 for (auto& ignoredInput : ignored) {
631 ignoredInput.lifetime = Lifetime::Sporadic;
632 }
633
634 // Use the new dummy sink when the AOD reader is there
635 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
636 if (aodReader.outputs.empty() == false) {
637 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
638 extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
639 } else {
640 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
641 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
642 }
643 }
644
645 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
646 extraSpecs.clear();
647}
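
A hedged sketch of how the function above is meant to be driven: given the user WorkflowSpec and the ConfigContext built by the driver, the internal-dpl-* service devices are appended in place before the physical topology is built. The surrounding function name is illustrative; the real call site lives in the DPL driver, not in this file.

#include "WorkflowHelpers.h"

void prepareWorkflow(o2::framework::WorkflowSpec& workflow, o2::framework::ConfigContext& ctx)
{
  using o2::framework::WorkflowHelpers;
  // Appends readers, CCDB fetchers, the timer device and sinks as required by the inputs.
  WorkflowHelpers::injectServiceDevices(workflow, ctx);
  // Fixes lifetimes and DISTSUBTIMEFRAME subspecs afterwards (see adjustTopology below).
  WorkflowHelpers::adjustTopology(workflow, ctx);
}
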
648
649void WorkflowHelpers::adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx)
650{
651 unsigned int distSTFCount = 0;
652 for (auto& spec : workflow) {
653 auto& inputs = spec.inputs;
654 bool allSporadic = true;
655 bool hasTimer = false;
656 bool hasSporadic = false;
657 bool hasOptionals = false;
658 for (auto& input : inputs) {
659 if (input.lifetime == Lifetime::Optional) {
660 hasOptionals = true;
661 }
662 }
663 for (auto& input : inputs) {
664      // Any InputSpec that is FLP/DISTSUBTIMEFRAME/0 will actually be replaced by one
665      // which looks like FLP/DISTSUBTIMEFRAME/<incremental number> for devices that
666 // have Optional inputs as well.
667 // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
668 // forwarded before actual RAWDATA arrives.
669 if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
670 !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
671 LOGP(error,
672 "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
673 "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
674 DataSpecUtils::describe(input), input.binding);
675 }
676 if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
677 // The first one remains unchanged, therefore we use the postincrement
678 DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
679 continue;
680 }
681 // Timers are sporadic only when they are not
682 // alone.
683 if (input.lifetime == Lifetime::Timer) {
684 hasTimer = true;
685 continue;
686 }
687 if (input.lifetime == Lifetime::Sporadic) {
688 hasSporadic = true;
689 } else {
690 allSporadic = false;
691 }
692 }
693
694 LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
695
696 // If they are not all sporadic (excluding timers)
697 // we leave things as they are.
698 if (allSporadic == false) {
699 continue;
700 }
701 // A timer alone is not sporadic.
702 if (hasSporadic == false) {
703 continue;
704 }
708 for (auto& output : spec.outputs) {
709 if (output.lifetime == Lifetime::Timeframe) {
710 output.lifetime = Lifetime::Sporadic;
711 }
712 }
713 }
714
715 if (distSTFCount > 0) {
716 bool found = false;
717 for (auto& spec : workflow) {
718 for (auto& output : spec.outputs) {
719 if (DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
720 found = true;
721 break;
722 }
723 }
724 if (found) {
725 for (unsigned int i = 1; i < distSTFCount; ++i) {
726 spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
727 }
728 break;
729 }
730 }
731 }
732}
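
A hedged illustration of the DISTSUBTIMEFRAME handling above. The device and data names ("flp-producer", "reco-a", "TST/RAWDATA") are invented for the example, and ctx is assumed to come from the caller.

#include "WorkflowHelpers.h"
#include "Framework/DataProcessorSpec.h"
#include <string>

using namespace o2::framework;

WorkflowSpec makeSketchWorkflow()
{
  auto consumer = [](std::string const& name) {
    return DataProcessorSpec{
      .name = name,
      .inputs = {InputSpec{"dstf", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe},
                 InputSpec{"raw", "TST", "RAWDATA", 0, Lifetime::Optional}}};
  };
  DataProcessorSpec producer{
    .name = "flp-producer",
    .outputs = {OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0}, Lifetime::Timeframe}}};
  return {producer, consumer("reco-a"), consumer("reco-b")};
}

// After WorkflowHelpers::adjustTopology(workflow, ctx):
// - "reco-a" keeps FLP/DISTSUBTIMEFRAME/0 while "reco-b" is renumbered to subspec 1,
//   because both declare Optional inputs next to the DISTSUBTIMEFRAME one;
// - "flp-producer", the provider of subspec 0, gains a matching FLP/DISTSUBTIMEFRAME/1 output.
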
733
734void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow,
735  std::vector<DeviceConnectionEdge>& logicalEdges,
736 std::vector<OutputSpec>& outputs,
737 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
738{
739 // In case the workflow is empty, we do not have anything to do.
740 if (workflow.empty()) {
741 return;
742 }
743
744  // This is the state. oif is the iterator used for the searches.
745 std::vector<LogicalOutputInfo> availableOutputsInfo;
746 auto const& constOutputs = outputs; // const version of the outputs
748  // forwards is a local cache to avoid adding forwards ahead of time.
748 std::vector<LogicalOutputInfo> forwards;
749
750 // Notice that availableOutputsInfo MUST be updated first, since it relies on
751 // the size of outputs to be the one before the update.
752 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
753 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
754 for (size_t wi = 0; wi < workflow.size(); ++wi) {
755 auto& producer = workflow[wi];
756 if (producer.outputs.empty()) {
757 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
758 }
759    O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{public}s", wi, producer.name.c_str());
760
761 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
762 auto& out = producer.outputs[oi];
763 auto uniqueOutputId = outputs.size();
764 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
765 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
766 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
767 outputs.push_back(out);
768 }
769 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
770 }
771 };
772
773 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
774 auto input = workflow[ci].inputs[ii];
775 std::ostringstream str;
776 str << "No matching output found for "
777 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
778
779 for (auto& output : constOutputs) {
780 str << "-" << DataSpecUtils::describe(output) << "\n";
781 }
782
783 throw std::runtime_error(str.str());
784 };
785
786 // This is the outer loop
787 //
788 // Here we iterate over dataprocessor items in workflow and we consider them
789 // as consumer, since we are interested in their inputs.
790 // Notice also we need to search for all the matching inputs, since
791 // we could have more than one source that matches (e.g. in the
792 // case of a time merger).
793 // Once consumed, an output is not actually used anymore, however
794 // we append it as a forward.
795  // Finally, if a device has n-way pipelining, we need to create one node per
796 // parallel pipeline and add an edge for each.
797 enumerateAvailableOutputs();
798
799 std::vector<bool> matches(constOutputs.size());
800 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
801 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
802    O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{public}s", consumer, workflow[consumer].name.c_str());
803 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
804 forwards.clear();
805 for (size_t i = 0; i < constOutputs.size(); i++) {
806 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
807 if (matches[i]) {
808 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
809 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
810 DataSpecUtils::describe(constOutputs[i]).c_str());
811 }
812 }
813
814 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
815        // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it to.
816 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
817 continue;
818 }
819 auto* oif = &availableOutputsInfo[i];
820 if (oif->forward) {
821 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
822 }
823 auto producer = oif->specIndex;
824 auto uniqueOutputId = oif->outputGlobalIndex;
825 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
826 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
827 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
828 workflow[producer].name.c_str());
829 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
830 }
831 }
832 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
833 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
834 oif->enabled = false;
835 }
836 if (forwards.empty()) {
837 errorDueToMissingOutputFor(consumer, input);
838 }
839 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto& info) { return info.enabled == false; }), availableOutputsInfo.end());
840 for (auto& forward : forwards) {
841 availableOutputsInfo.push_back(forward);
842 }
843 }
844 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
845 }
846}
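
A minimal sketch of the graph construction above (the TST/DATA names are invented, and DeviceConnectionEdge / LogicalForwardInfo are assumed to come in via WorkflowHelpers.h): a one-producer, one-consumer workflow yields exactly one logical edge, and outputs collects the global list of OutputSpecs the edges refer to.

#include "WorkflowHelpers.h"
#include <vector>

using namespace o2::framework;

void sketchGraph()
{
  WorkflowSpec workflow{
    DataProcessorSpec{.name = "producer",
                      .outputs = {OutputSpec{"TST", "DATA", 0}}},
    DataProcessorSpec{.name = "consumer",
                      .inputs = {InputSpec{"in", "TST", "DATA", 0, Lifetime::Timeframe}}}};

  std::vector<DeviceConnectionEdge> logicalEdges;
  std::vector<OutputSpec> outputs;
  std::vector<LogicalForwardInfo> forwards;
  WorkflowHelpers::constructGraph(workflow, logicalEdges, outputs, forwards);
  // logicalEdges now holds a single edge: producer (node 0) -> consumer (node 1),
  // pointing at outputs[0], i.e. TST/DATA/0.
}
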
847
848std::vector<EdgeAction>
849WorkflowHelpers::computeOutEdgeActions(
850 const std::vector<DeviceConnectionEdge>& edges,
851 const std::vector<size_t>& index)
852{
853 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
854
855 assert(edges.size() == index.size());
856 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
857 for (size_t i : index) {
858 auto& edge = edges[i];
859 auto& action = actions[i];
860 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
861 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
862 last = edge;
863 }
864 return actions;
865}
866
867std::vector<EdgeAction>
868WorkflowHelpers::computeInEdgeActions(
869 const std::vector<DeviceConnectionEdge>& edges,
870 const std::vector<size_t>& index)
871{
872 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
873
874 assert(edges.size() == index.size());
875 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
876 for (size_t i : index) {
877 auto& edge = edges[i];
878 auto& action = actions[i];
879 // Calculate which actions need to be taken for this edge.
880 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
881 action.requiresNewChannel =
882 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
883
884 last = edge;
885 }
886 return actions;
887}
888
889void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
890 std::vector<size_t>& outEdgeIndex,
891 const std::vector<DeviceConnectionEdge>& edges)
892{
893 inEdgeIndex.resize(edges.size());
894 outEdgeIndex.resize(edges.size());
895 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
896 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
897
898  // Two indexes, one to bind the outputs, the other
899  // one to connect the inputs: producer-major and consumer-major respectively.
900 auto outSorter = [&edges](size_t i, size_t j) {
901 auto& a = edges[i];
902 auto& b = edges[j];
903 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
904 };
905 auto inSorter = [&edges](size_t i, size_t j) {
906 auto& a = edges[i];
907 auto& b = edges[j];
908 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
909 };
910
911 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
912 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
913}
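
A hedged continuation of the constructGraph sketch above: once the logical edges exist they are indexed and sorted twice, producer-major for the out direction and consumer-major for the in direction, and the computed edge actions say where a new device or a new channel starts during the walk.

void sketchEdgeActions(std::vector<o2::framework::DeviceConnectionEdge> const& logicalEdges)
{
  using o2::framework::WorkflowHelpers;
  std::vector<size_t> inIndex;
  std::vector<size_t> outIndex;
  WorkflowHelpers::sortEdges(inIndex, outIndex, logicalEdges);
  auto outActions = WorkflowHelpers::computeOutEdgeActions(logicalEdges, outIndex);
  auto inActions = WorkflowHelpers::computeInEdgeActions(logicalEdges, inIndex);
  // The action stored for an edge records whether, at that point of the
  // producer-sorted (out) or consumer-sorted (in) walk, a new device or a new
  // channel has to be created.
}
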
914
915WorkflowParsingState WorkflowHelpers::verifyWorkflow(const WorkflowSpec& workflow)
916{
917 if (workflow.empty()) {
918    return WorkflowParsingState::Empty;
919  }
920 std::set<std::string> validNames;
921 std::vector<OutputSpec> availableOutputs;
922 std::vector<InputSpec> requiredInputs;
923
924  // A many-to-one index to go from a given input to the
925  // associated spec
926 std::map<size_t, size_t> inputToSpec;
927 // A one to one index to go from a given output to the Spec emitting it
928 std::map<size_t, size_t> outputToSpec;
929
930 std::ostringstream ss;
931
932 for (auto& spec : workflow) {
933 if (spec.name.empty()) {
934 throw std::runtime_error("Invalid DataProcessorSpec name");
935 }
936 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
937 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
938 }
939 if (validNames.find(spec.name) != validNames.end()) {
940 throw std::runtime_error("Name " + spec.name + " is used twice.");
941 }
942 validNames.insert(spec.name);
943 for (auto& option : spec.options) {
944 if (option.defaultValue.type() != VariantType::Empty &&
945 option.type != option.defaultValue.type()) {
946 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
947 << ") for " << option.name << " in DataProcessorSpec of "
948 << spec.name;
949 throw std::runtime_error(ss.str());
950 }
951 }
952 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
953 InputSpec const& input = spec.inputs[ii];
954 if (DataSpecUtils::validate(input) == false) {
955 ss << "In spec " << spec.name << " input specification "
956 << ii << " requires binding, description and origin"
957 " to be fully specified (found "
958 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
959 throw std::runtime_error(ss.str());
960 }
961 }
962 }
963  return WorkflowParsingState::Valid;
964}
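
A small sketch of the validation above (names invented): a duplicated processor name, an empty name, or a name containing one of ,;:"'$ makes verifyWorkflow throw, while a well-formed workflow returns a parsing state.

#include "WorkflowHelpers.h"
#include <stdexcept>

void sketchVerify()
{
  using namespace o2::framework;
  WorkflowSpec workflow{
    DataProcessorSpec{.name = "task-a"},
    DataProcessorSpec{.name = "task-a"}}; // duplicate on purpose
  try {
    WorkflowHelpers::verifyWorkflow(workflow);
  } catch (std::runtime_error const& e) {
    // e.what() == "Name task-a is used twice."
  }
}
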
965
966using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
967struct DataMatcherId {
968  size_t workflowId;
969  size_t id;
970};
971
972std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
973{
974 // compute total number of input/output
975 size_t totalInputs = 0;
976 size_t totalOutputs = 0;
977 for (auto& spec : workflow) {
978 totalInputs += spec.inputs.size();
979 totalOutputs += spec.outputs.size();
980 }
981
982 std::vector<DataMatcherId> inputs;
983 std::vector<DataMatcherId> outputs;
984 inputs.reserve(totalInputs);
985 outputs.reserve(totalOutputs);
986
987 std::vector<InputSpec> results;
988 std::vector<bool> isDangling;
989 results.reserve(totalOutputs);
990 isDangling.reserve(totalOutputs);
991
993 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
994 auto& spec = workflow[wi];
995 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
996 inputs.emplace_back(DataMatcherId{wi, ii});
997 }
998 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
999 outputs.emplace_back(DataMatcherId{wi, oi});
1000 }
1001 }
1002
1003 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
1004 auto& output = outputs[oi];
1005 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
1006
1007 // is dangling output?
1008 bool matched = false;
1009 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
1010 auto& input = inputs[ii];
1011      // Inputs of the same data processor cannot match its own outputs
1012 if (output.workflowId == input.workflowId) {
1013 continue;
1014 }
1015 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
1016 if (DataSpecUtils::match(inputSpec, outputSpec)) {
1017 matched = true;
1018 break;
1019 }
1020 }
1021
1022 auto input = DataSpecUtils::matchingInput(outputSpec);
1023 char buf[64];
1024 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
1025
1026 // make sure that entries are unique
1027 if (std::find(results.begin(), results.end(), input) == results.end()) {
1028 results.emplace_back(input);
1029 isDangling.emplace_back(matched == false);
1030 }
1031 }
1032
1033 // make sure that results is unique
1034 return std::make_tuple(results, isDangling);
1035}
1036
1037std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1038{
1039
1040 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1041
1042 std::vector<InputSpec> results;
1043 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1044 if (isDangling[ii]) {
1045 results.emplace_back(outputsInputs[ii]);
1046 }
1047 }
1048
1049 return results;
1050}
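
A minimal sketch (TST/UNUSED is invented): an output that no other processor consumes is reported as dangling, which is what injectServiceDevices above relies on to attach the file, FairMQ or dummy sinks.

void sketchDangling()
{
  using namespace o2::framework;
  WorkflowSpec workflow{
    DataProcessorSpec{.name = "producer",
                      .outputs = {OutputSpec{"TST", "UNUSED", 0}}}};
  auto dangling = WorkflowHelpers::computeDanglingOutputs(workflow);
  // dangling.size() == 1 and dangling[0] matches TST/UNUSED/0.
}
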
1051
1052bool validateLifetime(std::ostream& errors,
1053 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1054 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1055{
1056 // In case the completion policy is consume-any, we do not need to check anything.
1057 if (consumerPolicies.completionPolicyName == "consume-any") {
1058 return true;
1059 }
1060 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1061 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1062 DataSpecUtils::describe(input).c_str(), consumer.name,
1063 DataSpecUtils::describe(output).c_str(), producer.name);
1064 return false;
1065 }
1066 return true;
1067}
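
A hedged sketch of the lifetime rule above (all names invented, aggregate initialization of DataProcessorPoliciesInfo assumed, and the validator assumed visible to the caller since it is a free function defined here): a Timeframe input fed by a Sporadic output is reported as an error unless the consumer runs with the consume-any completion policy.

#include <sstream>

void sketchLifetimeCheck()
{
  using namespace o2::framework;
  DataProcessorSpec producer{.name = "producer"};
  DataProcessorSpec consumer{.name = "consumer"};
  OutputSpec output{"TST", "DATA", 0, Lifetime::Sporadic};
  InputSpec input{"in", "TST", "DATA", 0, Lifetime::Timeframe};
  DataProcessorPoliciesInfo policies{}; // default policies: no consume-any
  std::ostringstream errors;
  bool ok = validateLifetime(errors, producer, output, policies, consumer, input, policies);
  // ok == false; errors now contains the Timeframe-vs-Sporadic message above.
}
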
1068
1069bool validateExpendable(std::ostream& errors,
1070 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1071 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1072{
1073 auto isExpendable = [](DataProcessorLabel const& label) {
1074 return label.value == "expendable";
1075 };
1076 auto isResilient = [](DataProcessorLabel const& label) {
1077 return label.value == "expendable" || label.value == "resilient";
1078 };
1079 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1080 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1081 if (producerExpendable && consumerCritical) {
1082 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1083 consumer.name,
1084 producer.name);
1085 return false;
1086 }
1087 return true;
1088}
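
A hedged sketch of the label rule above (names invented, DataProcessorLabel treated as an aggregate holding its value): a producer labelled "expendable" may only feed consumers that are themselves "expendable" or "resilient"; an unlabelled consumer counts as critical.

#include <sstream>

void sketchExpendableCheck()
{
  using namespace o2::framework;
  DataProcessorSpec producer{.name = "qc-task", .labels = {{"expendable"}}};
  DataProcessorSpec consumer{.name = "reco"}; // no label: treated as critical
  OutputSpec output{"TST", "DATA", 0};
  InputSpec input{"in", "TST", "DATA", 0};
  DataProcessorPoliciesInfo policies{};
  std::ostringstream errors;
  bool ok = validateExpendable(errors, producer, output, policies, consumer, input, policies);
  // ok == false; errors reports that critical consumer "reco" depends on
  // expendable producer "qc-task".
}
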
1089
1090using Validator = std::function<bool(std::ostream& errors,
1091 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1092 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1093
1094void WorkflowHelpers::validateEdges(WorkflowSpec const& workflow,
1095  std::vector<DataProcessorPoliciesInfo> const& policies,
1096 std::vector<DeviceConnectionEdge> const& edges,
1097 std::vector<OutputSpec> const& outputs)
1098{
1099 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
1100 std::vector<Validator> defaultValidators = {validateExpendable};
1101 if (!disableLifetimeCheck) {
1102 defaultValidators.emplace_back(validateLifetime);
1103 }
1104 std::stringstream errors;
1105 // Iterate over all the edges.
1106 // Get the input lifetime and the output lifetime.
1107 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
1108 bool hasErrors = false;
1109 for (auto& edge : edges) {
1110 DataProcessorSpec const& producer = workflow[edge.producer];
1111 DataProcessorSpec const& consumer = workflow[edge.consumer];
1112 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
1113 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
1114 OutputSpec const& output = outputs[edge.outputGlobalIndex];
1115 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
1116 for (auto& validator : defaultValidators) {
1117 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
1118 }
1119 }
1120 if (hasErrors) {
1121 throw std::runtime_error(errors.str());
1122 }
1123}
1124
1125} // namespace o2::framework