// WorkflowHelpers.cxx
// (Doxygen HTML navigation chrome removed — extraction artifact.)
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
29#include "Framework/Signpost.h"
31
32#include "Framework/Variant.h"
33#include "Headers/DataHeader.h"
34#include <algorithm>
35#include <list>
36#include <set>
37#include <utility>
38#include <vector>
39#include <climits>
40#include <numeric>
41
42O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
43
44namespace o2::framework
45{
46std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
47{
48 out << "(" << info.index << ", " << info.layer << ")";
49 return out;
50}
51
52std::vector<TopoIndexInfo>
54 int const* edgeIn,
55 int const* edgeOut,
56 size_t byteStride,
57 size_t edgesCount)
58{
59 size_t stride = byteStride / sizeof(int);
60 using EdgeIndex = int;
61 // Create the index which will be returned.
62 std::vector<TopoIndexInfo> index(nodeCount);
63 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
64 index[wi] = {wi, 0};
65 }
66 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
67 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
68 remainingEdgesIndex[ei] = ei;
69 }
70
71 // Create a vector where at each position we have true
72 // if the vector has dependencies, false otherwise
73 std::vector<bool> nodeDeps(nodeCount, false);
74 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
75 nodeDeps[*(edgeOut + ei * stride)] = true;
76 }
77
78 // We start with all those which do not have any dependencies
79 // They are layer 0.
80 std::list<TopoIndexInfo> L;
81 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
82 if (nodeDeps[ii] == false) {
83 L.push_back({ii, 0});
84 }
85 }
86
87 // The final result.
88 std::vector<TopoIndexInfo> S;
89 // The set of vertices which can be reached by the current node
90 std::set<TopoIndexInfo> nextVertex;
91 // The set of edges which are not related to the current node.
92 std::vector<EdgeIndex> nextEdges;
93 while (!L.empty()) {
94 auto node = L.front();
95 S.push_back(node);
96 L.pop_front();
97 nextVertex.clear();
98 nextEdges.clear();
99
100 // After this, nextVertex will contain all the vertices
101 // which have the current node as incoming.
102 // nextEdges will contain all the edges which are not related
103 // to the current node.
104 for (auto& ei : remainingEdgesIndex) {
105 if (*(edgeIn + ei * stride) == node.index) {
106 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
107 } else {
108 nextEdges.push_back(ei);
109 }
110 }
111 remainingEdgesIndex.swap(nextEdges);
112
113 // Of all the vertices which have node as incoming,
114 // check if there is any other incoming node.
115 std::set<TopoIndexInfo> hasPredecessors;
116 for (auto& ei : remainingEdgesIndex) {
117 for (auto& m : nextVertex) {
118 if (m.index == *(edgeOut + ei * stride)) {
119 hasPredecessors.insert({m.index, m.layer});
120 }
121 }
122 }
123 std::vector<TopoIndexInfo> withPredecessor;
124 std::set_difference(nextVertex.begin(), nextVertex.end(),
125 hasPredecessors.begin(), hasPredecessors.end(),
126 std::back_inserter(withPredecessor));
127 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
128 }
129 return S;
130}
131
132// get the default value for condition-backend
134{
135 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
136 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
137 if (explicitBackend) {
138 return getenv("DPL_CONDITION_BACKEND");
139 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
140 return "http://o2-ccdb.internal";
141 } else {
142 return "http://alice-ccdb.cern.ch";
143 }
144}
145
146// get the default value for condition query rate
148{
149 return getenv("DPL_CONDITION_QUERY_RATE") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE")) : 0;
150}
151
152// get the default value for condition query rate multiplier
154{
155 return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1;
156}
157
159{
160 auto fakeCallback = AlgorithmSpec{[](InitContext& ic) {
161 LOG(info) << "This is not a real device, merely a placeholder for external inputs";
162 LOG(info) << "To be hidden / removed at some point.";
163 // mark this dummy process as ready-to-quit
164 ic.services().get<ControlService>().readyToQuit(QuitRequest::Me);
165
166 return [](ProcessingContext& pc) {
167 // this callback is never called since there is no expiring input
168 pc.services().get<RawDeviceService>().waitFor(2000);
169 };
170 }};
171
172 DataProcessorSpec ccdbBackend{
173 .name = "internal-dpl-ccdb-backend",
174 .outputs = {},
175 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
176 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
177 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
178 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
179 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
180 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
181 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
182 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
183 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
184 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
185 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
186 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
187 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
188 };
189 DataProcessorSpec analysisCCDBBackend{
190 .name = "internal-dpl-aod-ccdb",
191 .inputs = {},
192 .outputs = {},
193 .algorithm = AlgorithmSpec::dummyAlgorithm(),
194 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
195 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
196 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
197 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
198 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
199 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
200 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
201 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
202 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
203 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
204 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
205 DataProcessorSpec transientStore{"internal-dpl-transient-store",
206 {},
207 {},
209 DataProcessorSpec qaStore{"internal-dpl-qa-store",
210 {},
211 {},
213 DataProcessorSpec timer{"internal-dpl-clock",
214 {},
215 {},
217
218 // In case InputSpec of origin AOD are
219 // requested but not available as part of the workflow,
220 // we insert in the configuration something which
221 // reads them from file.
222 //
223 // FIXME: source branch is DataOrigin, for the moment. We should
224 // make it configurable via ConfigParamsOptions
225 auto aodLifetime = Lifetime::Enumeration;
226
227 DataProcessorSpec aodReader{
228 .name = "internal-dpl-aod-reader",
229 .inputs = {InputSpec{"enumeration",
230 "DPL",
231 "ENUM",
232 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
233 aodLifetime}},
234 .algorithm = AlgorithmSpec::dummyAlgorithm(),
235 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
236 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
237 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
238 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
239 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
240 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
241 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
242 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
243 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
244 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
245
246 // AOD reader can be rate limited
247 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
248 std::string rateLimitingChannelConfigInput;
249 std::string rateLimitingChannelConfigOutput;
250 bool internalRateLimiting = false;
251
252 // In case we have rate-limiting requested, any device without an input will get one on the special
253 // "DPL/RATE" message.
254 if (rateLimitingIPCID >= 0) {
255 rateLimitingChannelConfigInput = fmt::format("name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
256 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
257 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
258 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
259 internalRateLimiting = true;
260 aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}});
261 }
262
263 ctx.services().registerService(ServiceRegistryHelpers::handleForService<AnalysisContext>(new AnalysisContext));
264 auto& ac = ctx.services().get<AnalysisContext>();
265
266 std::vector<InputSpec> requestedCCDBs;
267 std::vector<OutputSpec> providedCCDBs;
268
269 for (size_t wi = 0; wi < workflow.size(); ++wi) {
270 auto& processor = workflow[wi];
271 auto name = processor.name;
272 auto hash = runtime_hash(name.c_str());
273 ac.outTskMap.push_back({hash, name});
274
275 std::string prefix = "internal-dpl-";
276 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
277 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
278 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
279 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
280 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
281 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
282 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
283 }
284 bool hasTimeframeInputs = false;
285 for (auto& input : processor.inputs) {
286 if (input.lifetime == Lifetime::Timeframe) {
287 hasTimeframeInputs = true;
288 break;
289 }
290 }
291 bool hasTimeframeOutputs = false;
292 for (auto& output : processor.outputs) {
293 if (output.lifetime == Lifetime::Timeframe) {
294 hasTimeframeOutputs = true;
295 break;
296 }
297 }
298 // A timeframeSink consumes timeframes without creating new
299 // timeframe data.
300 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
301 if (std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid")) != -1) {
302 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
303 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
304 uint32_t hash = runtime_hash(processor.name.c_str());
305 bool hasMatch = false;
306 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
307 for (auto& output : processor.outputs) {
308 if (DataSpecUtils::match(output, summaryMatcher)) {
309 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
310 DataSpecUtils::describe(output).c_str(), processor.name.c_str());
311 hasMatch = true;
312 break;
313 }
314 }
315 if (!hasMatch) {
316 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
317 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
318 }
319 }
320 }
321 bool hasConditionOption = false;
322 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
323 auto& input = processor.inputs[ii];
324 switch (input.lifetime) {
325 case Lifetime::Timer: {
326 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
327 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "period-" + input.binding); });
328 if (hasOption == false) {
329 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
330 }
331 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
332 } break;
333 case Lifetime::Signal: {
334 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
335 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
336 } break;
337 case Lifetime::Enumeration: {
338 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
339 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
340 } break;
341 case Lifetime::Condition: {
342 for (auto& option : processor.options) {
343 if (option.name == "condition-backend") {
344 hasConditionOption = true;
345 break;
346 }
347 }
348 if (hasConditionOption == false) {
349 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
350 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
351 hasConditionOption = true;
352 }
353 requestedCCDBs.emplace_back(input);
354 } break;
355 case Lifetime::OutOfBand: {
356 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
357 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
358 if (hasOption == false) {
359 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
360 }
361 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
362 } break;
363 case Lifetime::QA:
364 case Lifetime::Transient:
365 case Lifetime::Timeframe:
366 case Lifetime::Optional:
367 break;
368 }
369 if (DataSpecUtils::partialMatch(input, AODOrigins)) {
370 DataSpecUtils::updateInputList(ac.requestedAODs, InputSpec{input});
371 }
373 DataSpecUtils::updateInputList(ac.requestedDYNs, InputSpec{input});
374 }
376 DataSpecUtils::updateInputList(ac.requestedIDXs, InputSpec{input});
377 }
379 DataSpecUtils::updateInputList(ac.requestedTIMs, InputSpec{input});
380 }
381 }
382
383 std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
384
385 for (auto& output : processor.outputs) {
386 if (DataSpecUtils::partialMatch(output, AODOrigins)) {
387 ac.providedAODs.emplace_back(output);
389 ac.providedDYNs.emplace_back(output);
391 ac.providedTIMs.emplace_back(output);
393 ac.providedOutputObjHist.emplace_back(output);
394 auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; });
395 if (it == ac.outObjHistMap.end()) {
396 ac.outObjHistMap.push_back({hash, {output.binding.value}});
397 } else {
398 it->bindings.push_back(output.binding.value);
399 }
400 }
401 if (output.lifetime == Lifetime::Condition) {
402 providedCCDBs.push_back(output);
403 }
404 }
405 }
406
407 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
408 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
409 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
410 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
411 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
412 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
413
414 DataProcessorSpec indexBuilder{
415 "internal-dpl-aod-index-builder",
416 {},
417 {},
418 PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx), // readers::AODReaderHelpers::indexBuilderCallback(ctx),
419 {}};
420 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder);
421
422 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
424 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
425 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend);
426 }
427
428 for (auto& input : ac.requestedDYNs) {
429 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
430 ac.spawnerInputs.emplace_back(input);
431 }
432 }
433
434 DataProcessorSpec aodSpawner{
435 "internal-dpl-aod-spawner",
436 {},
437 {},
438 PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx), // readers::AODReaderHelpers::aodSpawnerCallback(ctx),
439 {}};
440 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner);
441
442 AnalysisSupportHelpers::addMissingOutputsToReader(ac.providedAODs, ac.requestedAODs, aodReader);
443 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
444
445 std::vector<DataProcessorSpec> extraSpecs;
446
447 if (transientStore.outputs.empty() == false) {
448 extraSpecs.push_back(transientStore);
449 }
450 if (qaStore.outputs.empty() == false) {
451 extraSpecs.push_back(qaStore);
452 }
453
454 if (aodSpawner.outputs.empty() == false) {
455 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
456 }
457
458 if (indexBuilder.outputs.empty() == false) {
459 extraSpecs.push_back(indexBuilder);
460 }
461
462 // add the reader
463 if (aodReader.outputs.empty() == false) {
464 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
465 if (mctracks2aod == workflow.end()) {
466 // add normal reader
467 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx);
468 aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
469 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
470 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
471 } else {
472 // AODs are being injected on-the-fly, add dummy reader
473 auto algo = AlgorithmSpec{
475 [outputs = aodReader.outputs](DeviceSpec const&) {
476 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
477 for (auto const& output : outputs) {
478 LOGP(warn, " {}", DataSpecUtils::describe(output));
479 }
480 LOGP(fatal, "Stopping.");
481 // to ensure the output type for adaptStateful
482 return adaptStateless([](DataAllocator&) {});
483 })};
484 aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo);
485 }
486 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
487 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
488 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
489 }
490
491 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
492 auto& dstf = std::get<ConcreteDataMatcher>(matcher.matcher);
493 // Check if any of the provided outputs is a DISTSTF
494 // Check if any of the requested inputs is for a 0xccdb message
495 bool providesDISTSTF = std::any_of(workflow.begin(), workflow.end(),
496 [&matcher](auto const& dp) {
497 return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](auto const& output) {
498 return DataSpecUtils::match(matcher, output);
499 });
500 });
501
502 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
503 // we add to the first data processor which has no inputs (apart from
504 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
505 bool requiresDISTSUBTIMEFRAME = std::any_of(workflow.begin(), workflow.end(),
506 [&dstf](auto const& dp) {
507 return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](auto const& input) {
508 return DataSpecUtils::match(input, dstf);
509 });
510 });
511
512 // We find the first device which has either just enumerations or
513 // just timers, and we will add the DISTSUBTIMEFRAME to it.
514 // Notice how we do so in a stable manner by sorting the devices
515 // by name.
516 int enumCandidate = -1;
517 int timerCandidate = -1;
518 for (auto wi = 0U; wi < workflow.size(); ++wi) {
519 auto& dp = workflow[wi];
520 if (dp.inputs.size() != 1) {
521 continue;
522 }
523 auto lifetime = dp.inputs[0].lifetime;
524 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
525 enumCandidate = wi;
526 }
527 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
528 timerCandidate = wi;
529 }
530 }
531
532 // * If there are AOD outputs we use TFNumber as the CCDB clock
533 // * If one device provides a DISTSTF we use that as the CCDB clock
534 // * If one of the devices provides a timer we use that as the CCDB clock
535 // * If none of the above apply, add to the first data processor
536 // which has no inputs apart from enumerations the responsibility
537 // to provide the DISTSUBTIMEFRAME.
538 if (ccdbBackend.outputs.empty() == false) {
539 if (aodReader.outputs.empty() == false) {
540 // fetcher clock follows AOD source (TFNumber)
541 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
542 } else if (providesDISTSTF) {
543 // fetcher clock follows DSTF/ccdb source (DISTSUBTIMEFRAME)
544 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
545 } else {
546 if (enumCandidate != -1) {
547 // add DSTF/ccdb source to the enumeration-driven source explicitly
548 // fetcher clock is provided by enumeration-driven source (DISTSUBTIMEFRAME)
549 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
550 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
551 } else if (timerCandidate != -1) {
552 // fetcher clock is proived by timer source
553 auto timer_dstf = DataSpecUtils::asConcreteDataMatcher(workflow[timerCandidate].outputs[0]);
554 ccdbBackend.inputs.push_back(InputSpec{"tfn", timer_dstf, Lifetime::Timeframe});
555 }
556 }
557
558 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
559 // Load the CCDB backend from the plugin
560 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
561 extraSpecs.push_back(ccdbBackend);
562 } else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
563 // add DSTF/ccdb source to the enumeration-driven source explicitly if it is required in the workflow
564 DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
565 }
566
567 // add the Analysys CCDB backend which reads CCDB objects using a provided table
568 if (analysisCCDBBackend.outputs.empty() == false) {
569 // add normal reader
570 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
571 analysisCCDBBackend.algorithm = algo;
572 extraSpecs.push_back(analysisCCDBBackend);
573 }
574
575 // add the timer
576 if (timer.outputs.empty() == false) {
577 extraSpecs.push_back(timer);
578 }
579
580 // This is to inject a file sink so that any dangling ATSK object is written
581 // to a ROOT file.
582 if (ac.providedOutputObjHist.empty() == false) {
584 extraSpecs.push_back(rootSink);
585 }
586
587 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
588 extraSpecs.clear();
589
591 auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
592 ac.isDangling = isDanglingTmp;
593 ac.outputsInputs = outputsInputsTmp;
594
595 // create DataOutputDescriptor
596 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
597
598 // select outputs of type AOD which need to be saved
599 // ATTENTION: if there are dangling outputs the getGlobalAODSink
600 // has to be created in any case!
601 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
602 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
603 auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]);
604 if (ds.size() > 0 || ac.isDangling[ii]) {
605 ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]);
606 }
607 }
608 }
609
610 // file sink for any AOD output
611 if (ac.outputsInputsAOD.size() > 0) {
612 // add TFNumber and TFFilename as input to the writer
613 ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
614 ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
615 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
616 extraSpecs.push_back(fileSink);
617
618 auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool {
619 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
620 });
621 size_t ii = std::distance(ac.outputsInputs.begin(), it);
622 ac.isDangling[ii] = false;
623 }
624
625 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
626 extraSpecs.clear();
627
628 // Select dangling outputs which are not of type AOD
629 std::vector<InputSpec> redirectedOutputsInputs;
630 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
631 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
632 continue;
633 }
634 // We forward to the output proxy all the inputs only if they are dangling
635 // or if the forwarding policy is "proxy".
636 if (!ac.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
637 continue;
638 }
639 // AODs are skipped in any case.
640 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
641 continue;
642 }
643 redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]);
644 }
645
646 std::vector<InputSpec> unmatched;
647 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
648 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
649 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
650 if (unmatched.size() != redirectedOutputsInputs.size()) {
651 extraSpecs.push_back(fileSink);
652 }
653 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
654 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
655 extraSpecs.push_back(fairMQSink);
656 } else if (forwardingDestination != "drop") {
657 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
658 }
659 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
660 std::vector<InputSpec> ignored = unmatched;
661 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
662 for (auto& ignoredInput : ignored) {
663 ignoredInput.lifetime = Lifetime::Sporadic;
664 }
665
666 // Use the new dummy sink when the AOD reader is there
667 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
668 if (aodReader.outputs.empty() == false) {
669 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting scheduled dummy sink");
670 extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored));
671 } else {
672 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink");
673 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
674 }
675 }
676
677 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
678 extraSpecs.clear();
679}
680
// WorkflowHelpers::adjustTopology — the opening signature line is elided by
// this listing; the member declaration reads:
//   static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx)
// Visible behavior:
//  1. On devices that also have Optional inputs, FLP/DISTSUBTIMEFRAME/0 inputs
//     are renumbered to unique incremental subspecs (avoids the forwarding
//     race described in the inline comment below).
//  2. Devices whose non-timer inputs are all Sporadic get their Timeframe
//     outputs demoted to Sporadic.
//  3. The producer of FLP/DISTSUBTIMEFRAME/0 gains one extra output per
//     renumbered subspec.
682{
683 unsigned int distSTFCount = 0;
684 for (auto& spec : workflow) {
685 auto& inputs = spec.inputs;
686 bool allSporadic = true;
687 bool hasTimer = false;
688 bool hasSporadic = false;
689 bool hasOptionals = false;
// First pass: detect whether any input of this device is declared Optional.
690 for (auto& input : inputs) {
691 if (input.lifetime == Lifetime::Optional) {
692 hasOptionals = true;
693 }
694 }
695 for (auto& input : inputs) {
696 // Any InputSpec that is DPL/DISTSUBTIMEFRAME/0 will actually be replaced by one
697 // which looks like DPL/DISTSUBTIMEFRAME/<incremental number> for devices that
698 // have Optional inputs as well.
699 // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
700 // forwarded before actual RAWDATA arrives.
701 if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
702 !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
703 LOGP(error,
704 "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
705 "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
706 DataSpecUtils::describe(input), input.binding);
707 }
708 if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
709 // The first one remains unchanged, therefore we use the postincrement
710 DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
711 continue;
712 }
713 // Timers are sporadic only when they are not
714 // alone.
715 if (input.lifetime == Lifetime::Timer) {
716 hasTimer = true;
717 continue;
718 }
719 if (input.lifetime == Lifetime::Sporadic) {
720 hasSporadic = true;
721 } else {
722 allSporadic = false;
723 }
724 }
725
726 LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
727
728 // If they are not all sporadic (excluding timers)
729 // we leave things as they are.
730 if (allSporadic == false) {
731 continue;
732 }
733 // A timer alone is not sporadic.
734 if (hasSporadic == false) {
735 continue;
736 }
// (lines 737-739 elided by the listing — presumably a comment; TODO confirm
//  against the repository source.)
// All remaining (non-timer) inputs are Sporadic: demote Timeframe outputs too.
740 for (auto& output : spec.outputs) {
741 if (output.lifetime == Lifetime::Timeframe) {
742 output.lifetime = Lifetime::Sporadic;
743 }
744 }
745 }
746
// If any FLP/DISTSUBTIMEFRAME/0 input was renumbered above, find its producer
// and declare the additional outputs 1..distSTFCount-1 on it so each renumbered
// input still has a matching output.
747 if (distSTFCount > 0) {
748 bool found = false;
749 for (auto& spec : workflow) {
750 for (auto& output : spec.outputs) {
751 if (DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
752 found = true;
753 break;
754 }
755 }
756 if (found) {
757 for (unsigned int i = 1; i < distSTFCount; ++i) {
758 spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
759 }
760 break;
761 }
762 }
763 }
764}
765
// WorkflowHelpers::constructGraph — the first line of the signature is elided
// by this listing; the member declaration reads:
//   static void constructGraph(const WorkflowSpec& workflow,
//                              std::vector<DeviceConnectionEdge>& logicalEdges,
//                              std::vector<OutputSpec>& outputs,
//                              std::vector<LogicalForwardInfo>& availableForwardsInfo)
// Flattens all producer outputs into the global `outputs` vector, then matches
// every consumer input against the currently available outputs, emitting one
// DeviceConnectionEdge per (consumer time slice, producer time slice) pair and
// recording which inputs are satisfied through forwarding.
767 std::vector<DeviceConnectionEdge>& logicalEdges,
768 std::vector<OutputSpec>& outputs,
769 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
770{
771 // In case the workflow is empty, we do not have anything to do.
772 if (workflow.empty()) {
773 return;
774 }
775
776 // This is the state. Oif is the iterator I use for the searches.
777 std::vector<LogicalOutputInfo> availableOutputsInfo;
778 auto const& constOutputs = outputs; // const version of the outputs
779 // Forwards is a local cache to avoid adding forwards before time.
780 std::vector<LogicalOutputInfo> forwards;
781
782 // Notice that availableOutputsInfo MUST be updated first, since it relies on
783 // the size of outputs to be the one before the update.
784 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
785 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
786 for (size_t wi = 0; wi < workflow.size(); ++wi) {
787 auto& producer = workflow[wi];
788 if (producer.outputs.empty()) {
789 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
790 }
791 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
792
793 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
794 auto& out = producer.outputs[oi];
795 auto uniqueOutputId = outputs.size();
796 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
797 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
798 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
799 outputs.push_back(out);
800 }
801 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
802 }
803 };
804
// Builds the diagnostic listing every candidate output, then throws.
805 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
806 auto input = workflow[ci].inputs[ii];
807 std::ostringstream str;
808 str << "No matching output found for "
809 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
810
811 for (auto& output : constOutputs) {
812 str << "-" << DataSpecUtils::describe(output) << "\n";
813 }
814
815 throw std::runtime_error(str.str());
816 };
817
818 // This is the outer loop
819 //
820 // Here we iterate over dataprocessor items in workflow and we consider them
821 // as consumer, since we are interested in their inputs.
822 // Notice also we need to search for all the matching inputs, since
823 // we could have more than one source that matches (e.g. in the
824 // case of a time merger).
825 // Once consumed, an output is not actually used anymore, however
826 // we append it as a forward.
827 // Finally, If a device has n-way pipelining, we need to create one node per
828 // parallel pipeline and add an edge for each.
829 enumerateAvailableOutputs();
830
// matches[i] caches whether the current input matches global output i, so the
// inner availableOutputsInfo loop below only does a flag lookup.
831 std::vector<bool> matches(constOutputs.size());
832 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
833 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
834 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
835 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
836 forwards.clear();
837 for (size_t i = 0; i < constOutputs.size(); i++) {
838 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
839 if (matches[i]) {
840 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
841 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
842 DataSpecUtils::describe(constOutputs[i]).c_str());
843 }
844 }
845
846 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
847 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
848 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
849 continue;
850 }
851 auto* oif = &availableOutputsInfo[i];
852 if (oif->forward) {
853 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
854 }
855 auto producer = oif->specIndex;
856 auto uniqueOutputId = oif->outputGlobalIndex;
// One edge per (consumer time pipeline, producer time pipeline) combination.
857 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
858 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
859 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
860 workflow[producer].name.c_str());
861 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
862 }
863 }
864 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
865 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
866 oif->enabled = false;
867 }
868 if (forwards.empty()) {
869 errorDueToMissingOutputFor(consumer, input);
870 }
871 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto& info) { return info.enabled == false; }), availableOutputsInfo.end());
872 for (auto& forward : forwards) {
873 availableOutputsInfo.push_back(forward);
874 }
875 }
876 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
877 }
878}
879
880std::vector<EdgeAction>
882 const std::vector<DeviceConnectionEdge>& edges,
883 const std::vector<size_t>& index)
884{
885 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
886
887 assert(edges.size() == index.size());
888 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
889 for (size_t i : index) {
890 auto& edge = edges[i];
891 auto& action = actions[i];
892 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
893 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
894 last = edge;
895 }
896 return actions;
897}
898
899std::vector<EdgeAction>
901 const std::vector<DeviceConnectionEdge>& edges,
902 const std::vector<size_t>& index)
903{
904 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
905
906 assert(edges.size() == index.size());
907 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
908 for (size_t i : index) {
909 auto& edge = edges[i];
910 auto& action = actions[i];
911 // Calculate which actions need to be taken for this edge.
912 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
913 action.requiresNewChannel =
914 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
915
916 last = edge;
917 }
918 return actions;
919}
920
921void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
922 std::vector<size_t>& outEdgeIndex,
923 const std::vector<DeviceConnectionEdge>& edges)
924{
925 inEdgeIndex.resize(edges.size());
926 outEdgeIndex.resize(edges.size());
927 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
928 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
929
930 // Two indexes, one to bind the outputs, the other
931 // one to connect the inputs. The
932 auto outSorter = [&edges](size_t i, size_t j) {
933 auto& a = edges[i];
934 auto& b = edges[j];
935 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
936 };
937 auto inSorter = [&edges](size_t i, size_t j) {
938 auto& a = edges[i];
939 auto& b = edges[j];
940 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
941 };
942
943 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
944 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
945}
946
// WorkflowHelpers::verifyWorkflow — the signature line is elided by this
// listing; the member declaration reads:
//   static WorkflowParsingState verifyWorkflow(const WorkflowSpec& workflow)
// Validates the workflow: every processor must have a non-empty, unique name
// containing none of ,;:"'$ ; every option's default value must match its
// declared type; every input must pass DataSpecUtils::validate. Violations
// throw std::runtime_error; otherwise a parsing state is returned.
948{
949 if (workflow.empty()) {
// (line 950 elided by the listing — the return for the empty-workflow
//  parsing state; confirm the exact enum value against the repository.)
951 }
952 std::set<std::string> validNames;
953 std::vector<OutputSpec> availableOutputs;
954 std::vector<InputSpec> requiredInputs;
955
956 // An index many to one index to go from a given input to the
957 // associated spec
958 std::map<size_t, size_t> inputToSpec;
959 // A one to one index to go from a given output to the Spec emitting it
960 std::map<size_t, size_t> outputToSpec;
// NOTE(review): inputToSpec/outputToSpec (and availableOutputs/requiredInputs)
// appear unused in the visible body — candidates for removal; verify against
// the full source before deleting.
961
962 std::ostringstream ss;
963
964 for (auto& spec : workflow) {
965 if (spec.name.empty()) {
966 throw std::runtime_error("Invalid DataProcessorSpec name");
967 }
// These characters would break shell quoting / CSV-style option parsing.
968 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
969 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
970 }
971 if (validNames.find(spec.name) != validNames.end()) {
972 throw std::runtime_error("Name " + spec.name + " is used twice.");
973 }
974 validNames.insert(spec.name);
// A non-empty default value must have the same Variant type as the option.
975 for (auto& option : spec.options) {
976 if (option.defaultValue.type() != VariantType::Empty &&
977 option.type != option.defaultValue.type()) {
978 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
979 << ") for " << option.name << " in DataProcessorSpec of "
980 << spec.name;
981 throw std::runtime_error(ss.str());
982 }
983 }
984 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
985 InputSpec const& input = spec.inputs[ii];
986 if (DataSpecUtils::validate(input) == false) {
987 ss << "In spec " << spec.name << " input specification "
988 << ii << " requires binding, description and origin"
989 " to be fully specified (found "
990 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
991 throw std::runtime_error(ss.str());
992 }
993 }
994 }
// (line 995 elided by the listing — the final return of the parsing state.)
996}
997
// A specification that can stand for either side of a data flow edge.
998using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
// (lines 999-1000 elided by the listing: the DataMatcherId struct header and
//  its first field. From the usage in analyzeOutputs it is constructed as
//  DataMatcherId{workflowId, id} — the position of a processor in the workflow
//  plus the position of one of its input/output specs.)
1001 size_t id;
1002};
1003
1004std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
1005{
1006 // compute total number of input/output
1007 size_t totalInputs = 0;
1008 size_t totalOutputs = 0;
1009 for (auto& spec : workflow) {
1010 totalInputs += spec.inputs.size();
1011 totalOutputs += spec.outputs.size();
1012 }
1013
1014 std::vector<DataMatcherId> inputs;
1015 std::vector<DataMatcherId> outputs;
1016 inputs.reserve(totalInputs);
1017 outputs.reserve(totalOutputs);
1018
1019 std::vector<InputSpec> results;
1020 std::vector<bool> isDangling;
1021 results.reserve(totalOutputs);
1022 isDangling.reserve(totalOutputs);
1023
1025 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
1026 auto& spec = workflow[wi];
1027 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
1028 inputs.emplace_back(DataMatcherId{wi, ii});
1029 }
1030 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
1031 outputs.emplace_back(DataMatcherId{wi, oi});
1032 }
1033 }
1034
1035 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
1036 auto& output = outputs[oi];
1037 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
1038
1039 // is dangling output?
1040 bool matched = false;
1041 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
1042 auto& input = inputs[ii];
1043 // Inputs of the same workflow cannot match outputs
1044 if (output.workflowId == input.workflowId) {
1045 continue;
1046 }
1047 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
1048 if (DataSpecUtils::match(inputSpec, outputSpec)) {
1049 matched = true;
1050 break;
1051 }
1052 }
1053
1054 auto input = DataSpecUtils::matchingInput(outputSpec);
1055 char buf[64];
1056 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
1057
1058 // make sure that entries are unique
1059 if (std::find(results.begin(), results.end(), input) == results.end()) {
1060 results.emplace_back(input);
1061 isDangling.emplace_back(matched == false);
1062 }
1063 }
1064
1065 // make sure that results is unique
1066 return std::make_tuple(results, isDangling);
1067}
1068
1069std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1070{
1071
1072 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1073
1074 std::vector<InputSpec> results;
1075 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1076 if (isDangling[ii]) {
1077 results.emplace_back(outputsInputs[ii]);
1078 }
1079 }
1080
1081 return results;
1082}
1083
1084bool validateLifetime(std::ostream& errors,
1085 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1086 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1087{
1088 // In case the completion policy is consume-any, we do not need to check anything.
1089 if (consumerPolicies.completionPolicyName == "consume-any") {
1090 return true;
1091 }
1092 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1093 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1094 DataSpecUtils::describe(input).c_str(), consumer.name,
1095 DataSpecUtils::describe(output).c_str(), producer.name);
1096 return false;
1097 }
1098 return true;
1099}
1100
1101bool validateExpendable(std::ostream& errors,
1102 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1103 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1104{
1105 auto isExpendable = [](DataProcessorLabel const& label) {
1106 return label.value == "expendable";
1107 };
1108 auto isResilient = [](DataProcessorLabel const& label) {
1109 return label.value == "expendable" || label.value == "resilient";
1110 };
1111 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1112 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1113 if (producerExpendable && consumerCritical) {
1114 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1115 consumer.name,
1116 producer.name);
1117 return false;
1118 }
1119 return true;
1120}
1121
// Common signature of the per-edge validators (validateLifetime,
// validateExpendable above): append a diagnostic to `errors` and return
// false when the producer/consumer pair is invalid.
1122using Validator = std::function<bool(std::ostream& errors,
1123 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1124 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1125
// WorkflowHelpers::validateEdges — the first line of the signature is elided
// by this listing; the member declaration reads:
//   static void validateEdges(WorkflowSpec const& workflow,
//                             std::vector<DataProcessorPoliciesInfo> const& policiesInfos,
//                             std::vector<DeviceConnectionEdge> const& edges,
//                             std::vector<OutputSpec> const& outputs)
// Runs every registered validator over every logical edge, accumulating all
// diagnostics, and throws a single runtime_error when any check failed.
1127 std::vector<DataProcessorPoliciesInfo> const& policies,
1128 std::vector<DeviceConnectionEdge> const& edges,
1129 std::vector<OutputSpec> const& outputs)
1130{
// Escape hatch: a non-zero DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES
// environment variable disables the lifetime validation (the expendable
// check still runs). Evaluated once per process (static).
1131 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
1132 std::vector<Validator> defaultValidators = {validateExpendable};
1133 if (!disableLifetimeCheck) {
1134 defaultValidators.emplace_back(validateLifetime);
1135 }
1136 std::stringstream errors;
1137 // Iterate over all the edges.
1138 // Get the input lifetime and the output lifetime.
1139 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
1140 bool hasErrors = false;
1141 for (auto& edge : edges) {
1142 DataProcessorSpec const& producer = workflow[edge.producer];
1143 DataProcessorSpec const& consumer = workflow[edge.consumer];
1144 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
1145 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
1146 OutputSpec const& output = outputs[edge.outputGlobalIndex];
1147 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
// Collect failures from every validator rather than stopping at the first,
// so the final exception reports all problems at once.
1148 for (auto& validator : defaultValidators) {
1149 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
1150 }
1151 }
1152 if (hasErrors) {
1153 throw std::runtime_error(errors.str());
1154 }
1155}
1156
1157} // namespace o2::framework
uint32_t hash
std::ostringstream debug
std::unique_ptr< expressions::Node > node
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(Ts... Vs)
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
Defining PrimaryVertex explicitly as messageable.
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
@ Me
Only quit this data processor.
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:193
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
int layer
the associated layer in the sorting procedure
int index
the index in the actual storage of the nodes to be sorted topologically
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
o2::mch::DsIndex ds
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str