Project
Loading...
Searching...
No Matches
MakeRootTreeWriterSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef FRAMEWORK_MAKEROOTTREEWRITERSPEC_H
12#define FRAMEWORK_MAKEROOTTREEWRITERSPEC_H
13
18
20#include "Framework/InputSpec.h"
25#include <algorithm>
26#include <vector>
27#include <string>
28#include <stdexcept>
29#include <variant>
30#include <unordered_set>
31#include <tuple>
32
33namespace o2
34{
35namespace framework
36{
37
196{
197 public:
199 enum struct TerminationPolicy {
200 Process,
201 Workflow,
202 };
203 const std::map<std::string, TerminationPolicy> TerminationPolicyMap = {
204 {"process", TerminationPolicy::Process},
205 {"workflow", TerminationPolicy::Workflow},
206 };
207
211 enum struct Action {
216 };
221 using CheckProcessing = std::function<std::tuple<TerminationCondition::Action, bool>(framework::DataRef const&)>;
225 using CheckReady = std::function<bool(o2::framework::DataRef const&)>;
226
228 std::variant<std::monostate, CheckReady, CheckProcessing> check;
229 };
230
233 using Process = std::function<void(ProcessingContext&)>;
234
236 std::variant<std::monostate, Process> callback;
237
239 constexpr operator bool()
240 {
241 return std::holds_alternative<Process>(callback);
242 }
243
246 {
247 if (std::holds_alternative<Process>(callback)) {
248 std::get<Process>(callback)(context);
249 }
250 }
251 };
252
255 static std::string asString(InputSpec const& arg) { return arg.binding; }
256 };
258 template <typename T, typename _ = void>
259 struct StringAssignable : public std::false_type {
260 };
261 template <typename T>
263 T,
264 std::enable_if_t<std::is_same_v<std::decay_t<T>, char const*> ||
265 std::is_same_v<std::decay_t<T>, char*> ||
266 std::is_same_v<std::decay_t<T>, std::string>>> : public std::true_type {
267 };
268
272 template <typename T>
273 struct BranchDefinition : public WriterType::BranchDef<T, InputSpec, KeyExtractor> {
276 template <typename KeyType, typename Arg = const char*, std::enable_if_t<StringAssignable<Arg>::value, int> = 0>
277 BranchDefinition(KeyType&& key, std::string _branchName, Arg _optionKey = "")
278 : WriterType::BranchDef<T, InputSpec, KeyExtractor>(std::forward<KeyType>(key), _branchName), optionKey(_optionKey)
279 {
280 }
283 template <typename KeyType, typename Arg = const char*, std::enable_if_t<StringAssignable<Arg>::value, int> = 0>
284 BranchDefinition(KeyType&& key, std::string _branchName, int _nofBranches, Arg _optionKey = "")
285 : WriterType::BranchDef<T, InputSpec, KeyExtractor>(std::forward<KeyType>(key), _branchName, _nofBranches), optionKey(_optionKey)
286 {
287 }
290 template <typename KeyType, typename Arg, typename... Args, std::enable_if_t<StringAssignable<Arg>::value, int> = 0>
291 BranchDefinition(KeyType key, std::string _branchName, Arg&& _optionKey, Args&&... args)
292 : WriterType::BranchDef<T, InputSpec, KeyExtractor>(std::forward<KeyType>(key), _branchName, std::forward<Args>(args)...), optionKey(_optionKey)
293 {
294 }
296 template <typename KeyType, typename Arg, typename... Args, std::enable_if_t<!StringAssignable<Arg>::value, int> = 0>
297 BranchDefinition(KeyType key, std::string _branchName, Arg&& arg, Args&&... args)
298 : WriterType::BranchDef<T, InputSpec, KeyExtractor>(std::forward<KeyType>(key), _branchName, std::forward<Arg>(arg), std::forward<Args>(args)...), optionKey()
299 {
300 }
302 std::string optionKey = "";
303 };
304
305 // helper to define auxiliary inputs, i.e. inputs not connected to a branch
308 operator InputSpec() const
309 {
310 return mSpec;
311 }
312 };
313
314 // helper to define tree attributes
316 std::string name;
317 std::string title = "";
318 };
319
320 // callback with signature void(TFile*, TTree*)
322
325
326 template <typename... Args>
327 MakeRootTreeWriterSpec(const char* processName, Args&&... args) : mProcessName(processName)
328 {
329 parseConstructorArgs<0>(std::forward<Args>(args)...);
330 }
331
333 {
334 // this creates the workflow spec
335 struct ProcessAttributes {
336 // worker instance
337 std::shared_ptr<WriterType> writer;
338 // keys for branch name options
339 std::vector<std::pair<std::string, std::string>> branchNameOptions;
340 // number of events to be processed
341 int nEvents = -1;
342 // autosave every nEventsAutoSave events
343 int nEventsAutoSave = -1;
344 // starting with all inputs, every input which has been indicated 'ready' is removed
345 std::unordered_set<std::string> activeInputs;
346 // event counter
347 int counter = 0;
348 // indicate what to terminate upon ready: process or workflow
349 TerminationPolicy terminationPolicy = TerminationPolicy::Process;
350 // custom termination condition
351 TerminationCondition terminationCondition;
352 // custom preprocessor
353 Preprocessor preprocessor;
354 // the total number of served branches on the n inputs
355 size_t nofBranches;
356 };
357 auto processAttributes = std::make_shared<ProcessAttributes>();
358 processAttributes->writer = mWriter;
359 processAttributes->branchNameOptions = mBranchNameOptions;
360 processAttributes->terminationCondition = std::move(mTerminationCondition);
361 processAttributes->preprocessor = std::move(mPreprocessor);
362
363 // set the list of active inputs, every input which is indecated as 'ready' will be removed from the list
364 // the process is ready if the list is empty
365 for (auto const& input : mInputs) {
366 processAttributes->activeInputs.emplace(input.binding);
367 }
368 processAttributes->nofBranches = mNofBranches;
369
370 // the init function is returned to the DPL in order to init the process
371 auto initFct = [processAttributes, TerminationPolicyMap = TerminationPolicyMap](InitContext& ic) {
372 auto& branchNameOptions = processAttributes->branchNameOptions;
373 auto filename = ic.options().get<std::string>("outfile");
374 auto treename = ic.options().get<std::string>("treename");
375 auto treetitle = ic.options().get<std::string>("treetitle");
376 auto outdir = ic.options().get<std::string>("output-dir");
377 processAttributes->nEvents = ic.options().get<int>("nevents");
378 if (processAttributes->nEvents > 0 && processAttributes->activeInputs.size() != processAttributes->nofBranches) {
379 LOG(warning) << "the n inputs serve in total m branches with n != m, this means that there will be data for\n"
380 << "different branches on the same input. Be aware that the --nevents option might lead to incomplete\n"
381 << "data in the output file as the number of processed input sets is counted";
382 }
383 processAttributes->nEventsAutoSave = ic.options().get<int>("autosave");
384 try {
385 processAttributes->terminationPolicy = TerminationPolicyMap.at(ic.options().get<std::string>("terminate"));
386 } catch (std::out_of_range&) {
387 throw std::invalid_argument(std::string("invalid termination policy: ") + ic.options().get<std::string>("terminate"));
388 }
389 if (filename.empty() || treename.empty()) {
390 throw std::invalid_argument("output file name and tree name are mandatory options");
391 }
392 for (size_t branchIndex = 0; branchIndex < branchNameOptions.size(); branchIndex++) {
393 // pair of key (first) - value (second)
394 if (branchNameOptions[branchIndex].first.empty()) {
395 continue;
396 }
397 auto branchName = ic.options().get<std::string>(branchNameOptions[branchIndex].first.c_str());
398 processAttributes->writer->setBranchName(branchIndex, branchName.c_str());
399 }
400 if (!outdir.empty() && outdir != "none") {
401 if (outdir.back() != '/') {
402 outdir += '/';
403 }
404 filename = outdir + filename;
405 }
406 processAttributes->writer->init(filename.c_str(), treename.c_str(), treetitle.c_str());
407 // the callback to be set as hook at stop of processing for the framework
408 auto finishWriting = [processAttributes]() {
409 processAttributes->writer->close();
410 };
411
412 ic.services().get<CallbackService>().set<CallbackService::Id::Stop>(finishWriting);
413
414 auto processingFct = [processAttributes](ProcessingContext& pc) {
415 auto& writer = processAttributes->writer;
416 auto& terminationPolicy = processAttributes->terminationPolicy;
417 auto& terminationCondition = processAttributes->terminationCondition;
418 auto& preprocessor = processAttributes->preprocessor;
419 auto& activeInputs = processAttributes->activeInputs;
420 auto& counter = processAttributes->counter;
421 auto& nEvents = processAttributes->nEvents;
422 auto& nEventsAutoSave = processAttributes->nEventsAutoSave;
423 if (writer->isClosed()) {
424 return;
425 }
426
427 // if the termination condition contains function of type CheckProcessing, this is checked
428 // before the processing, currently implemented logic:
429 // - processing is skipped if this is indicated by at least one input
430 // - if input is 'ready' it is removed from the list of active inputs
431 auto checkProcessing = [&terminationCondition, &activeInputs](auto const& inputs) {
432 bool doProcessing = true;
433 if (std::holds_alternative<TerminationCondition::CheckProcessing>(terminationCondition.check)) {
434 auto& check = std::get<TerminationCondition::CheckProcessing>(terminationCondition.check);
435 for (auto const& ref : inputs) {
436 auto iter = activeInputs.find(ref.spec->binding);
437 auto [action, ready] = check(ref);
439 // this condition tells us not to process the data any further
440 doProcessing = false;
441 }
442 if (iter != activeInputs.end() && ready) {
443 // this input is ready, remove from active inputs
444 activeInputs.erase(iter);
445 }
446 }
447 }
448 return doProcessing;
449 };
450 // if the termination condition contains function of type CheckReady, this is checked
451 // after the processing. All inputs marked 'ready' are removed from the list of active inputs,
452 // process treated as 'ready' if no active inputs
453 auto checkReady = [&terminationCondition, &activeInputs](auto const& inputs) {
454 if (std::holds_alternative<TerminationCondition::CheckReady>(terminationCondition.check)) {
455 auto& check = std::get<TerminationCondition::CheckReady>(terminationCondition.check);
456 for (auto const& ref : inputs) {
457 auto iter = activeInputs.find(ref.spec->binding);
458 if (iter != activeInputs.end() && check(ref)) {
459 // this input is ready, remove from active inputs
460 activeInputs.erase(iter);
461 }
462 }
463 }
464 return activeInputs.size() == 0;
465 };
466
467 if (preprocessor) {
468 preprocessor(pc);
469 }
470 if (checkProcessing(pc.inputs())) {
471 (*writer)(pc.inputs());
472 counter = counter + 1;
473 }
474
475 if ((nEvents >= 0 && counter == nEvents) || checkReady(pc.inputs())) {
476 writer->close();
477 pc.services().get<ControlService>().readyToQuit(terminationPolicy == TerminationPolicy::Workflow ? QuitRequest::All : QuitRequest::Me);
478 } else if (nEventsAutoSave > 0 && counter && (counter % nEventsAutoSave) == 0) {
479 writer->autoSave();
480 }
481 };
482
483 return processingFct;
484 };
485
486 Options options{
487 // default options
488 {"outfile", VariantType::String, mDefaultFileName.c_str(), {"Name of the output file"}},
489 {"output-dir", VariantType::String, mDefaultDir.c_str(), {"Output directory"}},
490 {"treename", VariantType::String, mDefaultTreeName.c_str(), {"Name of tree"}},
491 {"treetitle", VariantType::String, mDefaultTreeTitle.c_str(), {"Title of tree"}},
492 {"nevents", VariantType::Int, mDefaultNofEvents, {"Number of events to execute"}},
493 {"autosave", VariantType::Int, mDefaultAutoSave, {"Autosave after number of events"}},
494 {"terminate", VariantType::String, mDefaultTerminationPolicy.c_str(), {"Terminate the 'process' or 'workflow'"}},
495 };
496 for (size_t branchIndex = 0; branchIndex < mBranchNameOptions.size(); branchIndex++) {
497 // adding option definitions for those ones defined in the branch definition
498 if (mBranchNameOptions[branchIndex].first.empty()) {
499 continue;
500 }
501 options.push_back(ConfigParamSpec(mBranchNameOptions[branchIndex].first.c_str(), // option key
502 VariantType::String, // option argument type
503 mBranchNameOptions[branchIndex].second.c_str(), // default branch name
504 {"configurable branch name"} // help message
505 ));
506 }
507
508 mInputRoutes.insert(mInputRoutes.end(), mInputs.begin(), mInputs.end());
509 return DataProcessorSpec{
510 // processing spec generated from the class configuartion
511 mProcessName.c_str(), // name of the process
512 mInputRoutes, // list of inputs
513 Outputs{}, // no outputs
514 AlgorithmSpec(initFct), // return the init function
515 std::move(options), // processor options
516 };
517 }
518
519 private:
522 template <size_t N, typename... Args>
523 void parseConstructorArgs(const char* name, Args&&... args)
524 {
525 static_assert(N == 0, "wrong argument order, default file and tree options must come before branch specs");
526 // this can be called twice, the first time we set the default file name
527 // and if we are here for a second time, we set the default tree name
528 if (mDefaultFileName.empty()) {
529 mDefaultFileName = name;
530 } else {
531 mDefaultTreeName = name;
532 mDefaultTreeTitle = name;
533 }
534
535 parseConstructorArgs<N>(std::forward<Args>(args)...);
536 }
537
540 template <size_t N, typename... Args>
541 void parseConstructorArgs(int arg, Args&&... args)
542 {
543 static_assert(N == 0, "wrong argument order, default file and tree options must come before branch specs");
544 if (mNIntArgCounter == 0) {
545 mDefaultNofEvents = arg;
546 } else if (mNIntArgCounter == 1) {
547 mDefaultAutoSave = arg;
548 } else {
549 throw std::logic_error("Too many integer arguments in the constructor");
550 }
551 mNIntArgCounter++;
552 parseConstructorArgs<N>(std::forward<Args>(args)...);
553 }
554
556 template <size_t N, typename... Args>
557 void parseConstructorArgs(TerminationPolicy arg, Args&&... args)
558 {
559 static_assert(N == 0, "wrong argument order, default file and tree, and all options must come before branch specs");
560 for (const auto& policy : TerminationPolicyMap) {
561 if (policy.second == arg) {
562 mDefaultTerminationPolicy = policy.first;
563 return parseConstructorArgs<N>(std::forward<Args>(args)...);
564 }
565 }
566 // here we only get if the enum and map definitions are inconsistent
567 throw std::logic_error("Internal mismatch of policy ids and keys");
568 }
569
571 template <size_t N, typename... Args>
572 void parseConstructorArgs(TerminationCondition&& arg, Args&&... args)
573 {
574 static_assert(N == 0, "wrong argument order, default file and tree, and all options must come before branch specs");
575 mTerminationCondition = std::move(arg);
576 parseConstructorArgs<N>(std::forward<Args>(args)...);
577 }
578
580 template <size_t N, typename... Args>
581 void parseConstructorArgs(Preprocessor&& arg, Args&&... args)
582 {
583 static_assert(N == 0, "wrong argument order, default file and tree, and all options must come before branch specs");
584 mPreprocessor = std::move(arg);
585 parseConstructorArgs<N>(std::forward<Args>(args)...);
586 }
587
588 template <size_t N, typename... Args>
589 void parseConstructorArgs(AuxInputRoute&& aux, Args&&... args)
590 {
591 mInputRoutes.emplace_back(aux);
592 parseConstructorArgs<N>(std::forward<Args>(args)...);
593 }
594
595 template <size_t N, typename... Args>
596 void parseConstructorArgs(CustomClose&& callback, Args&&... args)
597 {
598 mCustomClose = callback;
599 parseConstructorArgs<N>(std::forward<Args>(args)...);
600 }
601
602 template <size_t N, typename... Args>
603 void parseConstructorArgs(TreeAttributes&& att, Args&&... args)
604 {
605 mDefaultTreeName = att.name;
606 mDefaultTreeTitle = att.title.empty() ? att.name : att.title;
607 parseConstructorArgs<N>(std::forward<Args>(args)...);
608 }
609
614 template <size_t N, typename T, typename... Args>
615 void parseConstructorArgs(BranchDefinition<T>&& def, Args&&... args)
616 {
617 if (def.nofBranches > 0) {
618 // number of branches set to 0 will skip the definition, this allows to
619 // dynamically disable branches, while all possible definitions can
620 // be specified at compile time
621 mInputs.insert(mInputs.end(), def.keys.begin(), def.keys.end());
622 mBranchNameOptions.emplace_back(def.optionKey, def.branchName);
623 mNofBranches += def.nofBranches;
624 } else {
625 // insert an empty placeholder
626 mBranchNameOptions.emplace_back("", "");
627 }
628 parseConstructorArgs<N + 1>(std::forward<Args>(args)...);
629 if constexpr (N == 0) {
630 mWriter = std::make_shared<WriterType>(nullptr, nullptr, mCustomClose, std::forward<BranchDefinition<T>>(def), std::forward<Args>(args)...);
631 }
632 }
633
634 // this terminates the argument parsing
635 template <size_t N>
636 void parseConstructorArgs()
637 {
638 }
639
640 std::shared_ptr<WriterType> mWriter;
641 std::string mProcessName;
642 std::vector<InputSpec> mInputs;
643 std::vector<InputSpec> mInputRoutes;
644 std::vector<std::pair<std::string, std::string>> mBranchNameOptions;
645 std::string mDefaultFileName;
646 std::string mDefaultTreeName;
647 std::string mDefaultTreeTitle;
648 std::string mDefaultDir = "none";
649 int mDefaultNofEvents = -1;
650 int mDefaultAutoSave = -1;
651 std::string mDefaultTerminationPolicy = "process";
652 TerminationCondition mTerminationCondition;
653 Preprocessor mPreprocessor;
654 size_t mNofBranches = 0;
655 int mNIntArgCounter = 0;
656 CustomClose mCustomClose;
657};
658} // namespace framework
659} // namespace o2
660
661#endif // FRAMEWORK_MAKEROOTTREEWRITERSPEC_H
A generic writer for ROOT TTrees.
StringRef key
Generate a processor spec for the RootTreeWriter utility.
MakeRootTreeWriterSpec(const char *processName, Args &&... args)
const std::map< std::string, TerminationPolicy > TerminationPolicyMap
MakeRootTreeWriterSpec()=delete
default constructor forbidden
A generic writer interface for ROOT TTree objects.
std::function< void(TFile *file, TTree *tree)> CustomClose
GLuint const GLchar * name
Definition glcorearb.h:781
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint counter
Definition glcorearb.h:3987
void check(const std::vector< std::string > &arguments, const std::vector< ConfigParamSpec > &workflowOptions, const std::vector< DeviceSpec > &deviceSpecs, CheckMatrix &matrix)
@ Me
Only quit this data processor.
@ All
Quit all data processor, regardless of their state.
std::vector< ConfigParamSpec > Options
std::vector< OutputSpec > Outputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
std::string filename()
define the action to be done by the writer
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
BranchDefinition(KeyType &&key, std::string _branchName, int _nofBranches, Arg _optionKey="")
BranchDefinition(KeyType key, std::string _branchName, Arg &&arg, Args &&... args)
constructor, the argument pack is simply forwarded to base class
BranchDefinition(KeyType key, std::string _branchName, Arg &&_optionKey, Args &&... args)
BranchDefinition(KeyType &&key, std::string _branchName, Arg _optionKey="")
unary helper functor to extract the input key from the InputSpec
static std::string asString(InputSpec const &arg)
void operator()(ProcessingContext &context)
execute the preprocessor
std::variant< std::monostate, Process > callback
the callback
std::function< void(ProcessingContext &)> Process
processing callback
std::variant< std::monostate, CheckReady, CheckProcessing > check
the actual evaluator
std::function< std::tuple< TerminationCondition::Action, bool >(framework::DataRef const &)> CheckProcessing
@ DoProcessing
carry out processing of the input object
std::function< bool(o2::framework::DataRef const &)> CheckReady
BranchDef is used to define the mapping between inputs and branches.
BranchDef(key_type key, std::string _branchName, size_t _nofBranches=1)
const int nEvents
Definition test_Fifo.cxx:27
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"