Project
Loading...
Searching...
No Matches
TPCIntegrateClusterReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include <vector>
15#include <boost/algorithm/string/predicate.hpp>
16
21#include "Framework/Task.h"
28#include "TChain.h"
29#include "TGrid.h"
30#include "TChainElement.h"
31
32using namespace o2::framework;
33
34namespace o2
35{
36namespace tpc
37{
38
40{
41 public:
43 ~IntegratedClusterReader() override = default;
44 void init(InitContext& ic) final;
45 void run(ProcessingContext& pc) final;
46
47 private:
48 void connectTrees();
49 void fillIndices();
50 void fillIndicesFromMeta();
51
53 void createMetaTTree(ProcessingContext& pc) const;
54
55 int mLane = 0;
56 int mNLanes = 1;
57 int mChainEntry = 0;
58 int mFirstTF{0};
59 int mLastTF{-1};
60 std::unique_ptr<TChain> mChain;
61 std::vector<std::string> mFileNames;
62 ITPCC mTPCC, *mTPCCPtr = &mTPCC;
63 o2::dataformats::TFIDInfo mTFinfo, *mTFinfoPtr = &mTFinfo;
64 std::vector<std::tuple<unsigned long, int, int>> mIndices;
65 std::string mMetaOutFileDir{};
66 std::vector<std::string> mMetaInFiles{};
67 std::unique_ptr<TFile> mTFile;
68 TTree* mTree = nullptr;
69};
70
72{
73 // possible output meta dir
74 mMetaOutFileDir = ic.options().get<std::string>("output-meta-dir");
75 if (mMetaOutFileDir != "none") {
76 LOGP(info, "Setting up meta output directory to {}", mMetaOutFileDir);
77 mMetaOutFileDir = o2::utils::Str::rectifyDirectory(mMetaOutFileDir);
78 }
79
80 mMetaInFiles = o2::RangeTokenizer::tokenize<std::string>(ic.options().get<std::string>("input-meta-files"));
81 if (!mMetaInFiles.empty()) {
82 // only one directory should be set!
83 if (mMetaOutFileDir != "none") {
84 LOGP(error, "Only input-meta-files or output-meta-dir should be set. Setting input-meta-files to none");
85 mMetaInFiles.clear();
86 } else {
87 LOGP(info, "Setting up meta input files");
88 if (mMetaInFiles.size() == 1) {
89 if (boost::algorithm::ends_with(mMetaInFiles.front(), "txt")) {
90 LOGP(info, "Reading meta files from input file {}", mMetaInFiles.front());
91 std::ifstream is(mMetaInFiles.front());
92 std::istream_iterator<std::string> start(is);
93 std::istream_iterator<std::string> end;
94 std::vector<std::string> fileNamesTmp(start, end);
95 mMetaInFiles = fileNamesTmp;
96 }
97 }
98 }
99 }
100
101 mFirstTF = ic.options().get<int>("firstTF");
102 mLastTF = ic.options().get<int>("lastTF");
103 mLane = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
104 mChainEntry = mLane;
105 mNLanes = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
106
107 const auto dontCheckFileAccess = ic.options().get<bool>("dont-check-file-access");
108 auto fileList = o2::RangeTokenizer::tokenize<std::string>(ic.options().get<std::string>("tpc-currents-infiles"));
109
110 // check if only one input file is provided (a txt file containing a list of files)
111 if (fileList.size() == 1) {
112 if (boost::algorithm::ends_with(fileList.front(), "txt")) {
113 LOGP(info, "Reading files from input file {}", fileList.front());
114 std::ifstream is(fileList.front());
115 std::istream_iterator<std::string> start(is);
116 std::istream_iterator<std::string> end;
117 std::vector<std::string> fileNamesTmp(start, end);
118 fileList = fileNamesTmp;
119 }
120 }
121
122 const std::string inpDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("input-dir"));
123 for (const auto& file : fileList) {
124 if ((file.find("alien://") == 0) && !gGrid && !TGrid::Connect("alien://")) {
125 LOG(fatal) << "Failed to open alien connection";
126 }
127 const auto fileDir = o2::utils::Str::concat_string(inpDir, file);
128 if (!dontCheckFileAccess) {
129 std::unique_ptr<TFile> filePtr(TFile::Open(fileDir.data()));
130 if (!filePtr || !filePtr->IsOpen() || filePtr->IsZombie()) {
131 LOGP(warning, "Could not open file {}", fileDir);
132 continue;
133 }
134 }
135 mFileNames.emplace_back(fileDir);
136 }
137
138 if (mFileNames.size() == 0) {
139 LOGP(error, "No input files to process");
140 }
141 connectTrees();
142}
143
145{
146 // check time order inside the TChain
147 if (mChainEntry == mLane) {
148 // create meta data if requested
149 if (mMetaOutFileDir != "none") {
150 createMetaTTree(pc);
151 LOGP(info, "Quit");
152 pc.services().get<ControlService>().endOfStream();
153 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
154 return;
155 }
156
157 if (!mMetaInFiles.empty()) {
158 LOGP(info, "Reading in meta data from files...");
159 fillIndicesFromMeta();
160 } else {
161 fillIndices();
162 }
163 LOGP(info, "Processing {} TFs out ouf {} TFs with first index in chain {}", mLastTF, mIndices.size(), mChainEntry);
164 }
165
166 if (mChainEntry >= mIndices.size() || (mLastTF != -1 && (pc.services().get<o2::framework::TimingInfo>().tfCounter >= mLastTF))) {
167 LOGP(info, "Quit. mChainEntry {} mIndices.size {} mLastTF {} tfCounter {}", mChainEntry, mIndices.size(), mLastTF, pc.services().get<o2::framework::TimingInfo>().tfCounter);
168 pc.services().get<ControlService>().endOfStream();
169 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
170 return;
171 }
172
173 if (mMetaInFiles.empty()) {
174 const int entry = std::get<1>(mIndices[mChainEntry]);
175 LOGP(info, "Processing entry {}", entry);
176 mChain->GetEntry(entry);
177 } else {
178 const int iFile = std::get<1>(mIndices[mChainEntry]);
179 const int treeEntry = std::get<2>(mIndices[mChainEntry]);
180 LOGP(info, "Processing file {} with TTree index {}", iFile, treeEntry);
181 TChainElement* chainEle = (TChainElement*)(mChain->GetListOfFiles()->At(iFile));
182
183 // check if last file is same as current file
184 if (mTFile && (std::string(mTFile->GetName()) == std::string(chainEle->GetTitle()))) {
185 LOGP(info, "File is same. Do not need to reload...");
186 } else {
187 // open new file, destroy old TTree
188 mTFile = std::unique_ptr<TFile>{TFile::Open(chainEle->GetTitle())};
189 mTree = nullptr;
190 }
191
192 if (!mTFile) {
193 LOGP(warning, "File {} is nullptr", chainEle->GetTitle());
194 return;
195 }
196
197 if (!mTree) {
198 LOGP(info, "Setting up new TTree");
199 mTFile->GetObject("itpcc", mTree);
200 if (!mTree) {
201 LOGP(warning, "Tree for file {} is nullptr", chainEle->GetTitle());
202 return;
203 }
204
205 mTree->SetBranchAddress("ITPCC", &mTPCCPtr);
206 mTree->SetBranchAddress("tfID", &mTFinfoPtr);
207 }
208 mTree->GetEntry(treeEntry);
209 }
210 mChainEntry += mNLanes;
211
212 // inject correct timing informations
213 auto& timingInfo = pc.services().get<o2::framework::TimingInfo>();
214 timingInfo.firstTForbit = mTFinfo.firstTForbit;
215 timingInfo.tfCounter = mTFinfo.tfCounter;
216 timingInfo.runNumber = mTFinfo.runNumber;
217 timingInfo.creation = mTFinfo.creation;
218
219 LOGP(info, "Processed data for firstTForbit {} and tfCounter {}", timingInfo.firstTForbit, timingInfo.tfCounter);
220 pc.outputs().snapshot(Output{header::gDataOriginTPC, getDataDescriptionTPCC()}, mTPCC);
221 usleep(100);
222}
223
224void IntegratedClusterReader::connectTrees()
225{
226 mChain.reset(new TChain("itpcc"));
227 for (const auto& file : mFileNames) {
228 LOGP(info, "Adding file to chain: {}", file);
229 mChain->AddFile(file.data());
230 }
231 assert(mChain->GetEntries());
232 mChain->SetBranchAddress("ITPCC", &mTPCCPtr);
233 mChain->SetBranchAddress("tfID", &mTFinfoPtr);
234}
235
236void IntegratedClusterReader::createMetaTTree(ProcessingContext& pc) const
237{
238 const std::string outFileMeta = fmt::format("{}/tpc_meta_{}.root", mMetaOutFileDir, mLane);
239 LOGP(info, "Producing meta file to: {}", outFileMeta);
240
241 const int nFilesInChain = mChain->GetListOfFiles()->GetEntries();
242 const int nFilesPerLane = nFilesInChain / mNLanes;
243 const int firstFile = mLane * nFilesPerLane;
244 const int lastFile = (mLane == mNLanes - 1) ? nFilesInChain : (firstFile + nFilesPerLane);
245 LOGP(info, "Processing files {} to {}", firstFile, lastFile - 1);
246
247 utils::TreeStreamRedirector pcstream(outFileMeta.data(), "RECREATE");
248 for (unsigned int iFile = firstFile; iFile < lastFile; ++iFile) {
249 TChainElement* chainEle = (TChainElement*)(mChain->GetListOfFiles()->At(iFile));
250 auto file = std::unique_ptr<TFile>{TFile::Open(chainEle->GetTitle())};
251 if (!file) {
252 LOGP(warning, "File {} is nullptr", chainEle->GetTitle());
253 continue;
254 }
255 TTree* tree = nullptr;
256 file->GetObject("itpcc", tree);
257 if (!tree) {
258 LOGP(warning, "Tree for file {} is nullptr", chainEle->GetTitle());
259 continue;
260 }
261 unsigned int firstTForbit = 0;
262 unsigned int tfCounter = 0;
263 tree->SetBranchAddress("tfID.firstTForbit", &firstTForbit);
264 tree->SetBranchAddress("tfID.tfCounter", &tfCounter);
265 tree->SetBranchStatus("*", 0);
266 tree->SetBranchStatus("firstTForbit", 1);
267 tree->SetBranchStatus("tfCounter", 1);
268 const int entries = tree->GetEntries();
269 for (unsigned int i = 0; i < entries; ++i) {
270 tree->GetEntry(i);
271 pcstream << "meta"
272 << "firstTForbit=" << firstTForbit
273 << "tfCounter=" << tfCounter // TF id
274 << "tree_entry=" << i // entry in ttree
275 << "file=" << iFile // file in chain
276 << "lane=" << mLane // current lane
277 << "\n";
278 }
279 }
280 LOGP(info, "Created meta data for input files {} to {} for a total of {} files", firstFile, lastFile - 1, nFilesInChain);
281}
282
283void IntegratedClusterReader::fillIndices()
284{
285 LOGP(info, "Filling indices from input files");
286 mIndices.clear();
287 mIndices.reserve(mChain->GetEntries());
288 // disable all branches except the firstTForbit branch to significantly speed up the loop over the TTree
289 mChain->SetBranchStatus("*", 0);
290 mChain->SetBranchStatus("firstTForbit", 1);
291 uint32_t countTFs = mChain->GetEntries();
292 // in case processing doesnt start from the first TF, the first TF to be processed needs to be defined
293 if (mLastTF != -1) {
294 countTFs = 0;
295 mChain->SetBranchStatus("tfCounter", 1);
296 }
297 for (unsigned long i = 0; i < mChain->GetEntries(); i++) {
298 mChain->GetEntry(i);
299 // in case indices are loaded directly from the chain store the global index
300 mIndices.emplace_back(std::make_tuple(mTFinfo.firstTForbit, i, -1));
301
302 if ((mLastTF != -1) && (mTFinfo.tfCounter <= mLastTF)) {
303 if ((mTFinfo.tfCounter >= mFirstTF)) {
304 // count number of TFs to process
305 ++countTFs;
306 } else {
307 // keep track of first entry in the chain
308 ++mChainEntry;
309 }
310 }
311 }
312 mChain->SetBranchStatus("*", 1);
313 std::sort(mIndices.begin(), mIndices.end());
314 mLastTF = countTFs;
315}
316
317void IntegratedClusterReader::fillIndicesFromMeta()
318{
319 std::vector<int> indicesMeta(mMetaInFiles.size()); // sorted indices if input meta files are not sorted
320 for (int ifile = 0; ifile < mMetaInFiles.size(); ++ifile) {
321 auto file = std::unique_ptr<TFile>{TFile::Open(mMetaInFiles[ifile].data())};
322 TTree* tree = nullptr;
323 file->GetObject("meta", tree);
324 if (!tree) {
325 LOGP(warning, "Tree for file {} is nullptr", mMetaInFiles[ifile]);
326 continue;
327 }
328 int fileIdx = 0;
329 tree->SetBranchAddress("lane", &fileIdx);
330 tree->GetEntry(0);
331 LOGP(info, "Index of file {} is {}", ifile, fileIdx);
332 indicesMeta[fileIdx] = ifile;
333 }
334 TChain metaChain("meta");
335 for (const auto idxFile : indicesMeta) {
336 LOGP(info, "Adding to meta chain: {}", mMetaInFiles[idxFile]);
337 metaChain.Add(mMetaInFiles[idxFile].data());
338 }
339
340 assert(metaChain.GetEntries());
341
342 unsigned int firstTForbit = 0;
343 unsigned int tfCounter = 0;
344 unsigned int tree_entry = 0;
345 unsigned int file = 0;
346 metaChain.SetBranchAddress("firstTForbit", &firstTForbit);
347 metaChain.SetBranchAddress("tfCounter", &tfCounter);
348 metaChain.SetBranchAddress("tree_entry", &tree_entry);
349 metaChain.SetBranchAddress("file", &file);
350
351 LOGP(info, "Filling indices from local meta files");
352 mIndices.clear();
353 mIndices.reserve(metaChain.GetEntries());
354
355 uint32_t countTFs = (mLastTF != -1) ? 0 : metaChain.GetEntries();
356
357 // storage of relevant indices per file
358 std::unordered_map<int, std::vector<std::tuple<unsigned long, int, int>>> indices_per_file;
359
360 // in case processing doesnt start from the first TF, the first TF to be processed needs to be defined
361 for (unsigned long i = 0; i < metaChain.GetEntries(); i++) {
362 metaChain.GetEntry(i);
363 // in case indices are loaded from the meta file store the file index and ttree index
364 mIndices.emplace_back(std::make_tuple(firstTForbit, file, tree_entry));
365 if ((mLastTF != -1) && (tfCounter <= mLastTF)) {
366 if ((tfCounter >= mFirstTF)) {
367 ++countTFs;
368 indices_per_file[file].emplace_back(firstTForbit, file, tree_entry);
369 } else {
370 ++mChainEntry;
371 }
372 }
373 }
374 std::sort(mIndices.begin(), mIndices.end());
375 mLastTF = countTFs;
376
377 if (!indices_per_file.empty()) {
378 mChainEntry = mIndices.size();
379 int counterFile = 0;
380 for (auto& indices : indices_per_file) {
381 if ((mLane + counterFile++) % mNLanes) {
382 continue;
383 }
384 auto& vIndices = indices.second;
385 std::sort(vIndices.begin(), vIndices.end());
386 const int firstIdx = mIndices.size();
387 const int maxSize = vIndices.size() * mNLanes + firstIdx;
388 mIndices.resize(maxSize); // extend vector if necessary and set extended values to -1
389 LOGP(info, "Adding file {} to indices by increasing max size to {}", counterFile - 1, maxSize);
390 for (int i = 0; i < vIndices.size(); ++i) {
391 mIndices[firstIdx + i * mNLanes] = vIndices[i];
392 }
393 }
394 }
395}
396
398{
399 std::vector<OutputSpec> outputs;
400 outputs.emplace_back(o2::header::gDataOriginTPC, getDataDescriptionTPCC(), 0, Lifetime::Sporadic);
401
402 return DataProcessorSpec{
403 "tpc-integrated-cluster-reader",
404 Inputs{},
405 outputs,
406 AlgorithmSpec{adaptFromTask<IntegratedClusterReader>()},
407 Options{
408 {"tpc-currents-infiles", VariantType::String, "o2currents_tpc.root", {"comma-separated list of input files or .txt file containing list of input files"}},
409 {"input-dir", VariantType::String, "none", {"Input directory"}},
410 {"dont-check-file-access", VariantType::Bool, false, {"Deactivate check if all files are accessible before adding them to the list of files"}},
411 {"firstTF", VariantType::Int, 0, {"First TF to process"}},
412 {"lastTF", VariantType::Int, -1, {"Last TF to process"}},
413 {"input-meta-files", VariantType::String, "", {"Input directory for meta data (TTree containing firstTForbit and tfCounter)"}},
414 {"output-meta-dir", VariantType::String, "none", {"Output directory (TTree containing firstTForbit and tfCounter)"}},
415 }};
416}
417
418} // namespace tpc
419} // namespace o2
int32_t i
calibrator class for accumulating integrated clusters
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
void snapshot(const Output &spec, T const &object)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
~IntegratedClusterReader() override=default
GLuint entry
Definition glcorearb.h:5735
GLuint GLuint end
Definition glcorearb.h:469
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
GLuint start
Definition glcorearb.h:469
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
o2::framework::DataProcessorSpec getTPCIntegrateClusterReaderSpec()
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))