Project
Loading...
Searching...
No Matches
WorkflowSerializationHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
20#include "Framework/Logger.h"
21#include "Framework/Signpost.h"
22
23#include <rapidjson/reader.h>
24#include <rapidjson/prettywriter.h>
25#include <rapidjson/istreamwrapper.h>
26#include <rapidjson/ostreamwrapper.h>
27#include <iostream>
28#include <algorithm>
29#include <memory>
30
31O2_DECLARE_DYNAMIC_LOG(workflow_importer);
32
33namespace o2::framework
34{
35
36using namespace rapidjson;
37using namespace o2::framework::data_matcher;
38
39struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, WorkflowImporter> {
100
// Writes a human-readable name for `state` to `s` and returns the stream.
// Within this file the result is used to build debug/signpost text and the
// message of the exception thrown for unhandled strings.
// NOTE(review): most `case State::...:` labels are not visible in this view of
// the file; each string presumably mirrors the enumerator it is printed for —
// confirm against the State enum.
101 friend std::ostream& operator<<(std::ostream& s, State state)
102 {
103 switch (state) {
104 case State::IN_START:
105 s << "IN_START";
106 break;
108 s << "IN_EXECUTION";
109 break;
111 s << "IN_WORKFLOW";
112 break;
114 s << "IN_COMMAND";
115 break;
117 s << "IN_DATAPROCESSORS";
118 break;
120 s << "IN_DATAPROCESSOR";
121 break;
123 s << "IN_DATAPROCESSOR_NAME";
124 break;
126 s << "IN_DATAPROCESSOR_RANK";
127 break;
129 s << "IN_DATAPROCESSOR_N_SLOTS";
130 break;
132 s << "IN_DATAPROCESSOR_TIMESLICE_ID";
133 break;
135 s << "IN_DATAPROCESSOR_MAX_TIMESLICES";
136 break;
137 case State::IN_INPUTS:
138 s << "IN_INPUTS";
139 break;
141 s << "IN_OUTPUTS";
142 break;
144 s << "IN_OPTIONS";
145 break;
146 case State::IN_LABELS:
147 s << "IN_LABELS";
148 break;
150 s << "IN_METADATA";
151 break;
153 s << "IN_WORKFLOW_OPTIONS";
154 break;
155 case State::IN_INPUT:
156 s << "IN_INPUT";
157 break;
159 s << "IN_INPUT_BINDING";
160 break;
162 s << "IN_INPUT_ORIGIN";
163 break;
165 s << "IN_INPUT_DESCRIPTION";
166 break;
168 s << "IN_INPUT_SUBSPEC";
169 break;
171 s << "IN_INPUT_ORIGIN_REF";
172 break;
174 s << "IN_INPUT_DESCRIPTION_REF";
175 break;
177 s << "IN_INPUT_SUBSPEC_REF";
178 break;
180 s << "IN_INPUT_MATCHER";
181 break;
183 s << "IN_INPUT_MATCHER_OPERATION";
184 break;
186 s << "IN_INPUT_LEFT_MATCHER";
187 break;
189 s << "IN_INPUT_RIGHT_MATCHER";
190 break;
192 s << "IN_INPUT_LIFETIME";
193 break;
195 s << "IN_INPUT_STARTTIME";
196 break;
198 s << "IN_INPUT_OPTIONS";
199 break;
200 case State::IN_OUTPUT:
201 s << "IN_OUTPUT";
202 break;
204 s << "IN_OUTPUT_BINDING";
205 break;
207 s << "IN_OUTPUT_ORIGIN";
208 break;
210 s << "IN_OUTPUT_DESCRIPTION";
211 break;
213 s << "IN_OUTPUT_SUBSPEC";
214 break;
216 s << "IN_OUTPUT_LIFETIME";
217 break;
219 s << "IN_OUTPUT_OPTIONS";
220 break;
221 case State::IN_OPTION:
222 s << "IN_OPTION";
223 break;
225 s << "IN_OPTION_NAME";
226 break;
228 s << "IN_OPTION_TYPE";
229 break;
231 s << "IN_OPTION_DEFAULT";
232 break;
234 s << "IN_OPTION_HELP";
235 break;
237 s << "IN_OPTION_KIND";
238 break;
239 case State::IN_LABEL:
240 s << "IN_LABEL";
241 break;
243 s << "IN_METADATUM";
244 break;
246 s << "IN_METADATUM_KEY";
247 break;
249 s << "IN_METADATUM_VALUE";
250 break;
251 case State::IN_ERROR:
252 s << "IN_ERROR";
253 break;
255 s << "IN_DATAPROCESSOR_INFOS";
256 break;
258 s << "IN_DATAPROCESSOR_INFO";
259 break;
261 s << "IN_DATAPROCESSOR_INFO_NAME";
262 break;
264 s << "IN_DATAPROCESSOR_INFO_EXECUTABLE";
265 break;
267 s << "IN_DATAPROCESSOR_INFO_ARGS";
268 break;
270 s << "IN_DATAPROCESSOR_INFO_ARG";
271 break;
273 s << "IN_DATAPROCESSOR_INFO_CHANNELS";
274 break;
276 s << "IN_DATAPROCESSOR_INFO_CHANNEL";
277 break;
278 }
279 return s;
280 }
281
// Creates an importer which writes what it parses into caller-owned objects.
// The state stack starts empty-initialized; `m` is bound to the `metadata`
// member (a reference) and `c` to `command`.
// NOTE(review): the initializer which consumes `o` and the statement(s) in the
// constructor body are not visible in this view of the file; `o` presumably
// initializes `dataProcessors` — confirm.
282 WorkflowImporter(std::vector<DataProcessorSpec>& o,
283 std::vector<DataProcessorInfo>& m,
284 CommandInfo& c)
285 : states{},
287 metadata{m},
288 command{c}
289 {
291 }
292
// Object-start handler (its signature is not visible in this view of the file;
// presumably rapidjson's StartObject()): traces "START_OBJECT", dispatches on
// the current state and always returns true.
// NOTE(review): the statement(s) of most branches are not visible here, so only
// the visible side effects are described below.
294 {
295 enter("START_OBJECT");
296 if (in(State::IN_START)) {
298 } else if (in(State::IN_DATAPROCESSORS)) {
301 } else if (in(State::IN_DATAPROCESSOR)) {
303 } else if (in(State::IN_INPUTS)) {
// a new input object starts: forget matcher nodes collected so far
305 inputMatcherNodes.clear();
306 } else if (in(State::IN_INPUT_MATCHER)) {
307 // start a new embedded matcher
308 } else if (in(State::IN_INPUT_LEFT_MATCHER)) {
309 // this is a matcher leaf, i.e. last matcher of a branch
310 // will be merged into the parent matcher
311 } else if (in(State::IN_INPUT_RIGHT_MATCHER)) {
312 // this is a matcher leaf, i.e. last matcher of a branch
313 // will be merged into the parent matcher
314 } else if (in(State::IN_OUTPUTS)) {
// a new output object starts without a subspec until the "subspec" key is seen
316 outputHasSubSpec = false;
317 } else if (in(State::IN_OPTIONS)) {
319 } else if (in(State::IN_INPUT_OPTIONS)) {
321 } else if (in(State::IN_OUTPUT_OPTIONS)) {
323 } else if (in(State::IN_WORKFLOW_OPTIONS)) {
325 } else if (in(State::IN_DATAPROCESSOR_INFOS)) {
// both the list state and the single-info state append an empty DataProcessorInfo
327 metadata.push_back(DataProcessorInfo{});
328 } else if (in(State::IN_DATAPROCESSOR_INFO)) {
329 metadata.push_back(DataProcessorInfo{});
330 } else if (in(State::IN_METADATA)) {
// a new metadatum starts: reset the key/value collected for the previous one
332 metadatumKey.clear();
333 metadatumValue.clear();
334 } else if (in(State::IN_COMMAND)) {
336 }
337 return true;
338 }
339
// Object-end handler: traces "END_OBJECT", finalizes the object being closed
// according to the current state, then pops one state and returns true.
//  - IN_INPUT: builds an InputSpec from the collected matcher nodes.
//  - IN_INPUT_MATCHER / IN_INPUT_LEFT_MATCHER / IN_INPUT_RIGHT_MATCHER: merges
//    the last matcher node into its parent node.
//  - IN_OUTPUT: appends an OutputSpec, with or without subspec.
//  - IN_OPTION: builds a ConfigParamSpec and stores it in the proper container.
//  - IN_METADATUM: appends the collected key/value pair.
// `memberCount` is not used.
// NOTE(review): several `case` labels and conditions of the option handling are
// not visible in this view of the file.
340 bool EndObject(SizeType memberCount)
341 {
342 enter("END_OBJECT");
343 if (in(State::IN_INPUT)) {
// Folds `nodes`, last element first, into nested And-matchers whose innermost
// right-hand side is a start-time matcher referring to the context.
344 auto buildMatcher = [](auto& nodes) -> std::unique_ptr<DataDescriptorMatcher> {
345 auto lastMatcher =
346 std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::Just,
347 StartTimeValueMatcher(ContextRef{ContextPos::STARTTIME_POS}));
348 for (size_t ni = 0, ne = nodes.size(); ni < ne; ++ni) {
349 auto& node = nodes[nodes.size() - 1 - ni];
350 auto tmp = std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And,
351 std::move(node),
352 std::move(lastMatcher));
353 assert(lastMatcher.get() == nullptr);
354 lastMatcher = std::move(tmp);
355 }
356 return lastMatcher;
357 };
358
// NOTE(review): inputMatcherNodes[0] is read without checking that the vector is
// non-empty — confirm that every input pushes at least one node.
359 std::unique_ptr<DataDescriptorMatcher> matcher;
360 if (auto* pval = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes[0])) {
// a complete matcher tree was read from the "matcher" key: use it as is
361 assert(inputMatcherNodes.size() == 1);
362 matcher = std::move(*pval);
363 } else {
// individual value matchers were collected: chain them together
364 matcher = buildMatcher(inputMatcherNodes);
365 }
366 auto concrete = DataSpecUtils::optionalConcreteDataMatcherFrom(*matcher);
367 if (concrete.has_value()) {
368 // the matcher is fully qualified with unique parameters so we add ConcreteDataMatcher
369 dataProcessors.back().inputs.push_back(InputSpec({binding}, (*concrete).origin, (*concrete).description, (*concrete).subSpec, lifetime, inputOptions));
370 } else {
371 dataProcessors.back().inputs.push_back(InputSpec({binding}, std::move(*matcher), lifetime, inputOptions));
372 }
// per-input scratch data is consumed, reset it for the next input
373 inputMatcherNodes.clear();
374 inputOptions.clear();
375
376 } else if (in(State::IN_INPUT_MATCHER) && inputMatcherNodes.size() > 1) {
// an embedded matcher is complete: take it off the node list and merge it into
// its parent, which is then the last node
377 data_matcher::Node child = std::move(inputMatcherNodes.back());
378 inputMatcherNodes.pop_back();
379 auto* matcher = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&child);
380 assert(matcher != nullptr);
381 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes.back());
382 assert(parent != nullptr);
383 std::unique_ptr<DataDescriptorMatcher> node;
// Replaces the first side of the parent (left, then right) which still holds the
// ConstantValueMatcher{false} placeholder by `child`; returns false if neither does.
384 auto mergeDown = [&node, &parent, &child]() -> bool {
385 // FIXME: do we need a dedicated default state, or can we simply use ConstantValueMatcher
386 if (auto* pval1 = std::get_if<ConstantValueMatcher>(&((*parent)->getLeft()))) {
387 if (*pval1 == ConstantValueMatcher{false}) {
388 node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
389 std::move(child),
390 std::move((*parent)->getRight()));
391 return true;
392 }
393 }
394 if (auto* pval2 = std::get_if<ConstantValueMatcher>(&((*parent)->getRight()))) {
395 if (*pval2 == ConstantValueMatcher{false}) {
396 node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
397 std::move((*parent)->getLeft()),
398 std::move(child));
399 return true;
400 }
401 }
402 return false;
403 };
// on failure `node` stays empty: IN_ERROR is pushed and the empty node still
// replaces the parent below
404 if (!mergeDown()) {
405 states.push_back(State::IN_ERROR);
406 }
407 inputMatcherNodes.pop_back();
408 inputMatcherNodes.push_back(std::move(node));
409 } else if (in(State::IN_INPUT_LEFT_MATCHER)) {
// the last node becomes the left side of the node before it; both are replaced
// by the merged matcher
410 assert(inputMatcherNodes.size() >= 2);
411 size_t nMatchers = inputMatcherNodes.size();
412 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes[nMatchers - 2]);
413 assert(parent != nullptr);
414 auto node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
415 std::move(inputMatcherNodes[nMatchers - 1]),
416 std::move((*parent)->getRight()));
417 inputMatcherNodes.pop_back();
418 inputMatcherNodes.pop_back();
419 inputMatcherNodes.push_back(std::move(node));
420 } else if (in(State::IN_INPUT_RIGHT_MATCHER)) {
// the last node becomes the right side of the node before it
421 data_matcher::Node child = std::move(inputMatcherNodes.back());
422 inputMatcherNodes.pop_back();
423 auto* parent = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&inputMatcherNodes.back());
424 assert(parent != nullptr);
425 auto node = std::make_unique<DataDescriptorMatcher>((*parent)->getOp(),
426 std::move((*parent)->getLeft()),
427 std::move(child));
428 inputMatcherNodes.pop_back();
429 inputMatcherNodes.push_back(std::move(node));
430 } else if (in(State::IN_OUTPUT)) {
// the subspec is only part of the OutputSpec if the "subspec" key was present
431 if (outputHasSubSpec) {
432 dataProcessors.back().outputs.push_back(OutputSpec({binding}, origin, description, subspec, lifetime));
433 } else {
434 dataProcessors.back().outputs.push_back(OutputSpec({binding}, {origin, description}, lifetime));
435 }
436 outputHasSubSpec = false;
437 } else if (in(State::IN_OPTION)) {
// Build the option from the strings collected for it; the textual default value
// is converted according to optionType. Array-like types are read from `is`.
// NOTE(review): std::stoi/stoul/stol/stof/stod throw std::invalid_argument or
// std::out_of_range for malformed text and nothing here catches that — confirm
// the caller is prepared for it.
438 std::unique_ptr<ConfigParamSpec> opt{nullptr};
439
440 using HelpString = ConfigParamSpec::HelpString;
441 std::stringstream is;
442 is.str(optionDefault);
443 switch (optionType) {
445 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, optionDefault.c_str(), HelpString{optionHelp}, optionKind);
446 break;
447 case VariantType::Int:
448 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stoi(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
449 break;
451 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<int8_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
452 break;
454 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<int16_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
455 break;
457 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint8_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
458 break;
460 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint16_t>(std::stoi(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
461 break;
463 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, static_cast<uint32_t>(std::stoul(optionDefault, nullptr)), HelpString{optionHelp}, optionKind);
464 break;
466 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stoul(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
467 break;
469 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stol(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
470 break;
472 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stof(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
473 break;
475 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, std::stod(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
476 break;
478 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, (bool)std::stoi(optionDefault, nullptr), HelpString{optionHelp}, optionKind);
479 break;
481 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayInt>(is), HelpString{optionHelp}, optionKind);
482 break;
484 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayFloat>(is), HelpString{optionHelp}, optionKind);
485 break;
487 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayDouble>(is), HelpString{optionHelp}, optionKind);
488 break;
489 // case VariantType::ArrayBool:
490 // opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayBool>(is), HelpString{optionHelp}, optionKind);
491 // break;
493 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::ArrayString>(is), HelpString{optionHelp}, optionKind);
494 break;
496 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DInt>(is), HelpString{optionHelp}, optionKind);
497 break;
499 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DFloat>(is), HelpString{optionHelp}, optionKind);
500 break;
502 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::Array2DDouble>(is), HelpString{optionHelp}, optionKind);
503 break;
505 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayInt>(is), HelpString{optionHelp}, optionKind);
506 break;
508 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayFloat>(is), HelpString{optionHelp}, optionKind);
509 break;
511 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayDouble>(is), HelpString{optionHelp}, optionKind);
512 break;
514 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, VariantJSONHelpers::read<VariantType::LabeledArrayString>(is), HelpString{optionHelp}, optionKind);
515 break;
517 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, emptyDict(), HelpString{optionHelp}, optionKind);
518 break;
519 default:
520 opt = std::make_unique<ConfigParamSpec>(optionName, optionType, optionDefault, HelpString{optionHelp}, optionKind);
521 }
522 // Depending on the previous state, push options to the right place.
// (the conditions selecting among the four destinations are not visible here;
// an unexpected previous state trips the assert below)
524 dataProcessors.back().options.push_back(*opt);
526 metadata.back().workflowOptions.push_back(*opt);
528 inputOptions.push_back(*opt);
530 outputOptions.push_back(*opt);
531 } else {
532 assert(false);
533 }
534 } else if (in(State::IN_METADATUM)) {
535 dataProcessors.back().metadata.push_back({metadatumKey, metadatumValue});
536 }
537 pop();
538 return true;
539 }
540
// Array-start handler (its signature is not visible in this view of the file;
// presumably rapidjson's StartArray()): traces "START_ARRAY", dispatches on the
// current state and always returns true.
// NOTE(review): the statement(s) of most branches are not visible here; the only
// visible side effect is the reset of outputHasSubSpec when an outputs array starts.
542 {
543 enter("START_ARRAY");
544 if (in(State::IN_WORKFLOW)) {
546 } else if (in(State::IN_INPUTS)) {
548 } else if (in(State::IN_INPUT_OPTIONS)) {
550 } else if (in(State::IN_OUTPUT_OPTIONS)) {
552 } else if (in(State::IN_OUTPUTS)) {
554 outputHasSubSpec = false;
555 } else if (in(State::IN_OPTIONS)) {
557 } else if (in(State::IN_WORKFLOW_OPTIONS)) {
559 } else if (in(State::IN_LABELS)) {
561 } else if (in(State::IN_METADATA)) {
563 } else if (in(State::IN_DATAPROCESSOR_INFOS)) {
569 }
570 return true;
571 }
572
// Array-end handler: traces "END_ARRAY", pops one state unconditionally and
// returns true. `count` is not used in the visible lines.
// NOTE(review): a second pop() happens under a condition which is not visible in
// this view of the file; according to the comment below it covers empty
// inputs / options / outputs arrays.
573 bool EndArray(SizeType count)
574 {
575 enter("END_ARRAY");
576 // Handle the case in which inputs / options / outputs are
577 // empty.
581 pop();
582 }
583 pop();
584 return true;
585 }
586
// Key handler: traces "KEY" and the key text, then dispatches on the pair
// (current state, key name). Always returns true; `copy` is not used in the
// visible lines.
// NOTE(review): keys are compared with strncmp(str, literal, length), i.e. only
// the first `length` characters are checked, so a key which is a proper prefix of
// a literal (or an empty key) matches as well — confirm this is acceptable.
// NOTE(review): the statement(s) of most branches are not visible in this view of
// the file, so only the visible side effects are described below.
587 bool Key(const Ch* str, SizeType length, bool copy)
588 {
589 enter("KEY");
590 enter(str);
591 if (in(State::IN_INPUT) && strncmp(str, "binding", length) == 0) {
593 } else if (in(State::IN_INPUT) && strncmp(str, "origin", length) == 0) {
595 } else if (in(State::IN_INPUT) && strncmp(str, "description", length) == 0) {
597 } else if (in(State::IN_INPUT) && strncmp(str, "subspec", length) == 0) {
599 } else if (in(State::IN_INPUT) && strncmp(str, "originRef", length) == 0) {
601 } else if (in(State::IN_INPUT) && strncmp(str, "descriptionRef", length) == 0) {
603 } else if (in(State::IN_INPUT) && strncmp(str, "subspecRef", length) == 0) {
605 } else if (in(State::IN_INPUT) && strncmp(str, "matcher", length) == 0) {
606 // the outermost matcher is starting here
607 // we create a placeholder which is being updated later
608 inputMatcherNodes.push_back(std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And, ConstantValueMatcher{false}));
610 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "matcher", length) == 0) {
611 // recursive matchers
612 inputMatcherNodes.push_back(std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::And, ConstantValueMatcher{false}));
614 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "operation", length) == 0) {
616 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "left", length) == 0) {
618 } else if (in(State::IN_INPUT_MATCHER) && strncmp(str, "right", length) == 0) {
620 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "origin", length) == 0) {
622 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "origin", length) == 0) {
624 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "description", length) == 0) {
626 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "description", length) == 0) {
628 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "subspec", length) == 0) {
630 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "subspec", length) == 0) {
632 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "originRef", length) == 0) {
634 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "originRef", length) == 0) {
636 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "descriptionRef", length) == 0) {
638 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "descriptionRef", length) == 0) {
640 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "subspecRef", length) == 0) {
642 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "subspecRef", length) == 0) {
644 } else if (in(State::IN_INPUT_LEFT_MATCHER) && strncmp(str, "starttime", length) == 0) {
646 } else if (in(State::IN_INPUT_RIGHT_MATCHER) && strncmp(str, "starttime", length) == 0) {
648 } else if (in(State::IN_INPUT) && strncmp(str, "lifetime", length) == 0) {
650 } else if (in(State::IN_INPUT) && strncmp(str, "starttime", length) == 0) {
652 } else if (in(State::IN_INPUT) && strncmp(str, "metadata", length) == 0) {
654 } else if (in(State::IN_OUTPUT) && strncmp(str, "binding", length) == 0) {
656 } else if (in(State::IN_OUTPUT) && strncmp(str, "origin", length) == 0) {
658 } else if (in(State::IN_OUTPUT) && strncmp(str, "description", length) == 0) {
660 } else if (in(State::IN_OUTPUT) && strncmp(str, "subspec", length) == 0) {
// remember that this output carries an explicit subspec; EndObject() uses the
// flag to pick the OutputSpec constructor
662 outputHasSubSpec = true;
663 } else if (in(State::IN_OUTPUT) && strncmp(str, "lifetime", length) == 0) {
665 } else if (in(State::IN_OUTPUT) && strncmp(str, "metadata", length) == 0) {
667 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "name", length) == 0) {
669 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "ranks", length) == 0) {
671 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "nSlots", length) == 0) {
673 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "inputTimeSliceId", length) == 0) {
675 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "maxInputTimeslices", length) == 0) {
677 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "inputs", length) == 0) {
679 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "outputs", length) == 0) {
681 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "options", length) == 0) {
683 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "labels", length) == 0) {
685 } else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "metadata", length) == 0) {
687 } else if (in(State::IN_METADATUM) && strncmp(str, "key", length) == 0) {
689 } else if (in(State::IN_METADATUM) && strncmp(str, "value", length) == 0) {
691 } else if (in(State::IN_EXECUTION) && strncmp(str, "workflow", length) == 0) {
693 } else if (in(State::IN_EXECUTION) && strncmp(str, "metadata", length) == 0) {
695 } else if (in(State::IN_OPTION) && strncmp(str, "name", length) == 0) {
697 } else if (in(State::IN_OPTION) && strncmp(str, "type", length) == 0) {
699 } else if (in(State::IN_OPTION) && strncmp(str, "defaultValue", length) == 0) {
701 } else if (in(State::IN_OPTION) && strncmp(str, "help", length) == 0) {
703 } else if (in(State::IN_OPTION) && strncmp(str, "kind", length) == 0) {
705 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "name", length) == 0) {
707 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) {
709 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) {
711 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) {
713 } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "channels", length) == 0) {
715 } else if (in(State::IN_EXECUTION) && strncmp(str, "command", length) == 0) {
717 }
718 return true;
719 }
720
// String-value handler: traces "STRING" and the text, stores the value in the
// field selected by the current state, then pops one state and returns true.
// Throws std::runtime_error, naming the text and the current state, when the
// current state has no string handling.
// `copy` is not used in the visible lines.
// NOTE(review): several conditions are not visible in this view of the file. The
// branches commented as being "in an array" presumably return without popping —
// the corresponding statement is not visible here.
721 bool String(const Ch* str, SizeType length, bool copy)
722 {
723 enter("STRING");
724 enter(str);
725 auto s = std::string(str, length);
727 assert(dataProcessors.size());
728 dataProcessors.back().name = s;
730 assert(metadata.size());
731 metadata.back().name = s;
733 assert(metadata.size());
734 metadata.back().executable = s;
735 } else if (in(State::IN_INPUT_BINDING)) {
736 binding = s;
737 } else if (in(State::IN_INPUT_ORIGIN)) {
// at most 4 characters of the text are used for an origin
738 origin.runtimeInit(s.c_str(), std::min(s.size(), 4UL));
739 std::string v(s.c_str(), std::min(s.size(), 4UL));
741 } else if (in(State::IN_INPUT_DESCRIPTION)) {
// at most 16 characters of the text are used for a description
742 description.runtimeInit(s.c_str(), std::min(s.size(), 16UL));
743 std::string v(s.c_str(), std::min(s.size(), 16UL));
745 } else if (in(State::IN_INPUT_STARTTIME)) {
746 // we add StartTimeValueMatcher with ContextRef for starttime, no matter what
747 // has been in the configuration.
748 inputMatcherNodes.push_back(StartTimeValueMatcher(ContextRef{ContextPos::STARTTIME_POS}));
750 // FIXME: need to implement operator>> to read the op parameter
// unknown operation names silently fall back to And
751 DataDescriptorMatcher::Op op = DataDescriptorMatcher::Op::And;
752 if (s == "and") {
753 op = DataDescriptorMatcher::Op::And;
754 } else if (s == "or") {
755 op = DataDescriptorMatcher::Op::Or;
756 } else if (s == "xor") {
757 op = DataDescriptorMatcher::Op::Xor;
758 } else if (s == "just") {
759 op = DataDescriptorMatcher::Op::Just;
760 } else if (s == "not") {
761 op = DataDescriptorMatcher::Op::Not;
762 }
763 // FIXME: we could drop the placeholder which has been added when entering
764 // the states which can read key 'operation', but then we need to make sure
765 // that this key is always present
// replace the placeholder (last node) by a matcher carrying the operation read
766 auto node = std::make_unique<DataDescriptorMatcher>(op, ConstantValueMatcher{false});
767 inputMatcherNodes.pop_back();
768 inputMatcherNodes.push_back(std::move(node));
769 } else if (in(State::IN_OUTPUT_BINDING)) {
770 binding = s;
771 } else if (in(State::IN_OUTPUT_ORIGIN)) {
772 origin.runtimeInit(s.c_str(), std::min(s.size(), 4UL));
773 } else if (in(State::IN_OUTPUT_DESCRIPTION)) {
774 description.runtimeInit(s.c_str(), std::min(s.size(), 16UL));
775 } else if (in(State::IN_OPTION_NAME)) {
776 optionName = s;
777 } else if (in(State::IN_OPTION_TYPE)) {
// type and kind are serialized as the decimal text of the enum value
778 optionType = (VariantType)std::stoi(s, nullptr);
779 } else if (in(State::IN_OPTION_KIND)) {
780 optionKind = (ConfigParamKind)std::stoi(s, nullptr);
781 } else if (in(State::IN_OPTION_DEFAULT)) {
782 optionDefault = s;
783 } else if (in(State::IN_OPTION_HELP)) {
784 optionHelp = s;
785 } else if (in(State::IN_LABEL)) {
786 dataProcessors.back().labels.push_back({s});
787 // This is in an array, so we do not actually want to
788 // exit from the state.
790 } else if (in(State::IN_METADATUM_KEY)) {
791 metadatumKey = s;
792 } else if (in(State::IN_METADATUM_VALUE)) {
793 metadatumValue = s;
795 metadata.back().cmdLineArgs.push_back(s);
796 // This is in an array, so we do not actually want to
797 // exit from the state.
800 metadata.back().channels.push_back(s);
801 // This is in an array, so we do not actually want to
802 // exit from the state.
804 } else if (in(State::IN_COMMAND)) {
805 command.merge({s});
806 } else {
807 std::stringstream errstr;
808 errstr << "No string handling for argument '" << std::string(str, length) << "' in state " << states.back() << std::endl;
809 throw std::runtime_error(errstr.str());
810 }
811 pop();
812 return true;
813 }
814
// Unsigned-integer handler: traces the value, stores it into the field selected
// by the current state (subspec, ref, rank, nSlots, inputTimeSliceId,
// maxInputTimeslices), then pops one state and returns true.
// NOTE(review): the trace format uses %d for an unsigned argument; %u would match
// the argument type.
// NOTE(review): several conditions and the statements of the lifetime branches
// are not visible in this view of the file.
815 bool Uint(unsigned i)
816 {
817 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Uint(%d)", i);
819 subspec = i;
821 } else if (in(State::IN_INPUT_ORIGIN_REF)) {
822 ref = i;
825 ref = i;
827 } else if (in(State::IN_INPUT_SUBSPEC_REF)) {
828 ref = i;
830 } else if (in(State::IN_OUTPUT_SUBSPEC)) {
831 subspec = i;
832 } else if (in(State::IN_INPUT_LIFETIME)) {
834 } else if (in(State::IN_OUTPUT_LIFETIME)) {
836 } else if (in(State::IN_DATAPROCESSOR_RANK)) {
837 dataProcessors.back().rank = i;
839 dataProcessors.back().nSlots = i;
841 dataProcessors.back().inputTimeSliceId = i;
843 dataProcessors.back().maxInputTimeslices = i;
844 }
845 pop();
846 return true;
847 }
848
849 bool Int(int i)
850 {
851 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Int(%d)", i);
852 return true;
853 }
854 bool Uint64(uint64_t u)
855 {
856 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Uint64(%" PRIu64 ")", u);
857 return true;
858 }
859 bool Double(double d)
860 {
861 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "Double(%f)", d);
862 return true;
863 }
864
865 void enter(char const* what)
866 {
867 O2_SIGNPOST_EVENT_EMIT(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "ENTER: %s", what);
868 }
869
870 void push(State state)
871 {
872 debug.str("");
873 debug << state;
874 states.push_back(state);
875 O2_SIGNPOST_START(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()}, "import", "PUSH: %s", debug.str().c_str());
876 }
877
// Removes the innermost state from the stack and returns it (the signature is
// not visible in this view of the file).
// On an empty stack nothing is removed: IN_ERROR is pushed and returned instead.
// The signpost interval is closed with the id states.size() + 1, i.e. the depth
// before the removal, which is the id push() used when opening it.
879 {
880 if (states.empty()) {
881 states.push_back(State::IN_ERROR);
882 return State::IN_ERROR;
883 }
884 auto result = states.back();
885 states.pop_back();
// render "<popped state>[ now in <new innermost state>]" for the trace
886 debug.str("");
887 debug << result;
888 if (!states.empty()) {
889 debug << " now in " << states.back();
890 }
891 O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()+1}, "import", "POP: %s", debug.str().c_str());
892 return result;
893 }
894 bool in(State o)
895 {
896 return states.back() == o;
897 }
898
// Tells whether the state just below the innermost one equals `o` (the
// signature is not visible in this view of the file).
// Needs at least two states on the stack; this is only checked by the assert,
// i.e. not at all when NDEBUG is defined.
900 {
901 assert(states.size() > 1);
902 return states[states.size() - 2] == o;
903 }
904
905 std::ostringstream debug;
906 std::vector<State> states;
907 std::string spec;
908 std::vector<DataProcessorSpec>& dataProcessors;
909 std::vector<DataProcessorInfo>& metadata;
911 std::vector<ConfigParamSpec> inputOptions;
912 std::vector<ConfigParamSpec> outputOptions;
913 std::string binding;
916 size_t subspec;
917 size_t ref;
919 std::string metadatumKey;
920 std::string metadatumValue;
921 std::string optionName;
923 std::string optionDefault;
924 std::string optionHelp;
927 std::vector<data_matcher::Node> inputMatcherNodes;
928};
929
931 std::vector<DataProcessorSpec>& workflow,
932 std::vector<DataProcessorInfo>& metadata,
933 CommandInfo& command)
934{
935 // Skip any line which does not start with '{'
936 // If we do not find a starting {, we simply assume that no workflow
937 // was actually passed on the PIPE.
938 // FIXME: not particularly resilient, but works for now.
939 // FIXME: this will fail if { is found at char 1024.
940 char buf[1024];
941 bool hasFatalImportError = false;
942 while (s.peek() != '{') {
943 if (s.eof()) {
944 return !hasFatalImportError;
945 }
946 if (s.fail() || s.bad()) {
947 throw std::runtime_error("Malformatted input workflow");
948 }
949 s.getline(buf, 1024, '\n');
950 // FairLogger messages (starting with [) simply get forwarded.
951 // Other messages we consider them as ERRORs since they
952 // were printed out without FairLogger.
953 if (buf[0] == '[') {
954 if (strncmp(buf, "[ERROR] invalid workflow in", strlen("[ERROR] invalid workflow in")) == 0 ||
955 strncmp(buf, "[ERROR] error while setting up workflow", strlen("[ERROR] error while setting up workflow")) == 0 ||
956 strncmp(buf, "[ERROR] error parsing options of", strlen("[ERROR] error parsing options of")) == 0) {
957 hasFatalImportError = true;
958 }
959 std::cout << buf << std::endl;
960 } else {
961 LOG(error) << buf;
962 }
963 }
964 if (hasFatalImportError) {
965 return false;
966 }
967 rapidjson::Reader reader;
968 rapidjson::IStreamWrapper isw(s);
969 WorkflowImporter importer{workflow, metadata, command};
970 bool ok = reader.Parse(isw, importer);
971 if (ok == false) {
972 throw std::runtime_error("Error while parsing serialised workflow");
973 }
974 return true;
975}
976
978 std::vector<DataProcessorSpec> const& workflow,
979 std::vector<DataProcessorInfo> const& metadata,
980 CommandInfo const& commandInfo)
981{
982 rapidjson::OStreamWrapper osw(out);
983 rapidjson::PrettyWriter<rapidjson::OStreamWrapper> w(osw);
984
985 // handlers for serialization of InputSpec matchers
986 auto edgeWalker = overloaded{
987 [&w](EdgeActions::EnterNode action) {
988 w.Key("matcher");
989 w.StartObject();
990 w.Key("operation");
991 std::stringstream ss;
992 ss << action.node->getOp();
993 w.String(ss.str().c_str());
994 if (action.node->getOp() == DataDescriptorMatcher::Op::Just ||
995 action.node->getOp() == DataDescriptorMatcher::Op::Not) {
996 return ChildAction::VisitLeft;
997 }
998 return ChildAction::VisitBoth;
999 },
1001 w.Key("left");
1002 w.StartObject();
1003 },
1005 w.EndObject();
1006 },
1008 w.Key("right");
1009 w.StartObject();
1010 },
1012 w.EndObject();
1013 },
1015 w.EndObject();
1016 },
1017 [&w](auto) {}};
1018 auto leafWalker = overloaded{
1019 [&w](OriginValueMatcher const& origin) {
1020 origin.visit(overloaded{
1021 [&w](ContextRef const& ref) {
1022 w.Key("originRef");
1023 w.Uint64(ref.index);
1024 },
1025 [&w](auto const& value) {
1026 w.Key("origin");
1027 std::stringstream ss;
1028 ss << value;
1029 w.String(ss.str().c_str());
1030 }});
1031 },
1032 [&w](DescriptionValueMatcher const& description) {
1033 description.visit(overloaded{
1034 [&w](ContextRef const& ref) {
1035 w.Key("descriptionRef");
1036 w.Uint64(ref.index);
1037 },
1038 [&w](auto const& value) {
1039 w.Key("description");
1040 std::stringstream ss;
1041 ss << value;
1042 w.String(ss.str().c_str());
1043 }});
1044 },
1045 [&w](SubSpecificationTypeValueMatcher const& subspec) {
1046 subspec.visit(overloaded{
1047 [&w](ContextRef const& ref) {
1048 w.Key("subspecRef");
1049 w.Uint64(ref.index);
1050 },
1051 [&w](auto const& value) {
1052 w.Key("subspec");
1053 std::stringstream ss;
1054 ss << value;
1055 w.Uint64(std::stoul(ss.str()));
1056 }});
1057 },
1058 [&w](StartTimeValueMatcher const& startTime) {
1059 w.Key("starttime");
1060 std::stringstream ss;
1061 ss << startTime;
1062 w.String(ss.str().c_str());
1063 },
1064 [&w](ConstantValueMatcher const& constant) {},
1065 [&w](auto t) {}};
1066
1067 w.StartObject();
1068 w.Key("workflow");
1069 w.StartArray();
1070
1071 for (auto& processor : workflow) {
1072 if (processor.name.rfind("internal-dpl", 0) == 0) {
1073 continue;
1074 }
1075 w.StartObject();
1076 w.Key("name");
1077 w.String(processor.name.c_str());
1078
1079 w.Key("inputs");
1080 w.StartArray();
1081 for (auto const& input : processor.inputs) {
1085 w.StartObject();
1086 w.Key("binding");
1087 w.String(input.binding.c_str());
1088 if (auto const* concrete = std::get_if<ConcreteDataMatcher>(&input.matcher)) {
1089 w.Key("origin");
1090 w.String(concrete->origin.str, strnlen(concrete->origin.str, 4));
1091 w.Key("description");
1092 w.String(concrete->description.str, strnlen(concrete->description.str, 16));
1093 w.Key("subspec");
1094 w.Uint64(concrete->subSpec);
1095 // auto tmp = DataSpecUtils::dataDescriptorMatcherFrom(*concrete);
1096 // DataMatcherWalker::walk(tmp,
1097 // edgeWalker,
1098 // leafWalker);
1099 } else if (auto const* matcher = std::get_if<DataDescriptorMatcher>(&input.matcher)) {
1100 DataMatcherWalker::walk(*matcher,
1101 edgeWalker,
1102 leafWalker);
1103 }
1104 w.Key("lifetime");
1105 w.Uint((int)input.lifetime);
1106 if (input.metadata.empty() == false) {
1107 w.Key("metadata");
1108 w.StartArray();
1109 for (auto& metadata : input.metadata) {
1110 w.StartObject();
1111 w.Key("name");
1112 w.String(metadata.name.c_str());
1113 auto s = std::to_string(int(metadata.type));
1114 w.Key("type");
1115 w.String(s.c_str());
1116 std::ostringstream oss;
1117 oss << metadata.defaultValue;
1118 w.Key("defaultValue");
1119 w.String(oss.str().c_str());
1120 w.Key("help");
1121 w.String(metadata.help.c_str());
1122 w.EndObject();
1123 }
1124 w.EndArray();
1125 }
1126 w.EndObject();
1127 }
1128 w.EndArray();
1129
1130 w.Key("outputs");
1131 w.StartArray();
1132 for (auto& output : processor.outputs) {
1133 w.StartObject();
1134 w.Key("binding");
1135 if (output.binding.value.empty()) {
1136 auto autogenerated = DataSpecUtils::describe(output);
1137 w.String(autogenerated.c_str());
1138 } else {
1139 w.String(output.binding.value.c_str());
1140 }
1142 w.Key("origin");
1143 w.String(dataType.origin.str, strnlen(dataType.origin.str, 4));
1144 w.Key("description");
1145 w.String(dataType.description.str, strnlen(dataType.description.str, 16));
1146 // FIXME: this will have to change once we introduce wildcards for
1147 // OutputSpec
1149 if (subSpec.has_value()) {
1150 w.Key("subspec");
1151 w.Uint64(*subSpec);
1152 }
1153 w.Key("lifetime");
1154 w.Uint((int)output.lifetime);
1155 if (output.metadata.empty() == false) {
1156 w.Key("metadata");
1157 w.StartArray();
1158 for (auto& metadata : output.metadata) {
1159 w.StartObject();
1160 w.Key("name");
1161 w.String(metadata.name.c_str());
1162 auto s = std::to_string(int(metadata.type));
1163 w.Key("type");
1164 w.String(s.c_str());
1165 std::ostringstream oss;
1166 oss << metadata.defaultValue;
1167 w.Key("defaultValue");
1168 w.String(oss.str().c_str());
1169 w.Key("help");
1170 w.String(metadata.help.c_str());
1171 w.EndObject();
1172 }
1173 w.EndArray();
1174 }
1175 w.EndObject();
1176 }
1177 w.EndArray();
1178
1179 w.Key("options");
1180 w.StartArray();
1181 for (auto& option : processor.options) {
1182 if (option.name == "start-value-enumeration" || option.name == "end-value-enumeration" || option.name == "step-value-enumeration" || option.name == "orbit-offset-enumeration" || option.name == "orbit-multiplier-enumeration") {
1183 continue;
1184 }
1185 w.StartObject();
1186 w.Key("name");
1187 w.String(option.name.c_str());
1188 auto s = std::to_string(int(option.type));
1189 w.Key("type");
1190 w.String(s.c_str());
1191 std::ostringstream oss;
1192 switch (option.type) {
1205 case VariantType::Dict:
1206 VariantJSONHelpers::write(oss, option.defaultValue);
1207 break;
1208 default:
1209 oss << option.defaultValue;
1210 break;
1211 }
1212 w.Key("defaultValue");
1213 w.String(oss.str().c_str());
1214 w.Key("help");
1215 w.String(option.help.c_str());
1216 w.Key("kind");
1217 w.String(std::to_string((int)option.kind).c_str());
1218 w.EndObject();
1219 }
1220 w.EndArray();
1221 w.Key("labels");
1222 w.StartArray();
1223 for (auto& label : processor.labels) {
1224 w.String(label.value.c_str());
1225 }
1226 w.EndArray();
1227 w.Key("metadata");
1228 w.StartArray();
1229 for (auto& metadatum : processor.metadata) {
1230 w.StartObject();
1231 w.Key("key");
1232 w.String(metadatum.key.c_str());
1233 w.Key("value");
1234 w.String(metadatum.value.c_str());
1235 w.EndObject();
1236 }
1237 w.EndArray();
1238 w.Key("rank");
1239 w.Int(processor.rank);
1240 w.Key("nSlots");
1241 w.Int(processor.nSlots);
1242 w.Key("inputTimeSliceId");
1243 w.Int(processor.inputTimeSliceId);
1244 w.Key("maxInputTimeslices");
1245 w.Int(processor.maxInputTimeslices);
1246
1247 w.EndObject();
1248 }
1249 w.EndArray();
1250
1251 w.Key("metadata");
1252 w.StartArray();
1253 for (auto& info : metadata) {
1254 w.StartObject();
1255 w.Key("name");
1256 w.String(info.name.c_str());
1257 w.Key("executable");
1258 w.String(info.executable.c_str());
1259 w.Key("cmdLineArgs");
1260 w.StartArray();
1261 for (auto& arg : info.cmdLineArgs) {
1262 w.String(arg.c_str());
1263 }
1264 w.EndArray();
1265 w.Key("workflowOptions");
1266 w.StartArray();
1267 for (auto& option : info.workflowOptions) {
1268 w.StartObject();
1269 w.Key("name");
1270 w.String(option.name.c_str());
1271 auto s = std::to_string(int(option.type));
1272 w.Key("type");
1273 w.String(s.c_str());
1274 std::ostringstream oss;
1275 oss << option.defaultValue;
1276 w.Key("defaultValue");
1277 w.String(oss.str().c_str());
1278 w.Key("help");
1279 w.String(option.help.c_str());
1280 w.EndObject();
1281 }
1282 w.EndArray();
1283 w.Key("channels");
1284 w.StartArray();
1285 for (auto& channel : info.channels) {
1286 w.String(channel.c_str());
1287 }
1288 w.EndArray();
1289 w.EndObject();
1290 }
1291 w.EndArray();
1292
1293 w.Key("command");
1294 w.String(commandInfo.command.c_str());
1295
1296 w.EndObject();
1297}
1298
1299} // namespace o2::framework
benchmark::State & state
int32_t i
uint32_t op
bool o
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t c
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:540
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
Something which can be matched against a header::DataDescription.
Something which can be matched against a header::DataOrigin.
Matcher on actual time, as reported in the DataProcessingHeader.
Something which can be matched against a header::SubSpecificationType.
const GLfloat * m
Definition glcorearb.h:4066
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint ref
Definition glcorearb.h:291
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLubyte GLubyte GLubyte GLubyte w
Definition glcorearb.h:852
GLuint * states
Definition glcorearb.h:4932
std::variant< OriginValueMatcher, DescriptionValueMatcher, SubSpecificationTypeValueMatcher, std::unique_ptr< DataDescriptorMatcher >, ConstantValueMatcher, StartTimeValueMatcher > Node
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
Definition Lifetime.h:18
Variant emptyDict()
Definition Variant.h:407
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
void merge(CommandInfo const &other)
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static std::optional< header::DataHeader::SubSpecificationType > getOptionalSubSpec(OutputSpec const &spec)
Get the subspec, if available.
static std::optional< framework::ConcreteDataMatcher > optionalConcreteDataMatcherFrom(data_matcher::DataDescriptorMatcher const &matcher)
static void write(std::ostream &o, Variant const &v)
std::vector< DataProcessorInfo > & metadata
friend std::ostream & operator<<(std::ostream &s, State state)
std::vector< ConfigParamSpec > outputOptions
bool Key(const Ch *str, SizeType length, bool copy)
std::vector< DataProcessorSpec > & dataProcessors
std::vector< ConfigParamSpec > inputOptions
bool String(const Ch *str, SizeType length, bool copy)
WorkflowImporter(std::vector< DataProcessorSpec > &o, std::vector< DataProcessorInfo > &m, CommandInfo &c)
std::vector< data_matcher::Node > inputMatcherNodes
static void dump(std::ostream &o, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, CommandInfo const &commandInfo)
static bool import(std::istream &s, std::vector< DataProcessorSpec > &workflow, std::vector< DataProcessorInfo > &metadata, CommandInfo &command)
A typesafe reference to an element of the context.
static void walk(DataDescriptorMatcher const &top, EDGEWALKER edgeWalker, LEAFWALKER leafWalker)
From https://en.cppreference.com/w/cpp/utility/variant/visit.
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str