Project
Loading...
Searching...
No Matches
WorkflowSerializationHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
20#include "Framework/Logger.h"
21#include "Framework/Signpost.h"
22
23#include <rapidjson/reader.h>
24#include <rapidjson/prettywriter.h>
25#include <rapidjson/istreamwrapper.h>
26#include <rapidjson/ostreamwrapper.h>
27#include <iostream>
28#include <algorithm>
29#include <memory>
30
31O2_DECLARE_DYNAMIC_LOG(workflow_importer);
32O2_DECLARE_DYNAMIC_LOG(post_workflow_importer);
33
34namespace o2::framework
35{
36
37using namespace rapidjson;
38using namespace o2::framework::data_matcher;
39
40struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, WorkflowImporter> {
101
102 friend std::ostream& operator<<(std::ostream& s, State state)
103 {
104 switch (state) {
105 case State::IN_START:
106 s << "IN_START";
107 break;
109 s << "IN_EXECUTION";
110 break;
112 s << "IN_WORKFLOW";
113 break;
115 s << "IN_COMMAND";
116 break;
118 s << "IN_DATAPROCESSORS";
119 break;
121 s << "IN_DATAPROCESSOR";
122 break;
124 s << "IN_DATAPROCESSOR_NAME";
125 break;
127 s << "IN_DATAPROCESSOR_RANK";
128 break;
130 s << "IN_DATAPROCESSOR_N_SLOTS";
131 break;
133 s << "IN_DATAPROCESSOR_TIMESLICE_ID";
134 break;
136 s << "IN_DATAPROCESSOR_MAX_TIMESLICES";
137 break;
138 case State::IN_INPUTS:
139 s << "IN_INPUTS";
140 break;
142 s << "IN_OUTPUTS";
143 break;
145 s << "IN_OPTIONS";
146 break;
147 case State::IN_LABELS:
148 s << "IN_LABELS";
149 break;
151 s << "IN_METADATA";
152 break;
154 s << "IN_WORKFLOW_OPTIONS";
155 break;
156 case State::IN_INPUT:
157 s << "IN_INPUT";
158 break;
160 s << "IN_INPUT_BINDING";
161 break;
163 s << "IN_INPUT_ORIGIN";
164 break;
166 s << "IN_INPUT_DESCRIPTION";
167 break;
169 s << "IN_INPUT_SUBSPEC";
170 break;
172 s << "IN_INPUT_ORIGIN_REF";
173 break;
175 s << "IN_INPUT_DESCRIPTION_REF";
176 break;
178 s << "IN_INPUT_SUBSPEC_REF";
179 break;
181 s << "IN_INPUT_MATCHER";
182 break;
184 s << "IN_INPUT_MATCHER_OPERATION";
185 break;
187 s << "IN_INPUT_LEFT_MATCHER";
188 break;
190 s << "IN_INPUT_RIGHT_MATCHER";
191 break;
193 s << "IN_INPUT_LIFETIME";
194 break;
196 s << "IN_INPUT_STARTTIME";
197 break;
199 s << "IN_INPUT_OPTIONS";
200 break;
201 case State::IN_OUTPUT:
202 s << "IN_OUTPUT";
203 break;
205 s << "IN_OUTPUT_BINDING";
206 break;
208 s << "IN_OUTPUT_ORIGIN";
209 break;
211 s << "IN_OUTPUT_DESCRIPTION";
212 break;
214 s << "IN_OUTPUT_SUBSPEC";
215 break;
217 s << "IN_OUTPUT_LIFETIME";
218 break;
220 s << "IN_OUTPUT_OPTIONS";
221 break;
222 case State::IN_OPTION:
223 s << "IN_OPTION";
224 break;
226 s << "IN_OPTION_NAME";
227 break;
229 s << "IN_OPTION_TYPE";
230 break;
232 s << "IN_OPTION_DEFAULT";
233 break;
235 s << "IN_OPTION_HELP";
236 break;
238 s << "IN_OPTION_KIND";
239 break;
240 case State::IN_LABEL:
241 s << "IN_LABEL";
242 break;
244 s << "IN_METADATUM";
245 break;
247 s << "IN_METADATUM_KEY";
248 break;
250 s << "IN_METADATUM_VALUE";
251 break;
252 case State::IN_ERROR:
253 s << "IN_ERROR";
254 break;
256 s << "IN_DATAPROCESSOR_INFOS";
257 break;
259 s << "IN_DATAPROCESSOR_INFO";
260 break;
262 s << "IN_DATAPROCESSOR_INFO_NAME";
263 break;
265 s << "IN_DATAPROCESSOR_INFO_EXECUTABLE";
266 break;
268 s << "IN_DATAPROCESSOR_INFO_ARGS";
269 break;
271 s << "IN_DATAPROCESSOR_INFO_ARG";
272 break;
274 s << "IN_DATAPROCESSOR_INFO_CHANNELS";
275 break;
277 s << "IN_DATAPROCESSOR_INFO_CHANNEL";
278 break;
279 }
280 return s;
281 }
282
283 WorkflowImporter(std::vector<DataProcessorSpec>& o,
284 std::vector<DataProcessorInfo>& m,
285 CommandInfo& c)
286 : states{},
288 metadata{m},
289 command{c}
290 {
292 }
293
295 {
296 enter("START_OBJECT");
297 if (in(State::IN_START)) {
299 } else if (in(State::IN_DATAPROCESSORS)) {
302 } else if (in(State::IN_DATAPROCESSOR)) {
304 } else if (in(State::IN_INPUTS)) {
306 inputMatcherNodes.clear();
307 } else if (in(State::IN_INPUT_MATCHER)) {
308 // start a new embedded matcher
309 } else if (in(State::IN_INPUT_LEFT_MATCHER)) {
310 // this is a matcher leaf, i.e. last matcher of a branch
311 // will be merged into the parent matcher
312 } else if (in(State::IN_INPUT_RIGHT_MATCHER)) {
313 // this is a matcher leaf, i.e. last matcher of a branch
314 // will be merged into the parent matcher
315 } else if (in(State::IN_OUTPUTS)) {
317 outputHasSubSpec = false;
318 } else if (in(State::IN_OPTIONS)) {
320 } else if (in(State::IN_INPUT_OPTIONS)) {
322 } else if (in(State::IN_OUTPUT_OPTIONS)) {
324 } else if (in(State::IN_WORKFLOW_OPTIONS)) {
326 } else if (in(State::IN_DATAPROCESSOR_INFOS)) {
328 metadata.push_back(DataProcessorInfo{});
329 } else if (in(State::IN_DATAPROCESSOR_INFO)) {
330 metadata.push_back(DataProcessorInfo{});
331 } else if (in(State::IN_METADATA)) {
333 metadatumKey.clear();
334 metadatumValue.clear();
335 } else if (in(State::IN_COMMAND)) {
337 }
338 return true;
339 }
340
341 bool EndObject(SizeType memberCount)
342 {
343 enter("END_OBJECT");
344 if (in(State::IN_INPUT)) {
345 auto buildMatcher = [](auto& nodes) -> std::unique_ptr<DataDescriptorMatcher> {
346 auto lastMatcher =
347 std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::Just,
348 StartTimeValueMatcher(ContextRef{ContextPos::STARTTIME_POS}));
349 for (size_t ni = 0, ne = nodes.size(); ni < ne; ++ni) {
350 auto& node = nodes[nodes.size() - 1 - ni];
351 auto tmp = std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And,
352 std::move(node),
353 std::move(lastMatcher));
354 assert(lastMatcher.get() == nullptr);
355 lastMatcher = std::move(tmp);
356 }
357 return lastMatcher;
358 };
359
360 std::unique_ptr<DataDescriptorMatcher> matcher;
361 if (auto* pval = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes[0])) {
362 assert(inputMatcherNodes.size() == 1);
363 matcher = std::move(*pval);
364 } else {
365 matcher = buildMatcher(inputMatcherNodes);
366 }
367 auto concrete = DataSpecUtils::optionalConcreteDataMatcherFrom(*matcher);
368 if (concrete.has_value()) {
369 // the matcher is fully qualified with unique parameters so we add ConcreteDataMatcher
370 dataProcessors.back().inputs.push_back(InputSpec({binding}, (*concrete).origin, (*concrete).description, (*concrete).subSpec, lifetime, inputOptions));
371 } else {
372 dataProcessors.back().inputs.push_back(InputSpec({binding}, std::move(*matcher), lifetime, inputOptions));
373 }
374 inputMatcherNodes.clear();
375 inputOptions.clear();
376
377 } else if (in(State::IN_INPUT_MATCHER) && inputMatcherNodes.size() > 1) {
378 data_matcher::Node child = std::move(inputMatcherNodes.back());
379 inputMatcherNodes.pop_back();
380 auto* matcher = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&child);
381 assert(matcher != nullptr);
382 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes.back());
383 assert(parent != nullptr);
384 std::unique_ptr<DataDescriptorMatcher> node;
385 auto mergeDown = [&node, &parent, &child]() -> bool {
386 // FIXME: do we need a dedicated default state, or can we simply use ConstantValueMatcher
387 if (auto* pval1 = std::get_if<ConstantValueMatcher>(&((*parent)->getLeft()))) {
388 if (*pval1 == ConstantValueMatcher{false}) {
389 node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
390 std::move(child),
391 std::move((*parent)->getRight()));
392 return true;
393 }
394 }
395 if (auto* pval2 = std::get_if<ConstantValueMatcher>(&((*parent)->getRight()))) {
396 if (*pval2 == ConstantValueMatcher{false}) {
397 node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
398 std::move((*parent)->getLeft()),
399 std::move(child));
400 return true;
401 }
402 }
403 return false;
404 };
405 if (!mergeDown()) {
406 states.push_back(State::IN_ERROR);
407 }
408 inputMatcherNodes.pop_back();
409 inputMatcherNodes.push_back(std::move(node));
410 } else if (in(State::IN_INPUT_LEFT_MATCHER)) {
411 assert(inputMatcherNodes.size() >= 2);
412 size_t nMatchers = inputMatcherNodes.size();
413 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes[nMatchers - 2]);
414 assert(parent != nullptr);
415 auto node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
416 std::move(inputMatcherNodes[nMatchers - 1]),
417 std::move((*parent)->getRight()));
418 inputMatcherNodes.pop_back();
419 inputMatcherNodes.pop_back();
420 inputMatcherNodes.push_back(std::move(node));
421 } else if (in(State::IN_INPUT_RIGHT_MATCHER)) {
422 data_matcher::Node child = std::move(inputMatcherNodes.back());
423 inputMatcherNodes.pop_back();
424 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes.back());
425 assert(parent != nullptr);
426 auto node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
427 std::move((*parent)->getLeft()),
428 std::move(child));
429 inputMatcherNodes.pop_back();
430 inputMatcherNodes.push_back(std::move(node));
431 } else if (in(State::IN_OUTPUT)) {
432 if (outputHasSubSpec) {
434 } else {
435 dataProcessors.back().outputs.push_back(OutputSpec({binding}, {origin, description}, lifetime, outputOptions));
436 }
437 outputOptions.clear();
438 outputHasSubSpec = false;
439 } else if (in(State::IN_OPTION)) {
440 std::unique_ptr<ConfigParamSpec> opt{nullptr};
441
442 using HelpString = ConfigParamSpec::HelpString;
443 std::stringstream is;
444 is.str(optionDefault);
445 switch (optionType) {
447 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, optionDefault.c_str(), HelpString{optionHelp}, optionKind);
448 break;
449 case VariantType::Int:
450 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stoi(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
451 break;
453 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<int8_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
454 break;
456 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<int16_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
457 break;
459 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint8_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
460 break;
462 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint16_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
463 break;
465 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint32_t>(std::stoul(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
466 break;
468 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stoul(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
469 break;
471 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stol(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
472 break;
474 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stof(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
475 break;
477 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stod(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
478 break;
480 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, (bool)std::stoi(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
481 break;
483 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayInt>(is), HelpString{optionHelp}, optionKind);
484 break;
486 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayFloat>(is), HelpString{optionHelp}, optionKind);
487 break;
489 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayDouble>(is), HelpString{optionHelp}, optionKind);
490 break;
491 // case VariantType::ArrayBool:
492 // opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayBool>(is), HelpString{optionHelp}, optionKind);
493 // break;
495 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayString>(is), HelpString{optionHelp}, optionKind);
496 break;
498 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DInt>(is), HelpString{optionHelp}, optionKind);
499 break;
501 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DFloat>(is), HelpString{optionHelp}, optionKind);
502 break;
504 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DDouble>(is), HelpString{optionHelp}, optionKind);
505 break;
507 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayInt>(is), HelpString{optionHelp}, optionKind);
508 break;
510 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayFloat>(is), HelpString{optionHelp}, optionKind);
511 break;
513 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayDouble>(is), HelpString{optionHelp}, optionKind);
514 break;
516 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayString>(is), HelpString{optionHelp}, optionKind);
517 break;
519 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, emptyDict(), HelpString{optionHelp}, optionKind);
520 break;
521 default:
522 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, optionDefault, HelpString{optionHelp}, optionKind);
523 }
524 // Depending on the previous state, push options to the right place.
526 dataProcessors.back().options.push_back(*opt);
528 metadata.back().workflowOptions.push_back(*opt);
530 inputOptions.push_back(*opt);
532 outputOptions.push_back(*opt);
533 } else {
534 assert(false);
535 }
536 } else if (in(State::IN_METADATUM)) {
537 dataProcessors.back().metadata.push_back({metadatumKey, metadatumValue});
538 }
539 pop();
540 return true;
541 }
542
544 {
545 enter("START_ARRAY");
546 if (in(State::IN_WORKFLOW)) {
548 } else if (in(State::IN_INPUTS)) {
550 } else if (in(State::IN_INPUT_OPTIONS)) {
552 } else if (in(State::IN_OUTPUT_OPTIONS)) {
554 } else if (in(State::IN_OUTPUTS)) {
556 outputHasSubSpec = false;
557 } else if (in(State::IN_OPTIONS)) {
559 } else if (in(State::IN_WORKFLOW_OPTIONS)) {
561 } else if (in(State::IN_LABELS)) {
563 } else if (in(State::IN_METADATA)) {
565 } else if (in(State::IN_DATAPROCESSOR_INFOS)) {
571 }
572 return true;
573 }
574
575 bool EndArray(SizeType count)
576 {
577 enter("END_ARRAY");
578 // Handle the case in which inputs / options / outputs are
579 // empty.
583 pop();
584 }
585 pop();
586 return true;
587 }
588
589 bool Key(const Ch* str, SizeType length, bool copy)
590 {
591 enter("KEY");
592 enter(str);
593 if (in(State::IN_INPUT) && strncmp(str, "binding", length) == 0) {
595 } else if (in(State::IN_INPUT) && strncmp(str, "origin", length) == 0) {
597 } else if (in(State::IN_INPUT) && strncmp(str, "description", length) == 0) {
599 } else if (in(State::IN_INPUT) && strncmp(str, "subspec", length) == 0) {
601 } else if (in(State::IN_INPUT) && strncmp(str, "originRef", length) == 0) {
603 } else if (in(State::IN_INPUT) && strncmp(str, "descriptionRef", length) == 0) {
605 } else if (in(State::IN_INPUT) && strncmp(str, "subspecRef", length) == 0) {
607 } else if (in(State::IN_INPUT) && strncmp(str, "matcher", length) == 0) {
608 // the outermost matcher is starting here
609 // we create a placeholder which is being updated later
610 inputMatcherNodes.push_back(std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And, ConstantValueMatcher{false}));
612 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "matcher", length) == 0) {
613 // recursive matchers
614 inputMatcherNodes.push_back(std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And, ConstantValueMatcher{false}));
616 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "operation", length) == 0) {
618 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "left", length) == 0) {
620 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "right", length) == 0) {
622 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "origin", length) == 0) {
624 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "origin", length) == 0) {
626 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "description", length) == 0) {
628 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "description", length) == 0) {
630 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "subspec", length) == 0) {
632 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "subspec", length) == 0) {
634 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "originRef", length) == 0) {
636 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "originRef", length) == 0) {
638 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "descriptionRef", length) == 0) {
640 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "descriptionRef", length) == 0) {
642 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "subspecRef", length) == 0) {
644 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "subspecRef", length) == 0) {
646 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "starttime", length) == 0) {
648 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "starttime", length) == 0) {
650 } else if (in(State::IN_INPUT) && strncmp(str, "lifetime", length) == 0) {
652 } else if (in(State::IN_INPUT) && strncmp(str, "starttime", length) == 0) {
654 } else if (in(State::IN_INPUT) && strncmp(str, "metadata", length) == 0) {
656 } else if (in(State::IN_OUTPUT) && strncmp(str, "binding", length) == 0) {
658 } else if (in(State::IN_OUTPUT) && strncmp(str, "origin", length) == 0) {
660 } else if (in(State::IN_OUTPUT) && strncmp(str, "description", length) == 0) {
662 } else if (in(State::IN_OUTPUT) && strncmp(str, "subspec", length) == 0) {
664 outputHasSubSpec = true;
665 } else if (in(State::IN_OUTPUT) && strncmp(str, "lifetime", length) == 0) {
667 } else if (in(State::IN_OUTPUT) && strncmp(str, "metadata", length) == 0) {
669 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "name", length) == 0) {
671 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "ranks", length) == 0) {
673 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "nSlots", length) == 0) {
675 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "inputTimeSliceId", length) == 0) {
677 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "maxInputTimeslices", length) == 0) {
679 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "inputs", length) == 0) {
681 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "outputs", length) == 0) {
683 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "options", length) == 0) {
685 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "labels", length) == 0) {
687 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "metadata", length) == 0) {
689 } else if (in(State::IN_METADATUM) && strncmp(str, "key", length) == 0) {
691 } else if (in(State::IN_METADATUM) && strncmp(str, "value", length) == 0) {
693 } else if (in(State::IN_EXECUTION) && strncmp(str, "workflow", length) == 0) {
695 } else if (in(State::IN_EXECUTION) && strncmp(str, "metadata", length) == 0) {
697 } else if (in(State::IN_OPTION) && strncmp(str, "name", length) == 0) {
699 } else if (in(State::IN_OPTION) && strncmp(str, "type", length) == 0) {
701 } else if (in(State::IN_OPTION) && strncmp(str, "defaultValue", length) == 0) {
703 } else if (in(State::IN_OPTION) && strncmp(str, "help", length) == 0) {
705 } else if (in(State::IN_OPTION) && strncmp(str, "kind", length) == 0) {
707 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "name", length) == 0) {
709 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) {
711 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) {
713 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) {
715 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "channels", length) == 0) {
717 } else if (in(State::IN_EXECUTION) && strncmp(str, "command", length) == 0) {
719 }
720 return true;
721 }
722
723 bool String(const Ch* str, SizeType length, bool copy)
724 {
725 enter("STRING");
726 enter(str);
727 auto s = std::string(str, length);
729 assert(dataProcessors.size());
730 dataProcessors.back().name = s;
732 assert(metadata.size());
733 metadata.back().name = s;
735 assert(metadata.size());
736 metadata.back().executable = s;
737 } else if (in(State::IN_INPUT_BINDING)) {
738 binding = s;
739 } else if (in(State::IN_INPUT_ORIGIN)) {
740 origin.runtimeInit(s.c_str(), std::min(s.size(), 4UL));
741 std::string v(s.c_str(), std::min(s.size(), 4UL));
743 } else if (in(State::IN_INPUT_DESCRIPTION)) {
744 description.runtimeInit(s.c_str(), std::min(s.size(), 16UL));
745 std::string v(s.c_str(), std::min(s.size(), 16UL));
747 } else if (in(State::IN_INPUT_STARTTIME)) {
748 // we add StartTimeValueMatcher with ContextRef for starttime, no matter what
749 // has been in the configuration.
750 inputMatcherNodes.push_back(StartTimeValueMatcher(ContextRef{ContextPos::STARTTIME_POS}));
752 // FIXME: need to implement operator>> to read the op parameter
753 DataDescriptorMatcher::Op op = DataDescriptorMatcher::Op::And;
754 if (s == "and") {
755 op = DataDescriptorMatcher::Op::And;
756 } else if (s == "or") {
757 op = DataDescriptorMatcher::Op::Or;
758 } else if (s == "xor") {
759 op = DataDescriptorMatcher::Op::Xor;
760 } else if (s == "just") {
761 op = DataDescriptorMatcher::Op::Just;
762 } else if (s == "not") {
763 op = DataDescriptorMatcher::Op::Not;
764 }
765 // FIXME: we could drop the placeholder which has been added when entering
766 // the states which can read key 'operation', but then we need to make sure
767 // that this key is always present
768 auto node = std::make_unique<DataDescriptorMatcher>(op, ConstantValueMatcher{false});
769 inputMatcherNodes.pop_back();
770 inputMatcherNodes.push_back(std::move(node));
771 } else if (in(State::IN_OUTPUT_BINDING)) {
772 binding = s;
773 } else if (in(State::IN_OUTPUT_ORIGIN)) {
774 origin.runtimeInit(s.c_str(), std::min(s.size(), 4UL));
775 } else if (in(State::IN_OUTPUT_DESCRIPTION)) {
776 description.runtimeInit(s.c_str(), std::min(s.size(), 16UL));
777 } else if (in(State::IN_OPTION_NAME)) {
778 optionName = s;
779 } else if (in(State::IN_OPTION_TYPE)) {
780 optionType = (VariantType)std::stoi(s, nullptr);
781 } else if (in(State::IN_OPTION_KIND)) {
782 optionKind = (ConfigParamKind)std::stoi(s, nullptr);
783 } else if (in(State::IN_OPTION_DEFAULT)) {
784 optionDefault = s;
785 } else if (in(State::IN_OPTION_HELP)) {
786 optionHelp = s;
787 } else if (in(State::IN_LABEL)) {
788 dataProcessors.back().labels.push_back({s});
789 // This is in an array, so we do not actually want to
790 // exit from the state.
792 } else if (in(State::IN_METADATUM_KEY)) {
793 metadatumKey = s;
794 } else if (in(State::IN_METADATUM_VALUE)) {
795 metadatumValue = s;
797 metadata.back().cmdLineArgs.push_back(s);
798 // This is in an array, so we do not actually want to
799 // exit from the state.
802 metadata.back().channels.push_back(s);
803 // This is in an array, so we do not actually want to
804 // exit from the state.
806 } else if (in(State::IN_COMMAND)) {
807 command.merge({s});
808 } else {
809 std::stringstream errstr;
810 errstr << "No string handling for argument '" << std::string(str, length) << "' in state " << states.back() << std::endl;
811 throw std::runtime_error(errstr.str());
812 }
813 pop();
814 return true;
815 }
816
817 bool Uint(unsigned i)
818 {
819 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Uint(%d)", i);
821 subspec = i;
823 } else if (in(State::IN_INPUT_ORIGIN_REF)) {
824 ref = i;
827 ref = i;
829 } else if (in(State::IN_INPUT_SUBSPEC_REF)) {
830 ref = i;
832 } else if (in(State::IN_OUTPUT_SUBSPEC)) {
833 subspec = i;
834 } else if (in(State::IN_INPUT_LIFETIME)) {
836 } else if (in(State::IN_OUTPUT_LIFETIME)) {
838 } else if (in(State::IN_DATAPROCESSOR_RANK)) {
839 dataProcessors.back().rank = i;
841 dataProcessors.back().nSlots = i;
843 dataProcessors.back().inputTimeSliceId = i;
845 dataProcessors.back().maxInputTimeslices = i;
846 }
847 pop();
848 return true;
849 }
850
851 bool Int(int i)
852 {
853 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Int(%d)", i);
854 return true;
855 }
856 bool Uint64(uint64_t u)
857 {
858 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Uint64(%" PRIu64 ")", u);
859 return true;
860 }
861 bool Double(double d)
862 {
863 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Double(%f)", d);
864 return true;
865 }
866
867 void enter(char const* what)
868 {
869 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "ENTER: %s", what);
870 }
871
872 void push(State state)
873 {
874 debug.str("");
875 debug << state;
876 states.push_back(state);
877 O2_SIGNPOST_START(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "PUSH: %s", debug.str().c_str());
878 }
879
881 {
882 if (states.empty()) {
883 states.push_back(State::IN_ERROR);
884 return State::IN_ERROR;
885 }
886 auto result = states.back();
887 states.pop_back();
888 debug.str("");
889 debug << result;
890 if (!states.empty()) {
891 debug << " now in " << states.back();
892 }
893 O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()+1}, "import", "POP: %s", debug.str().c_str());
894 return result;
895 }
896 bool in(State o)
897 {
898 return states.back() == o;
899 }
900
902 {
903 assert(states.size() > 1);
904 return states[states.size() - 2] == o;
905 }
906
907 std::ostringstream debug;
908 std::vector<State> states;
909 std::string spec;
910 std::vector<DataProcessorSpec>& dataProcessors;
911 std::vector<DataProcessorInfo>& metadata;
913 std::vector<ConfigParamSpec> inputOptions;
914 std::vector<ConfigParamSpec> outputOptions;
915 std::string binding;
918 size_t subspec;
919 size_t ref;
921 std::string metadatumKey;
922 std::string metadatumValue;
923 std::string optionName;
925 std::string optionDefault;
926 std::string optionHelp;
929 std::vector<data_matcher::Node> inputMatcherNodes;
930};
931
933 std::vector<DataProcessorSpec>& workflow,
934 std::vector<DataProcessorInfo>& metadata,
935 CommandInfo& command)
936{
937 // Skip any line which does not start with '{'
938 // If we do not find a starting {, we simply assume that no workflow
939 // was actually passed on the PIPE.
940 // FIXME: not particularly resilient, but works for now.
941 // FIXME: this will fail if { is found at char 1024.
942 char buf[1024];
943 bool hasFatalImportError = false;
944 while (s.peek() != '{') {
945 if (s.eof()) {
946 return !hasFatalImportError;
947 }
948 if (s.fail() || s.bad()) {
949 throw std::runtime_error("Malformatted input workflow");
950 }
951 s.getline(buf, 1024, '\n');
952 // FairLogger messages (starting with [) simply get forwarded.
953 // Other messages we consider them as ERRORs since they
954 // were printed out without FairLogger.
955 if (buf[0] == '[') {
956 if (strncmp(buf, "[ERROR] invalid workflow in", strlen("[ERROR] invalid workflow in")) == 0 ||
957 strncmp(buf, "[ERROR] error while setting up workflow", strlen("[ERROR] error while setting up workflow")) == 0 ||
958 strncmp(buf, "[ERROR] error parsing options of", strlen("[ERROR] error parsing options of")) == 0) {
959 hasFatalImportError = true;
960 }
961 std::cout << buf << std::endl;
962 } else {
963 LOG(error) << buf;
964 }
965 }
966 if (hasFatalImportError) {
967 return false;
968 }
969 rapidjson::Reader reader;
970 rapidjson::IStreamWrapper isw(s);
971 WorkflowImporter importer{workflow, metadata, command};
972 bool ok = reader.Parse(isw, importer);
973 if (ok == false) {
974 if (s.eof()) {
975 throw std::runtime_error("Error while parsing serialised workflow");
976 }
977 // clean up possible leftovers at the end of the input stream, e.g. [DEBUG] message from destructors
978 O2_SIGNPOST_ID_GENERATE(sid, post_workflow_importer);
979 while (true) {
980 s.getline(buf, 1024, '\n');
981 if (s.eof()) {
982 break;
983 }
984 O2_SIGNPOST_EVENT_EMIT(post_workflow_importer, sid, "post import", "Following leftover line found in input stream after parsing workflow: %{public}s", buf);
985 }
986 }
987 return true;
988}
989
991 std::vector<DataProcessorSpec> const& workflow,
992 std::vector<DataProcessorInfo> const& metadata,
993 CommandInfo const& commandInfo)
994{
995 rapidjson::OStreamWrapper osw(out);
996 rapidjson::PrettyWriter<rapidjson::OStreamWrapper> w(osw);
997
998 // handlers for serialization of InputSpec matchers
999 auto edgeWalker = overloaded{
1000 [&w](EdgeActions::EnterNode action) {
1001 w.Key("matcher");
1002 w.StartObject();
1003 w.Key("operation");
1004 std::stringstream ss;
1005 ss << action.node->getOp();
1006 w.String(ss.str().c_str());
1007 if (action.node->getOp() == DataDescriptorMatcher::Op::Just ||
1008 action.node->getOp() == DataDescriptorMatcher::Op::Not) {
1009 return ChildAction::VisitLeft;
1010 }
1011 return ChildAction::VisitBoth;
1012 },
1014 w.Key("left");
1015 w.StartObject();
1016 },
1018 w.EndObject();
1019 },
1021 w.Key("right");
1022 w.StartObject();
1023 },
1025 w.EndObject();
1026 },
1028 w.EndObject();
1029 },
1030 [&w](auto) {}};
1031 auto leafWalker = overloaded{
1032 [&w](OriginValueMatcher const& origin) {
1033 origin.visit(overloaded{
1034 [&w](ContextRef const& ref) {
1035 w.Key("originRef");
1036 w.Uint64(ref.index);
1037 },
1038 [&w](auto const& value) {
1039 w.Key("origin");
1040 std::stringstream ss;
1041 ss << value;
1042 w.String(ss.str().c_str());
1043 }});
1044 },
1046 description.visit(overloaded{
1047 [&w](ContextRef const& ref) {
1048 w.Key("descriptionRef");
1049 w.Uint64(ref.index);
1050 },
1051 [&w](auto const& value) {
1052 w.Key("description");
1053 std::stringstream ss;
1054 ss << value;
1055 w.String(ss.str().c_str());
1056 }});
1057 },
1058 [&w](SubSpecificationTypeValueMatcher const& subspec) {
1059 subspec.visit(overloaded{
1060 [&w](ContextRef const& ref) {
1061 w.Key("subspecRef");
1062 w.Uint64(ref.index);
1063 },
1064 [&w](auto const& value) {
1065 w.Key("subspec");
1066 std::stringstream ss;
1067 ss << value;
1068 w.Uint64(std::stoul(ss.str()));
1069 }});
1070 },
1071 [&w](StartTimeValueMatcher const& startTime) {
1072 w.Key("starttime");
1073 std::stringstream ss;
1074 ss << startTime;
1075 w.String(ss.str().c_str());
1076 },
1077 [&w](ConstantValueMatcher const& constant) {},
1078 [&w](auto t) {}};
1079
1080 w.StartObject();
1081 w.Key("workflow");
1082 w.StartArray();
1083
1084 for (auto& processor : workflow) {
1085 if (processor.name.rfind("internal-dpl", 0) == 0) {
1086 continue;
1087 }
1088 w.StartObject();
1089 w.Key("name");
1090 w.String(processor.name.c_str());
1091
1092 w.Key("inputs");
1093 w.StartArray();
1094 for (auto const& input : processor.inputs) {
1098 w.StartObject();
1099 w.Key("binding");
1100 w.String(input.binding.c_str());
1101 if (auto const* concrete = std::get_if<ConcreteDataMatcher>(&input.matcher)) {
1102 w.Key("origin");
1103 w.String(concrete->origin.str, strnlen(concrete->origin.str, 4));
1104 w.Key("description");
1105 w.String(concrete->description.str, strnlen(concrete->description.str, 16));
1106 w.Key("subspec");
1107 w.Uint64(concrete->subSpec);
1108 // auto tmp = DataSpecUtils::dataDescriptorMatcherFrom(*concrete);
1109 // DataMatcherWalker::walk(tmp,
1110 // edgeWalker,
1111 // leafWalker);
1112 } else if (auto const* matcher = std::get_if<DataDescriptorMatcher>(&input.matcher)) {
1113 DataMatcherWalker::walk(*matcher,
1114 edgeWalker,
1115 leafWalker);
1116 }
1117 w.Key("lifetime");
1118 w.Uint((int)input.lifetime);
1119 if (input.metadata.empty() == false) {
1120 w.Key("metadata");
1121 w.StartArray();
1122 for (auto& metadata : input.metadata) {
1123 w.StartObject();
1124 w.Key("name");
1125 w.String(metadata.name.c_str());
1126 auto s = std::to_string(int(metadata.type));
1127 w.Key("type");
1128 w.String(s.c_str());
1129 std::ostringstream oss;
1130 oss << metadata.defaultValue;
1131 w.Key("defaultValue");
1132 w.String(oss.str().c_str());
1133 w.Key("help");
1134 w.String(metadata.help.c_str());
1135 w.EndObject();
1136 }
1137 w.EndArray();
1138 }
1139 w.EndObject();
1140 }
1141 w.EndArray();
1142
1143 w.Key("outputs");
1144 w.StartArray();
1145 for (auto& output : processor.outputs) {
1146 w.StartObject();
1147 w.Key("binding");
1148 if (output.binding.value.empty()) {
1149 auto autogenerated = DataSpecUtils::describe(output);
1150 w.String(autogenerated.c_str());
1151 } else {
1152 w.String(output.binding.value.c_str());
1153 }
1155 w.Key("origin");
1156 w.String(dataType.origin.str, strnlen(dataType.origin.str, 4));
1157 w.Key("description");
1158 w.String(dataType.description.str, strnlen(dataType.description.str, 16));
1159 // FIXME: this will have to change once we introduce wildcards for
1160 // OutputSpec
1162 if (subSpec.has_value()) {
1163 w.Key("subspec");
1164 w.Uint64(*subSpec);
1165 }
1166 w.Key("lifetime");
1167 w.Uint((int)output.lifetime);
1168 if (output.metadata.empty() == false) {
1169 w.Key("metadata");
1170 w.StartArray();
1171 for (auto& metadata : output.metadata) {
1172 w.StartObject();
1173 w.Key("name");
1174 w.String(metadata.name.c_str());
1175 auto s = std::to_string(int(metadata.type));
1176 w.Key("type");
1177 w.String(s.c_str());
1178 std::ostringstream oss;
1179 oss << metadata.defaultValue;
1180 w.Key("defaultValue");
1181 w.String(oss.str().c_str());
1182 w.Key("help");
1183 w.String(metadata.help.c_str());
1184 w.EndObject();
1185 }
1186 w.EndArray();
1187 }
1188 w.EndObject();
1189 }
1190 w.EndArray();
1191
1192 w.Key("options");
1193 w.StartArray();
1194 for (auto& option : processor.options) {
1195 if (option.name == "start-value-enumeration" || option.name == "end-value-enumeration" || option.name == "step-value-enumeration" || option.name == "orbit-offset-enumeration" || option.name == "orbit-multiplier-enumeration") {
1196 continue;
1197 }
1198 w.StartObject();
1199 w.Key("name");
1200 w.String(option.name.c_str());
1201 auto s = std::to_string(int(option.type));
1202 w.Key("type");
1203 w.String(s.c_str());
1204 std::ostringstream oss;
1205 switch (option.type) {
1218 case VariantType::Dict:
1219 VariantJSONHelpers::write(oss, option.defaultValue);
1220 break;
1221 default:
1222 oss << option.defaultValue;
1223 break;
1224 }
1225 w.Key("defaultValue");
1226 w.String(oss.str().c_str());
1227 w.Key("help");
1228 w.String(option.help.c_str());
1229 w.Key("kind");
1230 w.String(std::to_string((int)option.kind).c_str());
1231 w.EndObject();
1232 }
1233 w.EndArray();
1234 w.Key("labels");
1235 w.StartArray();
1236 for (auto& label : processor.labels) {
1237 w.String(label.value.c_str());
1238 }
1239 w.EndArray();
1240 w.Key("metadata");
1241 w.StartArray();
1242 for (auto& metadatum : processor.metadata) {
1243 w.StartObject();
1244 w.Key("key");
1245 w.String(metadatum.key.c_str());
1246 w.Key("value");
1247 w.String(metadatum.value.c_str());
1248 w.EndObject();
1249 }
1250 w.EndArray();
1251 w.Key("rank");
1252 w.Int(processor.rank);
1253 w.Key("nSlots");
1254 w.Int(processor.nSlots);
1255 w.Key("inputTimeSliceId");
1256 w.Int(processor.inputTimeSliceId);
1257 w.Key("maxInputTimeslices");
1258 w.Int(processor.maxInputTimeslices);
1259
1260 w.EndObject();
1261 }
1262 w.EndArray();
1263
1264 w.Key("metadata");
1265 w.StartArray();
1266 for (auto& info : metadata) {
1267 w.StartObject();
1268 w.Key("name");
1269 w.String(info.name.c_str());
1270 w.Key("executable");
1271 w.String(info.executable.c_str());
1272 w.Key("cmdLineArgs");
1273 w.StartArray();
1274 for (auto& arg : info.cmdLineArgs) {
1275 w.String(arg.c_str());
1276 }
1277 w.EndArray();
1278 w.Key("workflowOptions");
1279 w.StartArray();
1280 for (auto& option : info.workflowOptions) {
1281 w.StartObject();
1282 w.Key("name");
1283 w.String(option.name.c_str());
1284 auto s = std::to_string(int(option.type));
1285 w.Key("type");
1286 w.String(s.c_str());
1287 std::ostringstream oss;
1288 oss << option.defaultValue;
1289 w.Key("defaultValue");
1290 w.String(oss.str().c_str());
1291 w.Key("help");
1292 w.String(option.help.c_str());
1293 w.EndObject();
1294 }
1295 w.EndArray();
1296 w.Key("channels");
1297 w.StartArray();
1298 for (auto& channel : info.channels) {
1299 w.String(channel.c_str());
1300 }
1301 w.EndArray();
1302 w.EndObject();
1303 }
1304 w.EndArray();
1305
1306 w.Key("command");
1307 w.String(commandInfo.command.c_str());
1308
1309 w.EndObject();
1310}
1311
1312} // namespace o2::framework
header::DataOrigin origin
header::DataDescription description
benchmark::State & state
std::unique_ptr< expressions::Node > node
int32_t i
uint32_t op
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t c
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
Something which can be matched against a header::DataDescription.
Something which can be matched against a header::DataOrigin.
Matcher on actual time, as reported in the DataProcessingHeader.
Something which can be matched against a header::SubSpecificationType.
const GLfloat * m
Definition glcorearb.h:4066
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint ref
Definition glcorearb.h:291
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLubyte GLubyte GLubyte GLubyte w
Definition glcorearb.h:852
GLuint * states
Definition glcorearb.h:4932
std::variant< OriginValueMatcher, DescriptionValueMatcher, SubSpecificationTypeValueMatcher, std::unique_ptr< DataDescriptorMatcher >, ConstantValueMatcher, StartTimeValueMatcher > Node
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
Definition Lifetime.h:18
Variant emptyDict()
Definition Variant.h:428
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
void merge(CommandInfo const &other)
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static std::optional< header::DataHeader::SubSpecificationType > getOptionalSubSpec(OutputSpec const &spec)
Get the subspec, if available.
static std::optional< framework::ConcreteDataMatcher > optionalConcreteDataMatcherFrom(data_matcher::DataDescriptorMatcher const &matcher)
static void write(std::ostream &o, Variant const &v)
std::vector< DataProcessorInfo > & metadata
friend std::ostream & operator<<(std::ostream &s, State state)
std::vector< ConfigParamSpec > outputOptions
bool Key(const Ch *str, SizeType length, bool copy)
std::vector< DataProcessorSpec > & dataProcessors
std::vector< ConfigParamSpec > inputOptions
bool String(const Ch *str, SizeType length, bool copy)
WorkflowImporter(std::vector< DataProcessorSpec > &o, std::vector< DataProcessorInfo > &m, CommandInfo &c)
std::vector< data_matcher::Node > inputMatcherNodes
static void dump(std::ostream &o, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, CommandInfo const &commandInfo)
static bool import(std::istream &s, std::vector< DataProcessorSpec > &workflow, std::vector< DataProcessorInfo > &metadata, CommandInfo &command)
A typesafe reference to an element of the context.
static void walk(DataDescriptorMatcher const &top, EDGEWALKER edgeWalker, LEAFWALKER leafWalker)
From https://en.cppreference.com/w/cpp/utility/variant/visit.
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str