Project
Loading...
Searching...
No Matches
WorkflowSerializationHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
20#include "Framework/Logger.h"
21#include "Framework/Signpost.h"
22
23#include <rapidjson/reader.h>
24#include <rapidjson/prettywriter.h>
25#include <rapidjson/istreamwrapper.h>
26#include <rapidjson/ostreamwrapper.h>
27#include <iostream>
28#include <algorithm>
29#include <memory>
30
// Log channel referenced by the signpost macros inside the WorkflowImporter handler.
31O2_DECLARE_DYNAMIC_LOG(workflow_importer);
// Log channel referenced when reporting leftover input lines after parsing.
32O2_DECLARE_DYNAMIC_LOG(post_workflow_importer);
33
34namespace o2::framework
35{
36
37using namespace rapidjson;
38using namespace o2::framework::data_matcher;
39
40struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, WorkflowImporter> {
101
// Write a human-readable name for a parser State to the stream `s` and return `s`.
// Used by push()/pop() to build signpost messages and by String() for its error text.
// NOTE(review): most `case State::...:` labels are not visible in this listing;
// presumably every `s << "NAME"` is guarded by the matching `case State::NAME:` -
// confirm against the real file.
102 friend std::ostream& operator<<(std::ostream& s, State state)
103 {
104 switch (state) {
105 case State::IN_START:
106 s << "IN_START";
107 break;
109 s << "IN_EXECUTION";
110 break;
112 s << "IN_WORKFLOW";
113 break;
115 s << "IN_COMMAND";
116 break;
118 s << "IN_DATAPROCESSORS";
119 break;
121 s << "IN_DATAPROCESSOR";
122 break;
124 s << "IN_DATAPROCESSOR_NAME";
125 break;
127 s << "IN_DATAPROCESSOR_RANK";
128 break;
130 s << "IN_DATAPROCESSOR_N_SLOTS";
131 break;
133 s << "IN_DATAPROCESSOR_TIMESLICE_ID";
134 break;
136 s << "IN_DATAPROCESSOR_MAX_TIMESLICES";
137 break;
138 case State::IN_INPUTS:
139 s << "IN_INPUTS";
140 break;
142 s << "IN_OUTPUTS";
143 break;
145 s << "IN_OPTIONS";
146 break;
147 case State::IN_LABELS:
148 s << "IN_LABELS";
149 break;
151 s << "IN_METADATA";
152 break;
154 s << "IN_WORKFLOW_OPTIONS";
155 break;
156 case State::IN_INPUT:
157 s << "IN_INPUT";
158 break;
160 s << "IN_INPUT_BINDING";
161 break;
163 s << "IN_INPUT_ORIGIN";
164 break;
166 s << "IN_INPUT_DESCRIPTION";
167 break;
169 s << "IN_INPUT_SUBSPEC";
170 break;
172 s << "IN_INPUT_ORIGIN_REF";
173 break;
175 s << "IN_INPUT_DESCRIPTION_REF";
176 break;
178 s << "IN_INPUT_SUBSPEC_REF";
179 break;
181 s << "IN_INPUT_MATCHER";
182 break;
184 s << "IN_INPUT_MATCHER_OPERATION";
185 break;
187 s << "IN_INPUT_LEFT_MATCHER";
188 break;
190 s << "IN_INPUT_RIGHT_MATCHER";
191 break;
193 s << "IN_INPUT_LIFETIME";
194 break;
196 s << "IN_INPUT_STARTTIME";
197 break;
199 s << "IN_INPUT_OPTIONS";
200 break;
201 case State::IN_OUTPUT:
202 s << "IN_OUTPUT";
203 break;
205 s << "IN_OUTPUT_BINDING";
206 break;
208 s << "IN_OUTPUT_ORIGIN";
209 break;
211 s << "IN_OUTPUT_DESCRIPTION";
212 break;
214 s << "IN_OUTPUT_SUBSPEC";
215 break;
217 s << "IN_OUTPUT_LIFETIME";
218 break;
220 s << "IN_OUTPUT_OPTIONS";
221 break;
222 case State::IN_OPTION:
223 s << "IN_OPTION";
224 break;
226 s << "IN_OPTION_NAME";
227 break;
229 s << "IN_OPTION_TYPE";
230 break;
232 s << "IN_OPTION_DEFAULT";
233 break;
235 s << "IN_OPTION_HELP";
236 break;
238 s << "IN_OPTION_KIND";
239 break;
240 case State::IN_LABEL:
241 s << "IN_LABEL";
242 break;
244 s << "IN_METADATUM";
245 break;
247 s << "IN_METADATUM_KEY";
248 break;
250 s << "IN_METADATUM_VALUE";
251 break;
252 case State::IN_ERROR:
253 s << "IN_ERROR";
254 break;
256 s << "IN_DATAPROCESSOR_INFOS";
257 break;
259 s << "IN_DATAPROCESSOR_INFO";
260 break;
262 s << "IN_DATAPROCESSOR_INFO_NAME";
263 break;
265 s << "IN_DATAPROCESSOR_INFO_EXECUTABLE";
266 break;
268 s << "IN_DATAPROCESSOR_INFO_ARGS";
269 break;
271 s << "IN_DATAPROCESSOR_INFO_ARG";
272 break;
274 s << "IN_DATAPROCESSOR_INFO_CHANNELS";
275 break;
277 s << "IN_DATAPROCESSOR_INFO_CHANNEL";
278 break;
279 }
280 return s;
281 }
282
// Construct an importer that fills caller-owned containers while parsing.
// `m` is bound to the `metadata` reference and `c` to `command`; `states` starts empty.
// NOTE(review): the initializer of the `dataProcessors` reference (presumably from `o`)
// and the statement(s) of the constructor body are not visible in this listing.
283 WorkflowImporter(std::vector<DataProcessorSpec>& o,
284 std::vector<DataProcessorInfo>& m,
285 CommandInfo& c)
286 : states{},
288 metadata{m},
289 command{c}
290 {
292 }
293
// Body of the handler invoked when a JSON object starts (it reports "START_OBJECT";
// presumably this is the rapidjson `StartObject()` callback - its signature line is
// not visible in this listing).
// Dispatches on the current state and resets the per-object scratch data that the
// matching end-of-object handling consumes. Always returns true.
// NOTE(review): most branches appear empty here because their statements (presumably
// push() calls for the nested state) are missing from the listing.
295 {
296 enter("START_OBJECT");
297 if (in(State::IN_START)) {
299 } else if (in(State::IN_DATAPROCESSORS)) {
302 } else if (in(State::IN_DATAPROCESSOR)) {
304 } else if (in(State::IN_INPUTS)) {
// a new input begins: forget matcher nodes collected for the previous one
306 inputMatcherNodes.clear();
307 } else if (in(State::IN_INPUT_MATCHER)) {
308 // start a new embedded matcher
309 } else if (in(State::IN_INPUT_LEFT_MATCHER)) {
310 // this is a matcher leaf, i.e. last matcher of a branch
311 // will be merged into the parent matcher
312 } else if (in(State::IN_INPUT_RIGHT_MATCHER)) {
313 // this is a matcher leaf, i.e. last matcher of a branch
314 // will be merged into the parent matcher
315 } else if (in(State::IN_OUTPUTS)) {
// a new output begins: "subspec" has not been seen yet for it
317 outputHasSubSpec = false;
318 } else if (in(State::IN_OPTIONS)) {
320 } else if (in(State::IN_INPUT_OPTIONS)) {
322 } else if (in(State::IN_OUTPUT_OPTIONS)) {
324 } else if (in(State::IN_WORKFLOW_OPTIONS)) {
326 } else if (in(State::IN_DATAPROCESSOR_INFOS)) {
// each object in the infos list gets its own DataProcessorInfo entry
328 metadata.push_back(DataProcessorInfo{});
329 } else if (in(State::IN_DATAPROCESSOR_INFO)) {
330 metadata.push_back(DataProcessorInfo{});
331 } else if (in(State::IN_METADATA)) {
// a new key/value metadatum begins: clear the scratch strings
333 metadatumKey.clear();
334 metadatumValue.clear();
335 } else if (in(State::IN_COMMAND)) {
337 }
338 return true;
339 }
340
// SAX callback invoked when a JSON object ends. Depending on the current state it
// finalises the entity that was being read:
//  - IN_INPUT: builds the input matcher and appends an InputSpec to the last data processor;
//  - IN_INPUT_MATCHER / IN_INPUT_LEFT_MATCHER / IN_INPUT_RIGHT_MATCHER: folds the
//    finished child matcher into its parent on the `inputMatcherNodes` stack;
//  - IN_OUTPUT: appends an OutputSpec to the last data processor;
//  - IN_OPTION: builds a ConfigParamSpec from the collected option* strings;
//  - IN_METADATUM: appends the key/value pair to the last data processor.
// Finally the current state is popped. Always returns true; `memberCount` is unused.
341 bool EndObject(SizeType memberCount)
342 {
343 enter("END_OBJECT");
344 if (in(State::IN_INPUT)) {
// Chains all collected leaf nodes with Op::And, starting from a start-time matcher
// and walking the node list from the back to the front.
345 auto buildMatcher = [](auto& nodes) -> std::unique_ptr<DataDescriptorMatcher> {
346 auto lastMatcher =
347 std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::Just,
348 StartTimeValueMatcher(ContextRef{ContextPos::STARTTIME_POS}));
349 for (size_t ni = 0, ne = nodes.size(); ni < ne; ++ni) {
350 auto& node = nodes[nodes.size() - 1 - ni];
351 auto tmp = std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And,
352 std::move(node),
353 std::move(lastMatcher));
354 assert(lastMatcher.get() == nullptr);
355 lastMatcher = std::move(tmp);
356 }
357 return lastMatcher;
358 };
359
360 std::unique_ptr<DataDescriptorMatcher> matcher;
// NOTE(review): inputMatcherNodes[0] is read without checking that the vector is
// non-empty - confirm an input object always contributes at least one node.
// If the first node already is a complete matcher it is used as is; otherwise the
// nodes are individual value matchers that still have to be combined.
361 if (auto* pval = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes[0])) {
362 assert(inputMatcherNodes.size() == 1);
363 matcher = std::move(*pval);
364 } else {
365 matcher = buildMatcher(inputMatcherNodes);
366 }
367 auto concrete = DataSpecUtils::optionalConcreteDataMatcherFrom(*matcher);
368 if (concrete.has_value()) {
369 // the matcher is fully qualified with unique parameters so we add ConcreteDataMatcher
370 dataProcessors.back().inputs.push_back(InputSpec({binding}, (*concrete).origin, (*concrete).description, (*concrete).subSpec, lifetime, inputOptions));
371 } else {
372 dataProcessors.back().inputs.push_back(InputSpec({binding}, std::move(*matcher), lifetime, inputOptions));
373 }
374 inputMatcherNodes.clear();
375 inputOptions.clear();
376
377 } else if (in(State::IN_INPUT_MATCHER) && inputMatcherNodes.size() > 1) {
// A nested matcher object ended: take it off the stack and merge it into its parent.
378 data_matcher::Node child = std::move(inputMatcherNodes.back());
379 inputMatcherNodes.pop_back();
380 auto* matcher = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&child);
381 assert(matcher != nullptr);
382 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes.back());
383 assert(parent != nullptr);
384 std::unique_ptr<DataDescriptorMatcher> node;
// Replaces the first side of the parent (left, then right) that still holds the
// ConstantValueMatcher{false} placeholder with `child`; returns false if neither does.
385 auto mergeDown = [&node, &parent, &child]() -> bool {
386 // FIXME: do we need a dedicated default state, or can we simply use ConstantValueMatcher
387 if (auto* pval1 = std::get_if<ConstantValueMatcher>(&((*parent)->getLeft()))) {
388 if (*pval1 == ConstantValueMatcher{false}) {
389 node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
390 std::move(child),
391 std::move((*parent)->getRight()));
392 return true;
393 }
394 }
395 if (auto* pval2 = std::get_if<ConstantValueMatcher>(&((*parent)->getRight()))) {
396 if (*pval2 == ConstantValueMatcher{false}) {
397 node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
398 std::move((*parent)->getLeft()),
399 std::move(child));
400 return true;
401 }
402 }
403 return false;
404 };
// NOTE(review): when mergeDown() fails, `node` is still null when it is pushed below,
// and the IN_ERROR state pushed here is what the trailing pop() removes - confirm
// this is the intended error handling.
405 if (!mergeDown()) {
406 states.push_back(State::IN_ERROR);
407 }
408 inputMatcherNodes.pop_back();
409 inputMatcherNodes.push_back(std::move(node));
410 } else if (in(State::IN_INPUT_LEFT_MATCHER)) {
// The "left" leaf ended: the top node becomes the left side of the matcher below it,
// and the two stack entries are replaced by the combined matcher.
411 assert(inputMatcherNodes.size() >= 2);
412 size_t nMatchers = inputMatcherNodes.size();
413 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes[nMatchers - 2]);
414 assert(parent != nullptr);
415 auto node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
416 std::move(inputMatcherNodes[nMatchers - 1]),
417 std::move((*parent)->getRight()));
418 inputMatcherNodes.pop_back();
419 inputMatcherNodes.pop_back();
420 inputMatcherNodes.push_back(std::move(node));
421 } else if (in(State::IN_INPUT_RIGHT_MATCHER)) {
// The "right" leaf ended: same as above, but the top node becomes the right side.
422 data_matcher::Node child = std::move(inputMatcherNodes.back());
423 inputMatcherNodes.pop_back();
424 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes.back());
425 assert(parent != nullptr);
426 auto node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
427 std::move((*parent)->getLeft()),
428 std::move(child));
429 inputMatcherNodes.pop_back();
430 inputMatcherNodes.push_back(std::move(node));
431 } else if (in(State::IN_OUTPUT)) {
// The sub-specification is only passed on when a "subspec" key was seen for this output.
432 if (outputHasSubSpec) {
433 dataProcessors.back().outputs.push_back(OutputSpec({binding}, origin, description, subspec, lifetime));
434 } else {
435 dataProcessors.back().outputs.push_back(OutputSpec({binding}, {origin, description}, lifetime));
436 }
437 outputHasSubSpec = false;
438 } else if (in(State::IN_OPTION)) {
439 std::unique_ptr<ConfigParamSpec> opt{nullptr};
440
441 using HelpString = ConfigParamSpec::HelpString;
// `is` wraps the textual default so the array/labeled-array readers can parse it.
442 std::stringstream is;
443 is.str(optionDefault);
// Convert the textual default into the type announced by `optionType`.
// NOTE(review): most `case VariantType::...:` labels are not visible in this listing.
// The std::sto* conversions throw on malformed or out-of-range text.
444 switch (optionType) {
446 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, optionDefault.c_str(), HelpString{optionHelp}, optionKind);
447 break;
448 case VariantType::Int:
449 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stoi(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
450 break;
452 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<int8_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
453 break;
455 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<int16_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
456 break;
458 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint8_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
459 break;
461 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint16_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
462 break;
464 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint32_t>(std::stoul(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
465 break;
467 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stoul(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
468 break;
470 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stol(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
471 break;
473 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stof(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
474 break;
476 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stod(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
477 break;
479 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, (bool)std::stoi(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
480 break;
482 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayInt>(is), HelpString{optionHelp}, optionKind);
483 break;
485 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayFloat>(is), HelpString{optionHelp}, optionKind);
486 break;
488 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayDouble>(is), HelpString{optionHelp}, optionKind);
489 break;
490 // case VariantType::ArrayBool:
491 // opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayBool>(is), HelpString{optionHelp}, optionKind);
492 // break;
494 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayString>(is), HelpString{optionHelp}, optionKind);
495 break;
497 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DInt>(is), HelpString{optionHelp}, optionKind);
498 break;
500 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DFloat>(is), HelpString{optionHelp}, optionKind);
501 break;
503 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DDouble>(is), HelpString{optionHelp}, optionKind);
504 break;
506 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayInt>(is), HelpString{optionHelp}, optionKind);
507 break;
509 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayFloat>(is), HelpString{optionHelp}, optionKind);
510 break;
512 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayDouble>(is), HelpString{optionHelp}, optionKind);
513 break;
515 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayString>(is), HelpString{optionHelp}, optionKind);
516 break;
518 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, emptyDict(), HelpString{optionHelp}, optionKind);
519 break;
520 default:
521 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, optionDefault, HelpString{optionHelp}, optionKind);
522 }
523 // Depending on the previous state, push options to the right place.
// NOTE(review): the `if` / `else if` conditions selecting one of the four targets
// below are not visible in this listing.
525 dataProcessors.back().options.push_back(*opt);
527 metadata.back().workflowOptions.push_back(*opt);
529 inputOptions.push_back(*opt);
531 outputOptions.push_back(*opt);
532 } else {
533 assert(false);
534 }
535 } else if (in(State::IN_METADATUM)) {
536 dataProcessors.back().metadata.push_back({metadatumKey, metadatumValue});
537 }
// leave the state of the object that just ended
538 pop();
539 return true;
540 }
541
// Body of the handler invoked when a JSON array starts (it reports "START_ARRAY";
// presumably this is the rapidjson `StartArray()` callback - its signature line is
// not visible in this listing). Dispatches on the current state; always returns true.
// NOTE(review): most branches appear empty here because their statements (presumably
// push() calls for the element state) are missing from the listing.
543 {
544 enter("START_ARRAY");
545 if (in(State::IN_WORKFLOW)) {
547 } else if (in(State::IN_INPUTS)) {
549 } else if (in(State::IN_INPUT_OPTIONS)) {
551 } else if (in(State::IN_OUTPUT_OPTIONS)) {
553 } else if (in(State::IN_OUTPUTS)) {
// no "subspec" key has been seen yet for the outputs that follow
555 outputHasSubSpec = false;
556 } else if (in(State::IN_OPTIONS)) {
558 } else if (in(State::IN_WORKFLOW_OPTIONS)) {
560 } else if (in(State::IN_LABELS)) {
562 } else if (in(State::IN_METADATA)) {
564 } else if (in(State::IN_DATAPROCESSOR_INFOS)) {
570 }
571 return true;
572 }
573
// SAX callback invoked when a JSON array ends. Pops the current state, and pops one
// additional state first when the (not visible) condition below holds.
// Always returns true; `count` is unused in the visible code.
574 bool EndArray(SizeType count)
575 {
576 enter("END_ARRAY");
577 // Handle the case in which inputs / options / outputs are
578 // empty.
// NOTE(review): the `if (...) {` that guards the extra pop() is not visible in this listing.
582 pop();
583 }
584 pop();
585 return true;
586 }
587
// SAX callback invoked for every object key. Selects a branch from the current state
// combined with the key text. Always returns true; `copy` is unused in the visible code.
// NOTE(review): the statements of most branches are not visible in this listing
// (presumably a push() of the state that will receive the key's value).
// NOTE(review): keys are compared with strncmp(str, literal, length), i.e. only the
// first `length` characters are compared, so a key that is a proper prefix of a
// literal (e.g. "o" against "origin") also compares equal - confirm this is acceptable.
588 bool Key(const Ch* str, SizeType length, bool copy)
589 {
590 enter("KEY");
591 enter(str);
592 if (in(State::IN_INPUT) && strncmp(str, "binding", length) == 0) {
594 } else if (in(State::IN_INPUT) && strncmp(str, "origin", length) == 0) {
596 } else if (in(State::IN_INPUT) && strncmp(str, "description", length) == 0) {
598 } else if (in(State::IN_INPUT) && strncmp(str, "subspec", length) == 0) {
600 } else if (in(State::IN_INPUT) && strncmp(str, "originRef", length) == 0) {
602 } else if (in(State::IN_INPUT) && strncmp(str, "descriptionRef", length) == 0) {
604 } else if (in(State::IN_INPUT) && strncmp(str, "subspecRef", length) == 0) {
606 } else if (in(State::IN_INPUT) && strncmp(str, "matcher", length) == 0) {
607 // the outermost matcher is starting here
608 // we create a placeholder which is being updated later
609 inputMatcherNodes.push_back(std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And, ConstantValueMatcher{false}));
611 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "matcher", length) == 0) {
612 // recursive matchers
613 inputMatcherNodes.push_back(std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And, ConstantValueMatcher{false}));
615 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "operation", length) == 0) {
617 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "left", length) == 0) {
619 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "right", length) == 0) {
621 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "origin", length) == 0) {
623 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "origin", length) == 0) {
625 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "description", length) == 0) {
627 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "description", length) == 0) {
629 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "subspec", length) == 0) {
631 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "subspec", length) == 0) {
633 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "originRef", length) == 0) {
635 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "originRef", length) == 0) {
637 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "descriptionRef", length) == 0) {
639 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "descriptionRef", length) == 0) {
641 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "subspecRef", length) == 0) {
643 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "subspecRef", length) == 0) {
645 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "starttime", length) == 0) {
647 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "starttime", length) == 0) {
649 } else if (in(State::IN_INPUT) && strncmp(str, "lifetime", length) == 0) {
651 } else if (in(State::IN_INPUT) && strncmp(str, "starttime", length) == 0) {
653 } else if (in(State::IN_INPUT) && strncmp(str, "metadata", length) == 0) {
655 } else if (in(State::IN_OUTPUT) && strncmp(str, "binding", length) == 0) {
657 } else if (in(State::IN_OUTPUT) && strncmp(str, "origin", length) == 0) {
659 } else if (in(State::IN_OUTPUT) && strncmp(str, "description", length) == 0) {
661 } else if (in(State::IN_OUTPUT) && strncmp(str, "subspec", length) == 0) {
// remember that this output carries an explicit sub-specification
663 outputHasSubSpec = true;
664 } else if (in(State::IN_OUTPUT) && strncmp(str, "lifetime", length) == 0) {
666 } else if (in(State::IN_OUTPUT) && strncmp(str, "metadata", length) == 0) {
668 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "name", length) == 0) {
670 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "ranks", length) == 0) {
672 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "nSlots", length) == 0) {
674 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "inputTimeSliceId", length) == 0) {
676 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "maxInputTimeslices", length) == 0) {
678 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "inputs", length) == 0) {
680 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "outputs", length) == 0) {
682 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "options", length) == 0) {
684 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "labels", length) == 0) {
686 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "metadata", length) == 0) {
688 } else if (in(State::IN_METADATUM) && strncmp(str, "key", length) == 0) {
690 } else if (in(State::IN_METADATUM) && strncmp(str, "value", length) == 0) {
692 } else if (in(State::IN_EXECUTION) && strncmp(str, "workflow", length) == 0) {
694 } else if (in(State::IN_EXECUTION) && strncmp(str, "metadata", length) == 0) {
696 } else if (in(State::IN_OPTION) && strncmp(str, "name", length) == 0) {
698 } else if (in(State::IN_OPTION) && strncmp(str, "type", length) == 0) {
700 } else if (in(State::IN_OPTION) && strncmp(str, "defaultValue", length) == 0) {
702 } else if (in(State::IN_OPTION) && strncmp(str, "help", length) == 0) {
704 } else if (in(State::IN_OPTION) && strncmp(str, "kind", length) == 0) {
706 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "name", length) == 0) {
708 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) {
710 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) {
712 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) {
714 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "channels", length) == 0) {
716 } else if (in(State::IN_EXECUTION) && strncmp(str, "command", length) == 0) {
718 }
719 return true;
720 }
721
// SAX callback invoked for a string value. Copies the `length` characters of `str`
// into `s` and stores it in the field selected by the current state, then pops that
// state and returns true.
// Throws std::runtime_error when the current state has no string handling.
// NOTE(review): several `if` / `else if` condition lines (including the first one) and
// some statements are not visible in this listing.
722 bool String(const Ch* str, SizeType length, bool copy)
723 {
724 enter("STRING");
725 enter(str);
726 auto s = std::string(str, length);
728 assert(dataProcessors.size());
729 dataProcessors.back().name = s;
731 assert(metadata.size());
732 metadata.back().name = s;
734 assert(metadata.size());
735 metadata.back().executable = s;
736 } else if (in(State::IN_INPUT_BINDING)) {
737 binding = s;
738 } else if (in(State::IN_INPUT_ORIGIN)) {
// at most 4 characters of the text are used for the origin
739 origin.runtimeInit(s.c_str(), std::min(s.size(), 4UL));
740 std::string v(s.c_str(), std::min(s.size(), 4UL));
742 } else if (in(State::IN_INPUT_DESCRIPTION)) {
// at most 16 characters of the text are used for the description
743 description.runtimeInit(s.c_str(), std::min(s.size(), 16UL));
744 std::string v(s.c_str(), std::min(s.size(), 16UL));
746 } else if (in(State::IN_INPUT_STARTTIME)) {
747 // we add StartTimeValueMatcher with ContextRef for starttime, no matter what
748 // has been in the configuration.
749 inputMatcherNodes.push_back(StartTimeValueMatcher(ContextRef{ContextPos::STARTTIME_POS}));
751 // FIXME: need to implement operator>> to read the op parameter
// unrecognised operation names silently fall back to Op::And
752 DataDescriptorMatcher::Op op = DataDescriptorMatcher::Op::And;
753 if (s == "and") {
754 op = DataDescriptorMatcher::Op::And;
755 } else if (s == "or") {
756 op = DataDescriptorMatcher::Op::Or;
757 } else if (s == "xor") {
758 op = DataDescriptorMatcher::Op::Xor;
759 } else if (s == "just") {
760 op = DataDescriptorMatcher::Op::Just;
761 } else if (s == "not") {
762 op = DataDescriptorMatcher::Op::Not;
763 }
764 // FIXME: we could drop the placeholder which has been added when entering
765 // the states which can read key 'operation', but then we need to make sure
766 // that this key is always present
// replace the placeholder on top of the node stack with one carrying the parsed operation
767 auto node = std::make_unique<DataDescriptorMatcher>(op, ConstantValueMatcher{false});
768 inputMatcherNodes.pop_back();
769 inputMatcherNodes.push_back(std::move(node));
770 } else if (in(State::IN_OUTPUT_BINDING)) {
771 binding = s;
772 } else if (in(State::IN_OUTPUT_ORIGIN)) {
773 origin.runtimeInit(s.c_str(), std::min(s.size(), 4UL));
774 } else if (in(State::IN_OUTPUT_DESCRIPTION)) {
775 description.runtimeInit(s.c_str(), std::min(s.size(), 16UL));
776 } else if (in(State::IN_OPTION_NAME)) {
777 optionName = s;
778 } else if (in(State::IN_OPTION_TYPE)) {
// the type arrives as the decimal value of the VariantType enumerator; std::stoi throws on bad text
779 optionType = (VariantType)std::stoi(s, nullptr);
780 } else if (in(State::IN_OPTION_KIND)) {
// same encoding for the ConfigParamKind enumerator
781 optionKind = (ConfigParamKind)std::stoi(s, nullptr);
782 } else if (in(State::IN_OPTION_DEFAULT)) {
783 optionDefault = s;
784 } else if (in(State::IN_OPTION_HELP)) {
785 optionHelp = s;
786 } else if (in(State::IN_LABEL)) {
787 dataProcessors.back().labels.push_back({s});
788 // This is in an array, so we do not actually want to
789 // exit from the state.
791 } else if (in(State::IN_METADATUM_KEY)) {
792 metadatumKey = s;
793 } else if (in(State::IN_METADATUM_VALUE)) {
794 metadatumValue = s;
796 metadata.back().cmdLineArgs.push_back(s);
797 // This is in an array, so we do not actually want to
798 // exit from the state.
801 metadata.back().channels.push_back(s);
802 // This is in an array, so we do not actually want to
803 // exit from the state.
805 } else if (in(State::IN_COMMAND)) {
806 command.merge({s});
807 } else {
808 std::stringstream errstr;
809 errstr << "No string handling for argument '" << std::string(str, length) << "' in state " << states.back() << std::endl;
810 throw std::runtime_error(errstr.str());
811 }
812 pop();
813 return true;
814 }
815
// SAX callback invoked for an unsigned integer value. Stores `i` in the field selected
// by the current state, then pops that state and returns true.
// NOTE(review): several `if` / `else if` condition lines and some statements are not
// visible in this listing.
// NOTE(review): the signpost format uses "%d" although `i` is unsigned.
816 bool Uint(unsigned i)
817 {
818 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Uint(%d)", i);
820 subspec = i;
822 } else if (in(State::IN_INPUT_ORIGIN_REF)) {
823 ref = i;
826 ref = i;
828 } else if (in(State::IN_INPUT_SUBSPEC_REF)) {
829 ref = i;
831 } else if (in(State::IN_OUTPUT_SUBSPEC)) {
832 subspec = i;
833 } else if (in(State::IN_INPUT_LIFETIME)) {
835 } else if (in(State::IN_OUTPUT_LIFETIME)) {
837 } else if (in(State::IN_DATAPROCESSOR_RANK)) {
838 dataProcessors.back().rank = i;
840 dataProcessors.back().nSlots = i;
842 dataProcessors.back().inputTimeSliceId = i;
844 dataProcessors.back().maxInputTimeslices = i;
845 }
846 pop();
847 return true;
848 }
849
// SAX callback invoked for a signed integer value. Only emits a signpost event and
// returns true: the value is not stored and, unlike Uint(), no state is popped.
850 bool Int(int i)
851 {
852 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Int(%d)", i);
853 return true;
854 }
// SAX callback invoked for a 64-bit unsigned integer value. Only emits a signpost
// event and returns true: the value is not stored and no state is popped.
855 bool Uint64(uint64_t u)
856 {
857 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Uint64(%" PRIu64 ")", u);
858 return true;
859 }
// SAX callback invoked for a floating point value. Only emits a signpost event and
// returns true: the value is not stored and no state is popped.
860 bool Double(double d)
861 {
862 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Double(%f)", d);
863 return true;
864 }
865
// Tracing helper: emits an "ENTER: <what>" signpost event whose id is derived from the
// current depth of the state stack. Does not modify the state stack.
866 void enter(char const* what)
867 {
868 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "ENTER: %s", what);
869 }
870
// Enter a new parser state: appends `state` to the state stack and starts a signpost
// interval labelled with the state's name. The signpost id is the stack depth after
// the push.
871 void push(State state)
872 {
// reuse the `debug` stream as scratch space for the state's printable name
873 debug.str("");
874 debug << state;
875 states.push_back(state);
876 O2_SIGNPOST_START(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "PUSH: %s", debug.str().c_str());
877 }
878
// Body of the state-leaving helper called as pop() throughout this struct (its
// signature line is not visible in this listing; the body returns a State).
// Removes the top of the state stack, ends the signpost interval opened by push() and
// returns the removed state. When the stack is already empty, IN_ERROR is pushed and
// returned instead.
880 {
881 if (states.empty()) {
882 states.push_back(State::IN_ERROR);
883 return State::IN_ERROR;
884 }
885 auto result = states.back();
886 states.pop_back();
887 debug.str("");
888 debug << result;
889 if (!states.empty()) {
890 debug << " now in " << states.back();
891 }
// size()+1 is the depth before the pop, i.e. the id push() used for this state
892 O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()+1}, "import", "POP: %s", debug.str().c_str());
893 return result;
894 }
895 bool in(State o)
896 {
897 return states.back() == o;
898 }
899
// Body of a helper whose signature line is not visible in this listing. It returns
// whether the state directly below the top of the state stack equals `o`.
// Precondition (asserted): the stack holds at least two states.
901 {
902 assert(states.size() > 1);
903 return states[states.size() - 2] == o;
904 }
905
// Parser working data.
// NOTE(review): several member declarations used by the handlers (e.g. `command`,
// `origin`, `description`, `lifetime`, `optionType`, `optionKind`, `outputHasSubSpec`)
// are not visible in this listing.
// scratch stream used by push()/pop() to format state names
906 std::ostringstream debug;
// stack of parser states; the back element is the current state
907 std::vector<State> states;
908 std::string spec;
// caller-owned output containers
909 std::vector<DataProcessorSpec>& dataProcessors;
910 std::vector<DataProcessorInfo>& metadata;
// options collected for the input / output currently being read
912 std::vector<ConfigParamSpec> inputOptions;
913 std::vector<ConfigParamSpec> outputOptions;
// binding of the input / output currently being read
914 std::string binding;
// last values received by Uint(); no initializer is visible for them here
917 size_t subspec;
918 size_t ref;
// key/value pair of the metadatum currently being read
920 std::string metadatumKey;
921 std::string metadatumValue;
// textual fields of the option currently being read
922 std::string optionName;
924 std::string optionDefault;
925 std::string optionHelp;
// stack of matcher nodes for the input currently being read
928 std::vector<data_matcher::Node> inputMatcherNodes;
929};
930
932 std::vector<DataProcessorSpec>& workflow,
933 std::vector<DataProcessorInfo>& metadata,
934 CommandInfo& command)
935{
936 // Skip any line which does not start with '{'
937 // If we do not find a starting {, we simply assume that no workflow
938 // was actually passed on the PIPE.
939 // FIXME: not particularly resilient, but works for now.
940 // FIXME: this will fail if { is found at char 1024.
941 char buf[1024];
942 bool hasFatalImportError = false;
943 while (s.peek() != '{') {
944 if (s.eof()) {
945 return !hasFatalImportError;
946 }
947 if (s.fail() || s.bad()) {
948 throw std::runtime_error("Malformatted input workflow");
949 }
950 s.getline(buf, 1024, '\n');
951 // FairLogger messages (starting with [) simply get forwarded.
952 // Other messages we consider them as ERRORs since they
953 // were printed out without FairLogger.
954 if (buf[0] == '[') {
955 if (strncmp(buf, "[ERROR] invalid workflow in", strlen("[ERROR] invalid workflow in")) == 0 ||
956 strncmp(buf, "[ERROR] error while setting up workflow", strlen("[ERROR] error while setting up workflow")) == 0 ||
957 strncmp(buf, "[ERROR] error parsing options of", strlen("[ERROR] error parsing options of")) == 0) {
958 hasFatalImportError = true;
959 }
960 std::cout << buf << std::endl;
961 } else {
962 LOG(error) << buf;
963 }
964 }
965 if (hasFatalImportError) {
966 return false;
967 }
968 rapidjson::Reader reader;
969 rapidjson::IStreamWrapper isw(s);
970 WorkflowImporter importer{workflow, metadata, command};
971 bool ok = reader.Parse(isw, importer);
972 if (ok == false) {
973 if (s.eof()) {
974 throw std::runtime_error("Error while parsing serialised workflow");
975 }
976 // clean up possible leftovers at the end of the input stream, e.g. [DEBUG] message from destructors
977 O2_SIGNPOST_ID_GENERATE(sid, post_workflow_importer);
978 while (true) {
979 s.getline(buf, 1024, '\n');
980 if (s.eof()) {
981 break;
982 }
983 O2_SIGNPOST_EVENT_EMIT(post_workflow_importer, sid, "post import", "Following leftover line found in input stream after parsing workflow: %{public}s", buf);
984 }
985 }
986 return true;
987}
988
990 std::vector<DataProcessorSpec> const& workflow,
991 std::vector<DataProcessorInfo> const& metadata,
992 CommandInfo const& commandInfo)
993{
994 rapidjson::OStreamWrapper osw(out);
995 rapidjson::PrettyWriter<rapidjson::OStreamWrapper> w(osw);
996
997 // handlers for serialization of InputSpec matchers
998 auto edgeWalker = overloaded{
999 [&w](EdgeActions::EnterNode action) {
1000 w.Key("matcher");
1001 w.StartObject();
1002 w.Key("operation");
1003 std::stringstream ss;
1004 ss << action.node->getOp();
1005 w.String(ss.str().c_str());
1006 if (action.node->getOp() == DataDescriptorMatcher::Op::Just ||
1007 action.node->getOp() == DataDescriptorMatcher::Op::Not) {
1008 return ChildAction::VisitLeft;
1009 }
1010 return ChildAction::VisitBoth;
1011 },
1013 w.Key("left");
1014 w.StartObject();
1015 },
1017 w.EndObject();
1018 },
1020 w.Key("right");
1021 w.StartObject();
1022 },
1024 w.EndObject();
1025 },
1027 w.EndObject();
1028 },
1029 [&w](auto) {}};
1030 auto leafWalker = overloaded{
1031 [&w](OriginValueMatcher const& origin) {
1032 origin.visit(overloaded{
1033 [&w](ContextRef const& ref) {
1034 w.Key("originRef");
1035 w.Uint64(ref.index);
1036 },
1037 [&w](auto const& value) {
1038 w.Key("origin");
1039 std::stringstream ss;
1040 ss << value;
1041 w.String(ss.str().c_str());
1042 }});
1043 },
1045 description.visit(overloaded{
1046 [&w](ContextRef const& ref) {
1047 w.Key("descriptionRef");
1048 w.Uint64(ref.index);
1049 },
1050 [&w](auto const& value) {
1051 w.Key("description");
1052 std::stringstream ss;
1053 ss << value;
1054 w.String(ss.str().c_str());
1055 }});
1056 },
1057 [&w](SubSpecificationTypeValueMatcher const& subspec) {
1058 subspec.visit(overloaded{
1059 [&w](ContextRef const& ref) {
1060 w.Key("subspecRef");
1061 w.Uint64(ref.index);
1062 },
1063 [&w](auto const& value) {
1064 w.Key("subspec");
1065 std::stringstream ss;
1066 ss << value;
1067 w.Uint64(std::stoul(ss.str()));
1068 }});
1069 },
1070 [&w](StartTimeValueMatcher const& startTime) {
1071 w.Key("starttime");
1072 std::stringstream ss;
1073 ss << startTime;
1074 w.String(ss.str().c_str());
1075 },
1076 [&w](ConstantValueMatcher const& constant) {},
1077 [&w](auto t) {}};
1078
1079 w.StartObject();
1080 w.Key("workflow");
1081 w.StartArray();
1082
1083 for (auto& processor : workflow) {
1084 if (processor.name.rfind("internal-dpl", 0) == 0) {
1085 continue;
1086 }
1087 w.StartObject();
1088 w.Key("name");
1089 w.String(processor.name.c_str());
1090
1091 w.Key("inputs");
1092 w.StartArray();
1093 for (auto const& input : processor.inputs) {
1097 w.StartObject();
1098 w.Key("binding");
1099 w.String(input.binding.c_str());
1100 if (auto const* concrete = std::get_if<ConcreteDataMatcher>(&input.matcher)) {
1101 w.Key("origin");
1102 w.String(concrete->origin.str, strnlen(concrete->origin.str, 4));
1103 w.Key("description");
1104 w.String(concrete->description.str, strnlen(concrete->description.str, 16));
1105 w.Key("subspec");
1106 w.Uint64(concrete->subSpec);
1107 // auto tmp = DataSpecUtils::dataDescriptorMatcherFrom(*concrete);
1108 // DataMatcherWalker::walk(tmp,
1109 // edgeWalker,
1110 // leafWalker);
1111 } else if (auto const* matcher = std::get_if<DataDescriptorMatcher>(&input.matcher)) {
1112 DataMatcherWalker::walk(*matcher,
1113 edgeWalker,
1114 leafWalker);
1115 }
1116 w.Key("lifetime");
1117 w.Uint((int)input.lifetime);
1118 if (input.metadata.empty() == false) {
1119 w.Key("metadata");
1120 w.StartArray();
1121 for (auto& metadata : input.metadata) {
1122 w.StartObject();
1123 w.Key("name");
1124 w.String(metadata.name.c_str());
1125 auto s = std::to_string(int(metadata.type));
1126 w.Key("type");
1127 w.String(s.c_str());
1128 std::ostringstream oss;
1129 oss << metadata.defaultValue;
1130 w.Key("defaultValue");
1131 w.String(oss.str().c_str());
1132 w.Key("help");
1133 w.String(metadata.help.c_str());
1134 w.EndObject();
1135 }
1136 w.EndArray();
1137 }
1138 w.EndObject();
1139 }
1140 w.EndArray();
1141
1142 w.Key("outputs");
1143 w.StartArray();
1144 for (auto& output : processor.outputs) {
1145 w.StartObject();
1146 w.Key("binding");
1147 if (output.binding.value.empty()) {
1148 auto autogenerated = DataSpecUtils::describe(output);
1149 w.String(autogenerated.c_str());
1150 } else {
1151 w.String(output.binding.value.c_str());
1152 }
1154 w.Key("origin");
1155 w.String(dataType.origin.str, strnlen(dataType.origin.str, 4));
1156 w.Key("description");
1157 w.String(dataType.description.str, strnlen(dataType.description.str, 16));
1158 // FIXME: this will have to change once we introduce wildcards for
1159 // OutputSpec
1161 if (subSpec.has_value()) {
1162 w.Key("subspec");
1163 w.Uint64(*subSpec);
1164 }
1165 w.Key("lifetime");
1166 w.Uint((int)output.lifetime);
1167 if (output.metadata.empty() == false) {
1168 w.Key("metadata");
1169 w.StartArray();
1170 for (auto& metadata : output.metadata) {
1171 w.StartObject();
1172 w.Key("name");
1173 w.String(metadata.name.c_str());
1174 auto s = std::to_string(int(metadata.type));
1175 w.Key("type");
1176 w.String(s.c_str());
1177 std::ostringstream oss;
1178 oss << metadata.defaultValue;
1179 w.Key("defaultValue");
1180 w.String(oss.str().c_str());
1181 w.Key("help");
1182 w.String(metadata.help.c_str());
1183 w.EndObject();
1184 }
1185 w.EndArray();
1186 }
1187 w.EndObject();
1188 }
1189 w.EndArray();
1190
1191 w.Key("options");
1192 w.StartArray();
1193 for (auto& option : processor.options) {
1194 if (option.name == "start-value-enumeration" || option.name == "end-value-enumeration" || option.name == "step-value-enumeration" || option.name == "orbit-offset-enumeration" || option.name == "orbit-multiplier-enumeration") {
1195 continue;
1196 }
1197 w.StartObject();
1198 w.Key("name");
1199 w.String(option.name.c_str());
1200 auto s = std::to_string(int(option.type));
1201 w.Key("type");
1202 w.String(s.c_str());
1203 std::ostringstream oss;
1204 switch (option.type) {
1217 case VariantType::Dict:
1218 VariantJSONHelpers::write(oss, option.defaultValue);
1219 break;
1220 default:
1221 oss << option.defaultValue;
1222 break;
1223 }
1224 w.Key("defaultValue");
1225 w.String(oss.str().c_str());
1226 w.Key("help");
1227 w.String(option.help.c_str());
1228 w.Key("kind");
1229 w.String(std::to_string((int)option.kind).c_str());
1230 w.EndObject();
1231 }
1232 w.EndArray();
1233 w.Key("labels");
1234 w.StartArray();
1235 for (auto& label : processor.labels) {
1236 w.String(label.value.c_str());
1237 }
1238 w.EndArray();
1239 w.Key("metadata");
1240 w.StartArray();
1241 for (auto& metadatum : processor.metadata) {
1242 w.StartObject();
1243 w.Key("key");
1244 w.String(metadatum.key.c_str());
1245 w.Key("value");
1246 w.String(metadatum.value.c_str());
1247 w.EndObject();
1248 }
1249 w.EndArray();
1250 w.Key("rank");
1251 w.Int(processor.rank);
1252 w.Key("nSlots");
1253 w.Int(processor.nSlots);
1254 w.Key("inputTimeSliceId");
1255 w.Int(processor.inputTimeSliceId);
1256 w.Key("maxInputTimeslices");
1257 w.Int(processor.maxInputTimeslices);
1258
1259 w.EndObject();
1260 }
1261 w.EndArray();
1262
1263 w.Key("metadata");
1264 w.StartArray();
1265 for (auto& info : metadata) {
1266 w.StartObject();
1267 w.Key("name");
1268 w.String(info.name.c_str());
1269 w.Key("executable");
1270 w.String(info.executable.c_str());
1271 w.Key("cmdLineArgs");
1272 w.StartArray();
1273 for (auto& arg : info.cmdLineArgs) {
1274 w.String(arg.c_str());
1275 }
1276 w.EndArray();
1277 w.Key("workflowOptions");
1278 w.StartArray();
1279 for (auto& option : info.workflowOptions) {
1280 w.StartObject();
1281 w.Key("name");
1282 w.String(option.name.c_str());
1283 auto s = std::to_string(int(option.type));
1284 w.Key("type");
1285 w.String(s.c_str());
1286 std::ostringstream oss;
1287 oss << option.defaultValue;
1288 w.Key("defaultValue");
1289 w.String(oss.str().c_str());
1290 w.Key("help");
1291 w.String(option.help.c_str());
1292 w.EndObject();
1293 }
1294 w.EndArray();
1295 w.Key("channels");
1296 w.StartArray();
1297 for (auto& channel : info.channels) {
1298 w.String(channel.c_str());
1299 }
1300 w.EndArray();
1301 w.EndObject();
1302 }
1303 w.EndArray();
1304
1305 w.Key("command");
1306 w.String(commandInfo.command.c_str());
1307
1308 w.EndObject();
1309}
1310
1311} // namespace o2::framework
header::DataOrigin origin
header::DataDescription description
benchmark::State & state
std::unique_ptr< expressions::Node > node
int32_t i
uint32_t op
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t c
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
Something which can be matched against a header::DataDescription.
Something which can be matched against a header::DataOrigin.
Matcher on actual time, as reported in the DataProcessingHeader.
Something which can be matched against a header::SubSpecificationType.
const GLfloat * m
Definition glcorearb.h:4066
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint ref
Definition glcorearb.h:291
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLubyte GLubyte GLubyte GLubyte w
Definition glcorearb.h:852
GLuint * states
Definition glcorearb.h:4932
std::variant< OriginValueMatcher, DescriptionValueMatcher, SubSpecificationTypeValueMatcher, std::unique_ptr< DataDescriptorMatcher >, ConstantValueMatcher, StartTimeValueMatcher > Node
Defining PrimaryVertex explicitly as messageable.
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
Definition Lifetime.h:18
Variant emptyDict()
Definition Variant.h:428
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
void merge(CommandInfo const &other)
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static std::optional< header::DataHeader::SubSpecificationType > getOptionalSubSpec(OutputSpec const &spec)
Get the subspec, if available.
static std::optional< framework::ConcreteDataMatcher > optionalConcreteDataMatcherFrom(data_matcher::DataDescriptorMatcher const &matcher)
static void write(std::ostream &o, Variant const &v)
std::vector< DataProcessorInfo > & metadata
friend std::ostream & operator<<(std::ostream &s, State state)
std::vector< ConfigParamSpec > outputOptions
bool Key(const Ch *str, SizeType length, bool copy)
std::vector< DataProcessorSpec > & dataProcessors
std::vector< ConfigParamSpec > inputOptions
bool String(const Ch *str, SizeType length, bool copy)
WorkflowImporter(std::vector< DataProcessorSpec > &o, std::vector< DataProcessorInfo > &m, CommandInfo &c)
std::vector< data_matcher::Node > inputMatcherNodes
static void dump(std::ostream &o, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, CommandInfo const &commandInfo)
static bool import(std::istream &s, std::vector< DataProcessorSpec > &workflow, std::vector< DataProcessorInfo > &metadata, CommandInfo &command)
A typesafe reference to an element of the context.
static void walk(DataDescriptorMatcher const &top, EDGEWALKER edgeWalker, LEAFWALKER leafWalker)
From https://en.cppreference.com/w/cpp/utility/variant/visit.
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str