Project
Loading...
Searching...
No Matches
WorkflowHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WorkflowHelpers.h"
30#include "Framework/Signpost.h"
32
33#include "Framework/Variant.h"
34#include "Headers/DataHeader.h"
35#include <algorithm>
36#include <list>
37#include <set>
38#include <utility>
39#include <vector>
40#include <climits>
41
42O2_DECLARE_DYNAMIC_LOG(workflow_helpers);
43
44namespace o2::framework
45{
46std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info)
47{
48 out << "(" << info.index << ", " << info.layer << ")";
49 return out;
50}
51
// Kahn-style topological sort of a DAG given as two parallel edge arrays
// (edgeIn[i] -> edgeOut[i]), read with a byte stride so the caller can pass
// arrays of structs. Returns the nodes in dependency order, each tagged
// with the "layer" at which it was released.
// NOTE(review): the listing line carrying the function name and the first
// parameter (which the body reads as `nodeCount`) was lost when this file
// was extracted — restore it from the header before compiling.
52std::vector<TopoIndexInfo>
54  int const* edgeIn,
55  int const* edgeOut,
56  size_t byteStride,
57  size_t edgesCount)
58{
// Convert the byte stride into an element stride for the int* arithmetic
// used below.
59 size_t stride = byteStride / sizeof(int);
60 using EdgeIndex = int;
61 // Create the index which will be returned.
62 std::vector<TopoIndexInfo> index(nodeCount);
63 for (auto wi = 0; static_cast<size_t>(wi) < nodeCount; ++wi) {
64 index[wi] = {wi, 0};
65 }
// Start with every edge still unprocessed; edges are referred to by
// position so the stride walk stays cheap.
66 std::vector<EdgeIndex> remainingEdgesIndex(edgesCount);
67 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
68 remainingEdgesIndex[ei] = ei;
69 }
70
71 // Create a vector where at each position we have true
72 // if the vector has dependencies, false otherwise
// (i.e. nodeDeps[n] is true when n appears as the target of some edge).
73 std::vector<bool> nodeDeps(nodeCount, false);
74 for (EdgeIndex ei = 0; static_cast<size_t>(ei) < edgesCount; ++ei) {
75 nodeDeps[*(edgeOut + ei * stride)] = true;
76 }
77
78 // We start with all those which do not have any dependencies
79 // They are layer 0.
80 std::list<TopoIndexInfo> L;
81 for (auto ii = 0; static_cast<size_t>(ii) < index.size(); ++ii) {
82 if (nodeDeps[ii] == false) {
83 L.push_back({ii, 0});
84 }
85 }
86
87 // The final result.
88 std::vector<TopoIndexInfo> S;
89 // The set of vertices which can be reached by the current node
90 std::set<TopoIndexInfo> nextVertex;
91 // The set of edges which are not related to the current node.
92 std::vector<EdgeIndex> nextEdges;
93 while (!L.empty()) {
94 auto node = L.front();
95 S.push_back(node);
96 L.pop_front();
97 nextVertex.clear();
98 nextEdges.clear();
99
100 // After this, nextVertex will contain all the vertices
101 // which have the current node as incoming.
102 // nextEdges will contain all the edges which are not related
103 // to the current node.
104 for (auto& ei : remainingEdgesIndex) {
105 if (*(edgeIn + ei * stride) == node.index) {
106 nextVertex.insert({*(edgeOut + ei * stride), node.layer + 1});
107 } else {
108 nextEdges.push_back(ei);
109 }
110 }
111 remainingEdgesIndex.swap(nextEdges);
112
113 // Of all the vertices which have node as incoming,
114 // check if there is any other incoming node.
115 std::set<TopoIndexInfo> hasPredecessors;
116 for (auto& ei : remainingEdgesIndex) {
117 for (auto& m : nextVertex) {
118 if (m.index == *(edgeOut + ei * stride)) {
119 hasPredecessors.insert({m.index, m.layer});
120 }
121 }
122 }
// NOTE(review): despite its name, `withPredecessor` holds the vertices
// WITHOUT any remaining predecessor (nextVertex minus hasPredecessors),
// i.e. the ones now free to be scheduled on the next layer.
123 std::vector<TopoIndexInfo> withPredecessor;
124 std::set_difference(nextVertex.begin(), nextVertex.end(),
125 hasPredecessors.begin(), hasPredecessors.end(),
126 std::back_inserter(withPredecessor));
127 std::copy(withPredecessor.begin(), withPredecessor.end(), std::back_inserter(L));
128 }
129 return S;
130}
131
132// get the default value for condition-backend
134{
135 static bool explicitBackend = getenv("DPL_CONDITION_BACKEND");
136 static DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
137 if (explicitBackend) {
138 return getenv("DPL_CONDITION_BACKEND");
139 } else if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS) {
140 return "http://o2-ccdb.internal";
141 } else {
142 return "http://alice-ccdb.cern.ch";
143 }
144}
145
// Get the default value for the condition query rate: how many TFs go by
// between CCDB validity checks. 0 (the default) means fetch only once.
// NOTE(review): the declaration line was lost in extraction; the int
// return type is grounded in the VariantType::Int call sites below.
int defaultConditionQueryRate()
{
  // Read the environment once per call instead of twice as before.
  char const* rate = getenv("DPL_CONDITION_QUERY_RATE");
  return rate != nullptr ? std::stoi(rate) : 0;
}
151
// Get the default value for the condition query rate multiplier: check
// conditions once per this many nominal checks. Defaults to 1.
// NOTE(review): the declaration line was lost in extraction; the int
// return type is grounded in the VariantType::Int call sites below.
int defaultConditionQueryRateMultiplier()
{
  // Read the environment once per call instead of twice as before.
  char const* multiplier = getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER");
  return multiplier != nullptr ? std::stoi(multiplier) : 1;
}
157
159{
160 auto fakeCallback = AlgorithmSpec{[](InitContext& ic) {
161 LOG(info) << "This is not a real device, merely a placeholder for external inputs";
162 LOG(info) << "To be hidden / removed at some point.";
163 // mark this dummy process as ready-to-quit
164 ic.services().get<ControlService>().readyToQuit(QuitRequest::Me);
165
166 return [](ProcessingContext& pc) {
167 // this callback is never called since there is no expiring input
168 pc.services().get<RawDeviceService>().waitFor(2000);
169 };
170 }};
171
172 DataProcessorSpec ccdbBackend{
173 .name = "internal-dpl-ccdb-backend",
174 .outputs = {},
175 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
176 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
177 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
178 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
179 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
180 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
181 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
182 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
183 {"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
184 {"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
185 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
186 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
187 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
188 };
189 DataProcessorSpec analysisCCDBBackend{
190 .name = "internal-dpl-aod-ccdb",
191 .inputs = {},
192 .outputs = {},
193 .algorithm = AlgorithmSpec::dummyAlgorithm(),
194 .options = {{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}},
195 {"condition-not-before", VariantType::Int64, 0ll, {"do not fetch from CCDB objects created before provide timestamp"}},
196 {"condition-not-after", VariantType::Int64, 3385078236000ll, {"do not fetch from CCDB objects created after the timestamp"}},
197 {"condition-remap", VariantType::String, "", {"remap condition path in CCDB based on the provided string."}},
198 {"condition-tf-per-query", VariantType::Int, defaultConditionQueryRate(), {"check condition validity per requested number of TFs, fetch only once if <=0"}},
199 {"condition-tf-per-query-multiplier", VariantType::Int, defaultConditionQueryRateMultiplier(), {"check conditions once per this amount of nominal checks (>0) or on module of TFcounter (<0)"}},
200 {"condition-use-slice-for-prescaling", VariantType::Int, 0, {"use TFslice instead of TFcounter to control validation frequency. If > query rate, do not allow TFCounter excursion exceeding it"}},
201 {"condition-time-tolerance", VariantType::Int64, 5000ll, {"prefer creation time if its difference to orbit-derived time exceeds threshold (ms), impose if <0"}},
202 {"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
203 {"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
204 {"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};
205 DataProcessorSpec transientStore{"internal-dpl-transient-store",
206 {},
207 {},
209 DataProcessorSpec qaStore{"internal-dpl-qa-store",
210 {},
211 {},
213 DataProcessorSpec timer{"internal-dpl-clock",
214 {},
215 {},
217
218 // In case InputSpec of origin AOD are
219 // requested but not available as part of the workflow,
220 // we insert in the configuration something which
221 // reads them from file.
222 //
223 // FIXME: source branch is DataOrigin, for the moment. We should
224 // make it configurable via ConfigParamsOptions
225 auto aodLifetime = Lifetime::Enumeration;
226
227 DataProcessorSpec aodReader{
228 .name = "internal-dpl-aod-reader",
229 .inputs = {InputSpec{"enumeration",
230 "DPL",
231 "ENUM",
232 static_cast<DataAllocator::SubSpecificationType>(compile_time_hash("internal-dpl-aod-reader")),
233 aodLifetime}},
234 .algorithm = AlgorithmSpec::dummyAlgorithm(),
235 .options = {ConfigParamSpec{"aod-file-private", VariantType::String, ctx.options().get<std::string>("aod-file"), {"AOD file"}},
236 ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}},
237 ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}},
238 ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
239 ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}},
240 ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},
241 ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
242 ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
243 ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}},
244 .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")};
245
246 // AOD reader can be rate limited
247 int rateLimitingIPCID = std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid"));
248 std::string rateLimitingChannelConfigInput;
249 std::string rateLimitingChannelConfigOutput;
250 bool internalRateLimiting = false;
251
252 // In case we have rate-limiting requested, any device without an input will get one on the special
253 // "DPL/RATE" message.
254 if (rateLimitingIPCID >= 0) {
255 rateLimitingChannelConfigInput = fmt::format("name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
256 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
257 rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0",
258 ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
259 internalRateLimiting = true;
260 aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}});
261 }
262
263 ctx.services().registerService(ServiceRegistryHelpers::handleForService<AnalysisContext>(new AnalysisContext));
264 auto& ac = ctx.services().get<AnalysisContext>();
265
266 std::vector<InputSpec> requestedCCDBs;
267 std::vector<OutputSpec> providedCCDBs;
268
269 for (size_t wi = 0; wi < workflow.size(); ++wi) {
270 auto& processor = workflow[wi];
271 auto name = processor.name;
272 auto hash = runtime_hash(name.c_str());
273 ac.outTskMap.push_back({hash, name});
274
275 std::string prefix = "internal-dpl-";
276 if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) {
277 processor.inputs.push_back(InputSpec{"enumeration", "DPL", "ENUM", static_cast<DataAllocator::SubSpecificationType>(runtime_hash(processor.name.c_str())), Lifetime::Enumeration});
278 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"1st injected orbit"}});
279 ConfigParamsHelper::addOptionIfMissing(processor.options, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"orbits/TForbit"}});
280 processor.options.push_back(ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}});
281 processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}});
282 processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}});
283 }
284 bool hasTimeframeInputs = false;
285 for (auto& input : processor.inputs) {
286 if (input.lifetime == Lifetime::Timeframe) {
287 hasTimeframeInputs = true;
288 break;
289 }
290 }
291 bool hasTimeframeOutputs = false;
292 for (auto& output : processor.outputs) {
293 if (output.lifetime == Lifetime::Timeframe) {
294 hasTimeframeOutputs = true;
295 break;
296 }
297 }
298 // A timeframeSink consumes timeframes without creating new
299 // timeframe data.
300 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
301 if (std::stoi(ctx.options().get<std::string>("timeframes-rate-limit-ipcid")) != -1) {
302 if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) {
303 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
304 uint32_t hash = runtime_hash(processor.name.c_str());
305 bool hasMatch = false;
306 ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)};
307 for (auto& output : processor.outputs) {
308 if (DataSpecUtils::match(output, summaryMatcher)) {
309 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s",
310 DataSpecUtils::describe(output).c_str(), processor.name.c_str());
311 hasMatch = true;
312 break;
313 }
314 }
315 if (!hasMatch) {
316 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str());
317 processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast<DataAllocator::SubSpecificationType>(hash)}});
318 }
319 }
320 }
321 bool hasConditionOption = false;
322 for (size_t ii = 0; ii < processor.inputs.size(); ++ii) {
323 auto& input = processor.inputs[ii];
324 switch (input.lifetime) {
325 case Lifetime::Timer: {
326 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
327 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "period-" + input.binding); });
328 if (hasOption == false) {
329 processor.options.push_back(ConfigParamSpec{"period-" + input.binding, VariantType::Int, 1000, {"period of the timer in milliseconds"}});
330 }
331 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
332 } break;
333 case Lifetime::Signal: {
334 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
335 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
336 } break;
337 case Lifetime::Enumeration: {
338 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
339 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
340 } break;
341 case Lifetime::Condition: {
342 for (auto& option : processor.options) {
343 if (option.name == "condition-backend") {
344 hasConditionOption = true;
345 break;
346 }
347 }
348 if (hasConditionOption == false) {
349 processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}});
350 processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}});
351 hasConditionOption = true;
352 }
353 requestedCCDBs.emplace_back(input);
354 } break;
355 case Lifetime::OutOfBand: {
356 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
357 auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
358 if (hasOption == false) {
359 processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
360 }
361 timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
362 } break;
363 case Lifetime::QA:
364 case Lifetime::Transient:
365 case Lifetime::Timeframe:
366 case Lifetime::Optional:
367 break;
368 }
369 if (DataSpecUtils::partialMatch(input, AODOrigins)) {
370 DataSpecUtils::updateInputList(ac.requestedAODs, InputSpec{input});
371 }
373 DataSpecUtils::updateInputList(ac.requestedDYNs, InputSpec{input});
374 }
376 DataSpecUtils::updateInputList(ac.requestedIDXs, InputSpec{input});
377 }
379 DataSpecUtils::updateInputList(ac.requestedTIMs, InputSpec{input});
380 }
381 }
382
383 std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
384
385 for (auto& output : processor.outputs) {
386 if (DataSpecUtils::partialMatch(output, AODOrigins)) {
387 ac.providedAODs.emplace_back(output);
389 ac.providedDYNs.emplace_back(output);
391 ac.providedTIMs.emplace_back(output);
393 ac.providedOutputObjHist.emplace_back(output);
394 auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; });
395 if (it == ac.outObjHistMap.end()) {
396 ac.outObjHistMap.push_back({hash, {output.binding.value}});
397 } else {
398 it->bindings.push_back(output.binding.value);
399 }
400 }
401 if (output.lifetime == Lifetime::Condition) {
402 providedCCDBs.push_back(output);
403 }
404 }
405 }
406
407 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
408 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
409 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
410 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
411 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
412 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
413
414 DataProcessorSpec indexBuilder{
415 "internal-dpl-aod-index-builder",
416 {},
417 {},
419 {}};
420 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder);
421
422 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
424 if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) {
425 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend);
426 }
427
428 for (auto& input : ac.requestedDYNs) {
429 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
430 ac.spawnerInputs.emplace_back(input);
431 }
432 }
433
434 DataProcessorSpec aodSpawner{
435 "internal-dpl-aod-spawner",
436 {},
437 {},
439 {}};
440 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner);
441
442 AnalysisSupportHelpers::addMissingOutputsToReader(ac.providedAODs, ac.requestedAODs, aodReader);
443 AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
444
445 std::vector<DataProcessorSpec> extraSpecs;
446
447 if (transientStore.outputs.empty() == false) {
448 extraSpecs.push_back(transientStore);
449 }
450 if (qaStore.outputs.empty() == false) {
451 extraSpecs.push_back(qaStore);
452 }
453
454 if (aodSpawner.outputs.empty() == false) {
455 extraSpecs.push_back(timePipeline(aodSpawner, ctx.options().get<int64_t>("spawners")));
456 }
457
458 if (indexBuilder.outputs.empty() == false) {
459 extraSpecs.push_back(indexBuilder);
460 }
461
462 // add the reader
463 if (aodReader.outputs.empty() == false) {
464 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
465 if (mctracks2aod == workflow.end()) {
466 // add normal reader
467 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx);
468 if (internalRateLimiting) {
469 aodReader.algorithm = CommonDataProcessors::wrapWithRateLimiting(algo);
470 } else {
471 aodReader.algorithm = algo;
472 }
473 aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
474 aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
475 } else {
476 // AODs are being injected on-the-fly, add dummy reader
477 aodReader.algorithm = AlgorithmSpec{
479 [outputs = aodReader.outputs](DeviceSpec const&) {
480 LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
481 for (auto const& output : outputs) {
482 LOGP(warn, " {}", DataSpecUtils::describe(output));
483 }
484 LOGP(fatal, "Stopping.");
485 // to ensure the output type for adaptStateful
486 return adaptStateless([](DataAllocator&) {});
487 })};
488 }
489 auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
490 timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration);
491 extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
492 }
493
494 ConcreteDataMatcher dstf{"FLP", "DISTSUBTIMEFRAME", 0xccdb};
495 if (ccdbBackend.outputs.empty() == false) {
496 ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
497 InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
498 bool providesDISTSTF = false;
499 // Check if any of the provided outputs is a DISTSTF
500 // Check if any of the requested inputs is for a 0xccdb message
501 for (auto& dp : workflow) {
502 for (auto& output : dp.outputs) {
503 if (DataSpecUtils::match(matcher, output)) {
504 providesDISTSTF = true;
506 break;
507 }
508 }
509 if (providesDISTSTF) {
510 break;
511 }
512 }
513 // * If there are AOD outputs we use TFNumber as the CCDB clock
514 // * If one device provides a DISTSTF we use that as the CCDB clock
515 // * If one of the devices provides a timer we use that as the CCDB clock
516 // * If none of the above apply add to the first data processor
517 // which has no inputs apart from enumerations the responsibility
518 // to provide the DISTSUBTIMEFRAME.
519 if (aodReader.outputs.empty() == false) {
520 ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
521 } else if (providesDISTSTF) {
522 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
523 } else {
524 // We find the first device which has either just enumerations or
525 // just timers, and we add the DISTSUBTIMEFRAME to it.
526 // Notice how we do so in a stable manner by sorting the devices
527 // by name.
528 int enumCandidate = -1;
529 int timerCandidate = -1;
530 for (size_t wi = 0; wi < workflow.size(); wi++) {
531 auto& dp = workflow[wi];
532 if (dp.inputs.size() != 1) {
533 continue;
534 }
535 auto lifetime = dp.inputs[0].lifetime;
536 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
537 enumCandidate = wi;
538 }
539 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
540 timerCandidate = wi;
541 }
542 }
543 if (enumCandidate != -1) {
544 auto& dp = workflow[enumCandidate];
545 DataSpecUtils::updateOutputList(dp.outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
546 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
547 } else if (timerCandidate != -1) {
548 auto& dp = workflow[timerCandidate];
549 dstf = DataSpecUtils::asConcreteDataMatcher(dp.outputs[0]);
550 ccdbBackend.inputs.push_back(InputSpec{{"tfn"}, dstf, Lifetime::Timeframe});
551 }
552 }
553
554 // Load the CCDB backend from the plugin
555 ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
556 extraSpecs.push_back(ccdbBackend);
557 } else {
558 // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
559 // we add to the first data processor which has no inputs (apart from
560 // enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
561 bool requiresDISTSUBTIMEFRAME = false;
562 for (auto& dp : workflow) {
563 for (auto& input : dp.inputs) {
564 if (DataSpecUtils::match(input, dstf)) {
565 requiresDISTSUBTIMEFRAME = true;
566 break;
567 }
568 }
569 }
570 if (requiresDISTSUBTIMEFRAME) {
571 // We find the first device which has either just enumerations or
572 // just timers, and we add the DISTSUBTIMEFRAME to it.
573 // Notice how we do so in a stable manner by sorting the devices
574 // by name.
575 int enumCandidate = -1;
576 int timerCandidate = -1;
577 for (size_t wi = 0; wi < workflow.size(); wi++) {
578 auto& dp = workflow[wi];
579 if (dp.inputs.size() != 1) {
580 continue;
581 }
582 auto lifetime = dp.inputs[0].lifetime;
583 if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
584 enumCandidate = wi;
585 }
586 if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
587 timerCandidate = wi;
588 }
589 }
590 if (enumCandidate != -1) {
591 auto& dp = workflow[enumCandidate];
592 DataSpecUtils::updateOutputList(dp.outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
593 ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
594 } else if (timerCandidate != -1) {
595 auto& dp = workflow[timerCandidate];
596 dstf = DataSpecUtils::asConcreteDataMatcher(dp.outputs[0]);
597 ccdbBackend.inputs.push_back(InputSpec{{"tfn"}, dstf, Lifetime::Timeframe});
598 }
599 }
600 }
601
602 // add the Analysys CCDB backend which reads CCDB objects using a provided
603 // table
604 if (analysisCCDBBackend.outputs.empty() == false) {
605 // add normal reader
606 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
607 analysisCCDBBackend.algorithm = algo;
608 extraSpecs.push_back(analysisCCDBBackend);
609 }
610
611 // add the timer
612 if (timer.outputs.empty() == false) {
613 extraSpecs.push_back(timer);
614 }
615
616 // This is to inject a file sink so that any dangling ATSK object is written
617 // to a ROOT file.
618 if (ac.providedOutputObjHist.empty() == false) {
620 extraSpecs.push_back(rootSink);
621 }
622
623 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
624 extraSpecs.clear();
625
627 auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
628 ac.isDangling = isDanglingTmp;
629 ac.outputsInputs = outputsInputsTmp;
630
631 // create DataOutputDescriptor
632 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
633
634 // select outputs of type AOD which need to be saved
635 // ATTENTION: if there are dangling outputs the getGlobalAODSink
636 // has to be created in any case!
637 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
638 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
639 auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]);
640 if (ds.size() > 0 || ac.isDangling[ii]) {
641 ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]);
642 }
643 }
644 }
645
646 // file sink for any AOD output
647 if (ac.outputsInputsAOD.size() > 0) {
648 // add TFNumber and TFFilename as input to the writer
649 ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
650 ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
651 auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
652 extraSpecs.push_back(fileSink);
653
654 auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool {
655 return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
656 });
657 size_t ii = std::distance(ac.outputsInputs.begin(), it);
658 ac.isDangling[ii] = false;
659 }
660
661 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
662 extraSpecs.clear();
663
664 // Select dangling outputs which are not of type AOD
665 std::vector<InputSpec> redirectedOutputsInputs;
666 for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) {
667 if (ctx.options().get<std::string>("forwarding-policy") == "none") {
668 continue;
669 }
670 // We forward to the output proxy all the inputs only if they are dangling
671 // or if the forwarding policy is "proxy".
672 if (!ac.isDangling[ii] && (ctx.options().get<std::string>("forwarding-policy") != "all")) {
673 continue;
674 }
675 // AODs are skipped in any case.
676 if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) {
677 continue;
678 }
679 redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]);
680 }
681
682 std::vector<InputSpec> unmatched;
683 auto forwardingDestination = ctx.options().get<std::string>("forwarding-destination");
684 if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "file") {
685 auto fileSink = CommonDataProcessors::getGlobalFileSink(redirectedOutputsInputs, unmatched);
686 if (unmatched.size() != redirectedOutputsInputs.size()) {
687 extraSpecs.push_back(fileSink);
688 }
689 } else if (redirectedOutputsInputs.size() > 0 && forwardingDestination == "fairmq") {
690 auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(redirectedOutputsInputs);
691 extraSpecs.push_back(fairMQSink);
692 } else if (forwardingDestination != "drop") {
693 throw runtime_error_f("Unknown forwarding destination %s", forwardingDestination.c_str());
694 }
695 if (unmatched.size() > 0 || redirectedOutputsInputs.size() > 0) {
696 std::vector<InputSpec> ignored = unmatched;
697 ignored.insert(ignored.end(), redirectedOutputsInputs.begin(), redirectedOutputsInputs.end());
698 for (auto& ignoredInput : ignored) {
699 ignoredInput.lifetime = Lifetime::Sporadic;
700 }
701
702 extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput));
703 }
704
705 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
706 extraSpecs.clear();
707}
708
710{
711 unsigned int distSTFCount = 0;
712 for (auto& spec : workflow) {
713 auto& inputs = spec.inputs;
714 bool allSporadic = true;
715 bool hasTimer = false;
716 bool hasSporadic = false;
717 bool hasOptionals = false;
718 for (auto& input : inputs) {
719 if (input.lifetime == Lifetime::Optional) {
720 hasOptionals = true;
721 }
722 }
723 for (auto& input : inputs) {
724 // Any InputSpec that is DPL/DISTSUBTIMEFRAME/0 will actually be replaced by one
725 // which looks like DPL/DISTSUBTIMEFRAME/<incremental number> for devices that
726 // have Optional inputs as well.
727 // This is done to avoid the race condition where the DISTSUBTIMEFRAME/0 gets
728 // forwarded before actual RAWDATA arrives.
729 if (DataSpecUtils::match(input, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}) &&
730 !DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
731 LOGP(error,
732 "Only FLP/DISTSUBTIMEFRAME/0 is supported as input "
733 "provided by the user. Please replace {} with FLP/DISTSUBTIMEFRAME/0 in {}.",
734 DataSpecUtils::describe(input), input.binding);
735 }
736 if (hasOptionals && DataSpecUtils::match(input, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
737 // The first one remains unchanged, therefore we use the postincrement
738 DataSpecUtils::updateMatchingSubspec(input, distSTFCount++);
739 continue;
740 }
741 // Timers are sporadic only when they are not
742 // alone.
743 if (input.lifetime == Lifetime::Timer) {
744 hasTimer = true;
745 continue;
746 }
747 if (input.lifetime == Lifetime::Sporadic) {
748 hasSporadic = true;
749 } else {
750 allSporadic = false;
751 }
752 }
753
754 LOGP(debug, "WorkflowHelpers::adjustTopology: spec {} hasTimer {} hasSporadic {} allSporadic {}", spec.name, hasTimer, hasSporadic, allSporadic);
755
756 // If they are not all sporadic (excluding timers)
757 // we leave things as they are.
758 if (allSporadic == false) {
759 continue;
760 }
761 // A timer alone is not sporadic.
762 if (hasSporadic == false) {
763 continue;
764 }
768 for (auto& output : spec.outputs) {
769 if (output.lifetime == Lifetime::Timeframe) {
770 output.lifetime = Lifetime::Sporadic;
771 }
772 }
773 }
774
775 if (distSTFCount > 0) {
776 bool found = false;
777 for (auto& spec : workflow) {
778 for (auto& output : spec.outputs) {
779 if (DataSpecUtils::match(output, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0})) {
780 found = true;
781 break;
782 }
783 }
784 if (found) {
785 for (unsigned int i = 1; i < distSTFCount; ++i) {
786 spec.outputs.emplace_back(OutputSpec{ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", i}, Lifetime::Timeframe});
787 }
788 break;
789 }
790 }
791 }
792}
793
// WorkflowHelpers::constructGraph (first signature line lost in this rendering;
// declared as static void constructGraph(const WorkflowSpec&, ...)).
// Builds the logical device graph: fills `outputs` with every OutputSpec in the
// workflow, `logicalEdges` with one edge per (matching input, output, time-slice
// pair), and `forwardedInputsInfo` for inputs satisfied via a forward.
795 std::vector<DeviceConnectionEdge>& logicalEdges,
796 std::vector<OutputSpec>& outputs,
797 std::vector<LogicalForwardInfo>& forwardedInputsInfo)
798 {
799 // In case the workflow is empty, we do not have anything to do.
800 if (workflow.empty()) {
801 return;
802 }
803
804 // This is the state. Oif is the iterator I use for the searches.
805 std::vector<LogicalOutputInfo> availableOutputsInfo;
806 auto const& constOutputs = outputs; // const version of the outputs
807 // Forwards is a local cache to avoid adding forwards before time.
808 std::vector<LogicalOutputInfo> forwards;
809
810 // Notice that availableOutputsInfo MUST be updated first, since it relies on
811 // the size of outputs to be the one before the update.
// Appends every producer output to `outputs`, recording for each one its
// producer index and its global output id (position in `outputs`).
812 auto enumerateAvailableOutputs = [&workflow, &outputs, &availableOutputsInfo]() {
813 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
814 for (size_t wi = 0; wi < workflow.size(); ++wi) {
815 auto& producer = workflow[wi];
816 if (producer.outputs.empty()) {
817 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "No outputs for [%zu] %{public}s", wi, producer.name.c_str());
818 }
819 O2_SIGNPOST_START(workflow_helpers, sid, "output enumeration", "Enumerating outputs for producer [%zu] %{}s public", wi, producer.name.c_str());
820
821 for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
822 auto& out = producer.outputs[oi];
823 auto uniqueOutputId = outputs.size();
824 availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
825 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "- [%zu, %zu] %{public}s",
826 oi, uniqueOutputId, DataSpecUtils::describe(out).c_str());
827 outputs.push_back(out);
828 }
829 O2_SIGNPOST_END(workflow_helpers, sid, "output enumeration", "");
830 }
831 };
832
// Diagnostic helper: throws listing all candidate outputs when input `ii`
// of consumer `ci` cannot be satisfied.
833 auto errorDueToMissingOutputFor = [&workflow, &constOutputs](size_t ci, size_t ii) {
834 auto input = workflow[ci].inputs[ii];
835 std::ostringstream str;
836 str << "No matching output found for "
837 << DataSpecUtils::describe(input) << " as requested by data processor \"" << workflow[ci].name << "\". Candidates:\n";
838
839 for (auto& output : constOutputs) {
840 str << "-" << DataSpecUtils::describe(output) << "\n";
841 }
842
843 throw std::runtime_error(str.str());
844 };
845
846 // This is the outer loop
847 //
848 // Here we iterate over dataprocessor items in workflow and we consider them
849 // as consumer, since we are interested in their inputs.
850 // Notice also we need to search for all the matching inputs, since
851 // we could have more than one source that matches (e.g. in the
852 // case of a time merger).
853 // Once consumed, an output is not actually used anymore, however
854 // we append it as a forward.
855 // Finally, If a device has n-way pipelining, we need to create one node per
856 // parallel pipeline and add an edge for each.
857 enumerateAvailableOutputs();
858
859 std::vector<bool> matches(constOutputs.size());
860 for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
861 O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers);
862 O2_SIGNPOST_START(workflow_helpers, sid, "input matching", "Matching inputs of consumer [%zu] %{}s public", consumer, workflow[consumer].name.c_str());
863 for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
864 forwards.clear();
// Precompute, per global output id, whether it matches the current input;
// matches[] is indexed by outputGlobalIndex, not by availableOutputsInfo position.
865 for (size_t i = 0; i < constOutputs.size(); i++) {
866 matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
867 if (matches[i]) {
868 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Input %{public}s matches %{public}s",
869 DataSpecUtils::describe(workflow[consumer].inputs[input]).c_str(),
870 DataSpecUtils::describe(constOutputs[i]).c_str());
871 }
872 }
873
874 for (size_t i = 0; i < availableOutputsInfo.size(); i++) {
875 // Notice that if the output is actually a forward, we need to store that information so that when we add it at device level we know which output channel we need to connect it too.
876 if (!matches[availableOutputsInfo[i].outputGlobalIndex]) {
877 continue;
878 }
879 auto* oif = &availableOutputsInfo[i];
880 if (oif->forward) {
881 forwardedInputsInfo.emplace_back(LogicalForwardInfo{consumer, input, oif->outputGlobalIndex});
882 }
883 auto producer = oif->specIndex;
884 auto uniqueOutputId = oif->outputGlobalIndex;
// One edge per (consumer time-slice, producer time-slice) combination to
// support n-way pipelining on either side.
885 for (size_t tpi = 0; tpi < workflow[consumer].maxInputTimeslices; ++tpi) {
886 for (size_t ptpi = 0; ptpi < workflow[producer].maxInputTimeslices; ++ptpi) {
887 O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output", "Adding edge between %{public}s and %{public}s", workflow[consumer].name.c_str(),
888 workflow[producer].name.c_str());
889 logicalEdges.emplace_back(DeviceConnectionEdge{producer, consumer, tpi, ptpi, uniqueOutputId, input, oif->forward});
890 }
891 }
892 forwards.push_back(LogicalOutputInfo{consumer, uniqueOutputId, true});
893 // We have consumed the input, therefore we remove it from the list. We will insert the forwarded inputs only at the end of the iteration.
894 oif->enabled = false;
895 }
896 if (forwards.empty()) {
897 errorDueToMissingOutputFor(consumer, input);
898 }
// Compact away consumed entries, then re-register them as forwards owned
// by this consumer, so later consumers connect to the forwarding device.
899 availableOutputsInfo.erase(std::remove_if(availableOutputsInfo.begin(), availableOutputsInfo.end(), [](auto& info) { return info.enabled == false; }), availableOutputsInfo.end());
900 for (auto& forward : forwards) {
901 availableOutputsInfo.push_back(forward);
902 }
903 }
904 O2_SIGNPOST_END(workflow_helpers, sid, "input matching", "");
905 }
906}
907
// WorkflowHelpers::computeOutEdgeActions (name line 909 lost in this rendering).
// Given `index`, a permutation of `edges` sorted producer-major (see sortEdges'
// out ordering), computes for each edge whether materialising it requires
// creating a new device and/or a new output channel. The sentinel `last`
// starts as all-ULONG_MAX so the first visited edge always triggers both.
 908 std::vector<EdgeAction>
 910 const std::vector<DeviceConnectionEdge>& edges,
 911 const std::vector<size_t>& index)
 912 {
 913 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
 914
 915 assert(edges.size() == index.size());
 916 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
 917 for (size_t i : index) {
 918 auto& edge = edges[i];
 919 auto& action = actions[i];
// A new (producer, producerTimeIndex) pair means a new producing device;
// any change in the 4-tuple means a new channel.
 920 action.requiresNewDevice = last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
 921 action.requiresNewChannel = last.consumer != edge.consumer || last.producer != edge.producer || last.timeIndex != edge.timeIndex || last.producerTimeIndex != edge.producerTimeIndex;
 922 last = edge;
 923 }
 924 return actions;
 925 }
926
// WorkflowHelpers::computeInEdgeActions (name line 928 lost in this rendering).
// Mirror of computeOutEdgeActions for the consumer side: `index` must be the
// consumer-major permutation produced by sortEdges' in ordering.
 927 std::vector<EdgeAction>
 929 const std::vector<DeviceConnectionEdge>& edges,
 930 const std::vector<size_t>& index)
 931 {
 932 DeviceConnectionEdge last{ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX, ULONG_MAX};
 933
 934 assert(edges.size() == index.size());
 935 std::vector<EdgeAction> actions(edges.size(), EdgeAction{false, false});
 936 for (size_t i : index) {
 937 auto& edge = edges[i];
 938 auto& action = actions[i];
 939 // Calculate which actions need to be taken for this edge.
// New (consumer, timeIndex) pair => new consuming device; any change in the
// 4-tuple => new input channel.
 940 action.requiresNewDevice = last.consumer != edge.consumer || last.timeIndex != edge.timeIndex;
 941 action.requiresNewChannel =
 942 last.consumer != edge.consumer || last.timeIndex != edge.timeIndex || last.producer != edge.producer || last.producerTimeIndex != edge.producerTimeIndex;
 943
 944 last = edge;
 945 }
 946 return actions;
 947 }
948
949void WorkflowHelpers::sortEdges(std::vector<size_t>& inEdgeIndex,
950 std::vector<size_t>& outEdgeIndex,
951 const std::vector<DeviceConnectionEdge>& edges)
952{
953 inEdgeIndex.resize(edges.size());
954 outEdgeIndex.resize(edges.size());
955 std::iota(inEdgeIndex.begin(), inEdgeIndex.end(), 0);
956 std::iota(outEdgeIndex.begin(), outEdgeIndex.end(), 0);
957
958 // Two indexes, one to bind the outputs, the other
959 // one to connect the inputs. The
960 auto outSorter = [&edges](size_t i, size_t j) {
961 auto& a = edges[i];
962 auto& b = edges[j];
963 return std::tie(a.producer, a.producerTimeIndex, a.timeIndex, a.consumer) < std::tie(b.producer, b.producerTimeIndex, b.timeIndex, b.consumer);
964 };
965 auto inSorter = [&edges](size_t i, size_t j) {
966 auto& a = edges[i];
967 auto& b = edges[j];
968 return std::tie(a.consumer, a.timeIndex, a.producer, a.producerTimeIndex) < std::tie(b.consumer, b.timeIndex, b.producer, b.producerTimeIndex);
969 };
970
971 std::sort(inEdgeIndex.begin(), inEdgeIndex.end(), inSorter);
972 std::sort(outEdgeIndex.begin(), outEdgeIndex.end(), outSorter);
973}
974
976{
977 if (workflow.empty()) {
979 }
980 std::set<std::string> validNames;
981 std::vector<OutputSpec> availableOutputs;
982 std::vector<InputSpec> requiredInputs;
983
984 // An index many to one index to go from a given input to the
985 // associated spec
986 std::map<size_t, size_t> inputToSpec;
987 // A one to one index to go from a given output to the Spec emitting it
988 std::map<size_t, size_t> outputToSpec;
989
990 std::ostringstream ss;
991
992 for (auto& spec : workflow) {
993 if (spec.name.empty()) {
994 throw std::runtime_error("Invalid DataProcessorSpec name");
995 }
996 if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
997 throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
998 }
999 if (validNames.find(spec.name) != validNames.end()) {
1000 throw std::runtime_error("Name " + spec.name + " is used twice.");
1001 }
1002 validNames.insert(spec.name);
1003 for (auto& option : spec.options) {
1004 if (option.defaultValue.type() != VariantType::Empty &&
1005 option.type != option.defaultValue.type()) {
1006 ss << "Mismatch between declared option type (" << (int)option.type << ") and default value type (" << (int)option.defaultValue.type()
1007 << ") for " << option.name << " in DataProcessorSpec of "
1008 << spec.name;
1009 throw std::runtime_error(ss.str());
1010 }
1011 }
1012 for (size_t ii = 0; ii < spec.inputs.size(); ++ii) {
1013 InputSpec const& input = spec.inputs[ii];
1014 if (DataSpecUtils::validate(input) == false) {
1015 ss << "In spec " << spec.name << " input specification "
1016 << ii << " requires binding, description and origin"
1017 " to be fully specified (found "
1018 << input.binding << ":" << DataSpecUtils::describe(input) << ")";
1019 throw std::runtime_error(ss.str());
1020 }
1021 }
1022 }
1024}
1025
// Either side of a logical connection: an input or an output spec.
 1026 using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
// Tail of struct DataMatcherId (its head, lines 1027-1028, is missing from this
// rendering). Presumably it pairs a workflowId with `id`, the position of the
// input/output inside that DataProcessorSpec — confirm against the full source.
 1029 size_t id;
 1030 };
1031
// For every output in the workflow, builds the InputSpec that would match it
// and records whether the output is "dangling", i.e. not consumed by any input
// of a *different* data processor. Returns (matching inputs, dangling flags),
// with duplicate InputSpec entries collapsed (first occurrence wins).
 1032 std::tuple<std::vector<InputSpec>, std::vector<bool>> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow)
 1033 {
 1034 // compute total number of input/output
 1035 size_t totalInputs = 0;
 1036 size_t totalOutputs = 0;
 1037 for (auto& spec : workflow) {
 1038 totalInputs += spec.inputs.size();
 1039 totalOutputs += spec.outputs.size();
 1040 }
 1041
 1042 std::vector<DataMatcherId> inputs;
 1043 std::vector<DataMatcherId> outputs;
 1044 inputs.reserve(totalInputs);
 1045 outputs.reserve(totalOutputs);
 1046
 1047 std::vector<InputSpec> results;
 1048 std::vector<bool> isDangling;
 1049 results.reserve(totalOutputs);
 1050 isDangling.reserve(totalOutputs);
 1051
// Flatten all inputs/outputs into (workflowId, position) handles.
// (Line 1052 of the original is missing from this rendering.)
 1053 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
 1054 auto& spec = workflow[wi];
 1055 for (size_t ii = 0, ie = spec.inputs.size(); ii != ie; ++ii) {
 1056 inputs.emplace_back(DataMatcherId{wi, ii});
 1057 }
 1058 for (size_t oi = 0, oe = spec.outputs.size(); oi != oe; ++oi) {
 1059 outputs.emplace_back(DataMatcherId{wi, oi});
 1060 }
 1061 }
 1062
 1063 for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) {
 1064 auto& output = outputs[oi];
 1065 auto& outputSpec = workflow[output.workflowId].outputs[output.id];
 1066
 1067 // is dangling output?
 1068 bool matched = false;
 1069 for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) {
 1070 auto& input = inputs[ii];
 1071 // Inputs of the same workflow cannot match outputs
 1072 if (output.workflowId == input.workflowId) {
 1073 continue;
 1074 }
 1075 auto& inputSpec = workflow[input.workflowId].inputs[input.id];
 1076 if (DataSpecUtils::match(inputSpec, outputSpec)) {
 1077 matched = true;
 1078 break;
 1079 }
 1080 }
 1081
 1082 auto input = DataSpecUtils::matchingInput(outputSpec);
 1083 char buf[64];
// Comma operator: formats the binding into buf, then assigns buf. The binding
// encodes producer index and output position, guaranteeing a unique name.
 1084 input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf);
 1085
 1086 // make sure that entries are unique
// O(n^2) linear dedup; acceptable for workflow-sized n. Note the dangling
// flag of later duplicates is dropped, not merged.
 1087 if (std::find(results.begin(), results.end(), input) == results.end()) {
 1088 results.emplace_back(input);
 1089 isDangling.emplace_back(matched == false);
 1090 }
 1091 }
 1092
 1093 // make sure that results is unique
 1094 return std::make_tuple(results, isDangling);
 1095}
1096
1097std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
1098{
1099
1100 auto [outputsInputs, isDangling] = analyzeOutputs(workflow);
1101
1102 std::vector<InputSpec> results;
1103 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
1104 if (isDangling[ii]) {
1105 results.emplace_back(outputsInputs[ii]);
1106 }
1107 }
1108
1109 return results;
1110}
1111
1112bool validateLifetime(std::ostream& errors,
1113 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1114 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1115{
1116 // In case the completion policy is consume-any, we do not need to check anything.
1117 if (consumerPolicies.completionPolicyName == "consume-any") {
1118 return true;
1119 }
1120 if (input.lifetime == Lifetime::Timeframe && output.lifetime == Lifetime::Sporadic) {
1121 errors << fmt::format("Input {} of {} has lifetime Timeframe, but output {} of {} has lifetime Sporadic\n",
1122 DataSpecUtils::describe(input).c_str(), consumer.name,
1123 DataSpecUtils::describe(output).c_str(), producer.name);
1124 return false;
1125 }
1126 return true;
1127}
1128
1129bool validateExpendable(std::ostream& errors,
1130 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
1131 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)
1132{
1133 auto isExpendable = [](DataProcessorLabel const& label) {
1134 return label.value == "expendable";
1135 };
1136 auto isResilient = [](DataProcessorLabel const& label) {
1137 return label.value == "expendable" || label.value == "resilient";
1138 };
1139 bool producerExpendable = std::find_if(producer.labels.begin(), producer.labels.end(), isExpendable) != producer.labels.end();
1140 bool consumerCritical = std::find_if(consumer.labels.begin(), consumer.labels.end(), isResilient) == consumer.labels.end();
1141 if (producerExpendable && consumerCritical) {
1142 errors << fmt::format("Critical consumer {} depends on expendable producer {}\n",
1143 consumer.name,
1144 producer.name);
1145 return false;
1146 }
1147 return true;
1148}
1149
// Signature shared by all per-edge validators (validateLifetime,
// validateExpendable): returns true when the producer/consumer pair is
// acceptable, false after streaming a diagnostic into `errors`.
 1150 using Validator = std::function<bool(std::ostream& errors,
 1151 DataProcessorSpec const& producer, OutputSpec const& output, DataProcessorPoliciesInfo const& producerPolicies,
 1152 DataProcessorSpec const& consumer, InputSpec const& input, DataProcessorPoliciesInfo const& consumerPolicies)>;
1153
// WorkflowHelpers::validateEdges (first signature line lost in this rendering;
// declared as static void validateEdges(WorkflowSpec const&, ...)).
// Runs every registered Validator on every logical edge, collecting all
// diagnostics; throws a single std::runtime_error with the aggregate report
// if any edge fails.
 1155 std::vector<DataProcessorPoliciesInfo> const& policies,
 1156 std::vector<DeviceConnectionEdge> const& edges,
 1157 std::vector<OutputSpec> const& outputs)
 1158 {
// Escape hatch: the env variable disables the lifetime check only; the
// expendable check always runs. `static` so the env lookup happens once.
 1159 static bool disableLifetimeCheck = getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES") && atoi(getenv("DPL_WORKAROUND_DO_NOT_CHECK_FOR_CORRECT_WORKFLOW_LIFETIMES"));
 1160 std::vector<Validator> defaultValidators = {validateExpendable};
 1161 if (!disableLifetimeCheck) {
 1162 defaultValidators.emplace_back(validateLifetime);
 1163 }
 1164 std::stringstream errors;
 1165 // Iterate over all the edges.
 1166 // Get the input lifetime and the output lifetime.
 1167 // Output lifetime must be Timeframe if the input lifetime is Timeframe.
 1168 bool hasErrors = false;
 1169 for (auto& edge : edges) {
 1170 DataProcessorSpec const& producer = workflow[edge.producer];
 1171 DataProcessorSpec const& consumer = workflow[edge.consumer];
 1172 DataProcessorPoliciesInfo const& producerPolicies = policies[edge.producer];
 1173 DataProcessorPoliciesInfo const& consumerPolicies = policies[edge.consumer];
 1174 OutputSpec const& output = outputs[edge.outputGlobalIndex];
 1175 InputSpec const& input = consumer.inputs[edge.consumerInputIndex];
// Keep validating after a failure so the exception reports every problem at once.
 1176 for (auto& validator : defaultValidators) {
 1177 hasErrors |= !validator(errors, producer, output, producerPolicies, consumer, input, consumerPolicies);
 1178 }
 1179 }
 1180 if (hasErrors) {
 1181 throw std::runtime_error(errors.str());
 1182 }
 1183}
1184
1185} // namespace o2::framework
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
consteval uint32_t compile_time_hash(Ts... Vs)
std::ostringstream debug
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLenum GLboolean GLsizei stride
Definition glcorearb.h:867
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
int defaultConditionQueryRate()
bool validateLifetime(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::string defaultConditionBackend()
int defaultConditionQueryRateMultiplier()
bool validateExpendable(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)
std::vector< DataProcessorSpec > WorkflowSpec
@ Me
Only quit this data processor.
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::function< bool(std::ostream &errors, DataProcessorSpec const &producer, OutputSpec const &output, DataProcessorPoliciesInfo const &producerPolicies, DataProcessorSpec const &consumer, InputSpec const &input, DataProcessorPoliciesInfo const &consumerPolicies)> Validator
RuntimeErrorRef runtime_error_f(const char *,...)
std::variant< InputSpec, OutputSpec > UnifiedDataSpecType
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
D const SVectorGPU< T, D > & rhs
Definition SMatrixGPU.h:193
static AlgorithmSpec dummyAlgorithm()
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToReader(std::vector< OutputSpec > const &providedOutputs, std::vector< InputSpec > const &requestedInputs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static DataProcessorSpec getOutputObjHistSink(ConfigContext const &)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateOutputList(std::vector< OutputSpec > &list, OutputSpec &&input)
Updates list of OutputSpecs by merging metadata (or adding output).
static bool validate(InputSpec const &input)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
enum Lifetime lifetime
Definition InputSpec.h:73
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
Helper struct to keep track of the results of the topological sort.
int layer
the associated layer in the sorting procedure
int index
the index in the actual storage of the nodes to be sorted topologically
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
o2::mch::DsIndex ds
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str