Project
Loading...
Searching...
No Matches
test_RootTreeWriterWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
19#include "Framework/InputSpec.h"
27#include "Headers/DataHeader.h"
29#include "../../Core/test/TestClasses.h"
30#include "Framework/Logger.h"
31#include <TSystem.h>
32#include <TTree.h>
33#include <TFile.h>
34#include <vector>
35#include <stdexcept>
36#include <iostream>
37// note: std filesystem is first supported in gcc 8
38#include <filesystem>
39
40using namespace o2::framework;
41
42// a helper class to do the checking at the end of the program when
43// the destructor of the class is called.
45{
46 public:
47 StaticChecker() = default;
48 // have to throw an exception in the destructor if checking fails
49 // this is ok in this cae because no other instances which would require proper
50 // cleanup are expected
51 ~StaticChecker() noexcept(false)
52 {
53 // the check in the desctructor makes sure that the workflow has been run at all
54 if (mChecks.size() > 0) {
55 throw std::runtime_error("Workflow error: Checks have not been executed");
56 }
57 }
58
/// Description of a single check: the file to inspect and the expected
/// properties of the tree stored in it.
struct Attributes {
  /// name of the file to be opened by the check
  std::string fileName;
  /// expected number of entries in the tree
  int nEntries{0};
  /// expected number of branches in the tree
  int nBranches{0};
};
64
65 void runChecks()
66 {
67 for (auto const& check : mChecks) {
68 TFile* file = TFile::Open(check.fileName.c_str());
69 if (file == nullptr) {
70 setError(std::string("missing file ") + check.fileName.c_str());
71 continue;
72 }
73 TTree* tree = reinterpret_cast<TTree*>(file->GetObjectChecked("testtree", "TTree"));
74 if (tree == nullptr) {
75 setError(std::string("can not find tree 'testtree' in file ") + check.fileName.c_str());
76 } else if (tree->GetEntries() != check.nEntries) {
77 setError(std::string("inconsistent number of entries in 'testtree' of file ") + check.fileName + " expecting " + std::to_string(check.nEntries) + " got " + std::to_string(tree->GetEntries()));
78 } else if (tree->GetNbranches() != check.nBranches) {
79 setError(std::string("inconsistent number of branches in 'testtree' of file ") + check.fileName + " expecting " + std::to_string(check.nBranches) + " got " + std::to_string(tree->GetNbranches()));
80 }
81 file->Close();
82 std::filesystem::remove(check.fileName.c_str());
83 }
84 mChecks.clear();
85 if (mErrorMessage.empty() == false) {
86 throw std::runtime_error(mErrorMessage);
87 }
88 }
89
90 void addCheck(std::string filename, int entries, int branches = 0)
91 {
92 mChecks.emplace_back(Attributes{filename, entries, branches});
93 }
94
95 template <typename T>
96 void setError(T const& message)
97 {
98 if (mErrorMessage.empty()) {
99 mErrorMessage = message;
100 }
101 }
102
103 void clear()
104 {
105 mChecks.clear();
106 mErrorMessage.clear();
107 }
108
109 private:
110 std::vector<Attributes> mChecks;
111 std::string mErrorMessage;
112};
113static StaticChecker sChecker;
114
116{
117 hook = [](const char* idstring) {
118 // run the checks in the master driver process, all the individual
119 // processes have the same checker setup, so this needs to be cleared for child
120 // (device) processes.
121 if (idstring == nullptr) {
122 sChecker.runChecks();
123 } else {
124 sChecker.clear();
125 }
126 };
127}
128
130
131static constexpr int sTreeSize = 10; // elements to send and write
133{
134 auto initFct = [](InitContext& ic) {
135 auto counter = std::make_shared<int>();
136 *counter = 0;
137
138 auto processingFct = [counter](ProcessingContext& pc) {
139 if (*counter >= sTreeSize) {
140 // don't publish more
141 return;
142 }
144 pc.outputs().snapshot(OutputRef{"output", 0}, a);
145 pc.outputs().snapshot(OutputRef{"output", 1}, a);
146 int& metadata = pc.outputs().make<int>(Output{"TST", "METADATA", 0});
147 metadata = *counter;
148 *counter = *counter + 1;
149 if (*counter >= sTreeSize) {
150 pc.services().get<ControlService>().endOfStream();
151 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
152 }
153 };
154
155 return processingFct;
156 };
157 return DataProcessorSpec{"source", // name of the processor
158 {},
159 {OutputSpec{{"output"}, "TST", "SOMEOBJECT", 0, Lifetime::Timeframe},
160 OutputSpec{{"output"}, "TST", "SOMEOBJECT", 1, Lifetime::Timeframe},
161 OutputSpec{{"meta"}, "TST", "METADATA", 0, Lifetime::Timeframe}},
162 AlgorithmSpec(initFct)};
163}
164
165template <typename T>
168{
169 std::string fileName = std::filesystem::temp_directory_path().string();
170 fileName += "/test_RootTreeWriter";
171 std::string altFileName = fileName + "_alt.root";
172 fileName += ".root";
173
174 // the first writer is configured with number of events (1)
175 // it receives two routes and saves those to two branches
176 // a third route is disabled (and not published by the source)
177 sChecker.addCheck(fileName, 1, 2);
178 // the second writer uses a check function to determine when its ready
179 // the first definition creates two branches, input data comes in over the
180 // same route, together with the second definition its three branches
181 sChecker.addCheck(altFileName, sTreeSize, 3);
182
183 auto checkReady = [counter = std::make_shared<int>(0)](auto) -> bool {
184 *counter = *counter + 1;
185 // processing function checks the callback for two inputs -> factor 2
186 // the two last calls have to return true to signal on both inputs
187 return (*counter + 1) >= (2 * sTreeSize);
188 };
189
190 auto getIndex = [](auto const& ref) {
191 auto const* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
192 return static_cast<size_t>(dh->subSpecification);
193 };
194 auto getName = [](std::string base, size_t index) {
195 return base + "_" + std::to_string(index);
196 };
197
198 auto preprocessor = [](ProcessingContext& ctx) {
199 for (auto const& ref : InputRecordWalker(ctx.inputs())) {
200 auto const* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
201 LOGP(info, "got data: {}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
202 }
203 };
204
205 using Polymorphic = o2::test::Polymorphic;
206 return WorkflowSpec{
209 ( //
210 "sink1", // process name
211 fileName.c_str(), // default file name
212 MakeRootTreeWriterSpec::TreeAttributes{"testtree", "what a naive test"}, // default tree name
213 1, // default number of events
214 BranchDefinition<Polymorphic>{InputSpec{"in", "TST", "SOMEOBJECT", 0}, "polyobject"}, // branch config
215 BranchDefinition<int>{InputSpec{"disabl", "TST", "NODATA"}, "dummy", 0}, // disabled branch config
216 BranchDefinition<int>{InputSpec{"meta", "TST", "METADATA"}, "counter"} // branch config
217 )(), // call the generator
219 ( //
220 "sink2", // process name
221 altFileName.c_str(), // default file name
222 "testtree", // default tree name
223 MakeRootTreeWriterSpec::TerminationPolicy::Workflow, // terminate the workflow
224 MakeRootTreeWriterSpec::TerminationCondition{checkReady}, // custom termination condition
225 MakeRootTreeWriterSpec::Preprocessor{preprocessor}, // custom preprocessor
227 InputSpec{"input", // key
228 ConcreteDataTypeMatcher{"TST", "SOMEOBJECT"}}, // subspec independent
229 "polyobject", // base name of branch
230 "", // empty option
231 2, // two branches
232 RootTreeWriter::IndexExtractor(getIndex), // index retriever
233 RootTreeWriter::BranchNameMapper(getName) // branch name retriever
234 }, //
235 BranchDefinition<int>{InputSpec{"meta", "TST", "METADATA"}, "counter"} // branch config
236 )() // call the generator
237 };
238}
std::string getName(const TDataMember *dm, int index, int size)
A helper class to iterate over all parts of all input routes.
Configurable generator for RootTreeWriter processor spec.
A generic writer for ROOT TTrees.
StaticChecker()=default
~StaticChecker() noexcept(false)
void setError(T const &message)
void addCheck(std::string filename, int entries, int branches=0)
A helper class to iterate over all parts of all input routes.
Generate a processor spec for the RootTreeWriter utility.
std::function< std::string(std::string, size_t)> BranchNameMapper
std::function< size_t(o2::framework::DataRef const &)> IndexExtractor
GLuint index
Definition glcorearb.h:781
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLint ref
Definition glcorearb.h:291
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::function< void(const char *)> OnWorkflowTerminationHook
void check(const std::vector< std::string > &arguments, const std::vector< ConfigParamSpec > &workflowOptions, const std::vector< DeviceSpec > &deviceSpecs, CheckMatrix &matrix)
std::vector< DataProcessorSpec > WorkflowSpec
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
DataProcessorSpec getSourceSpec()
void customize(o2::framework::OnWorkflowTerminationHook &hook)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the workflow specifications into the DPL driver.
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))