15#include <boost/algorithm/string/predicate.hpp>
30#include "TChainElement.h"
50 void fillIndicesFromMeta();
60 std::unique_ptr<TChain> mChain;
61 std::vector<std::string> mFileNames;
62 ITPCC mTPCC, *mTPCCPtr = &mTPCC;
64 std::vector<std::tuple<unsigned long, int, int>> mIndices;
65 std::string mMetaOutFileDir{};
66 std::vector<std::string> mMetaInFiles{};
67 std::unique_ptr<TFile> mTFile;
68 TTree* mTree =
nullptr;
74 mMetaOutFileDir = ic.
options().
get<std::string>(
"output-meta-dir");
75 if (mMetaOutFileDir !=
"none") {
76 LOGP(info,
"Setting up meta output directory to {}", mMetaOutFileDir);
80 mMetaInFiles = o2::RangeTokenizer::tokenize<std::string>(ic.
options().
get<std::string>(
"input-meta-files"));
81 if (!mMetaInFiles.empty()) {
83 if (mMetaOutFileDir !=
"none") {
84 LOGP(error,
"Only input-meta-files or output-meta-dir should be set. Setting input-meta-files to none");
87 LOGP(info,
"Setting up meta input files");
88 if (mMetaInFiles.size() == 1) {
89 if (boost::algorithm::ends_with(mMetaInFiles.front(),
"txt")) {
90 LOGP(info,
"Reading meta files from input file {}", mMetaInFiles.front());
91 std::ifstream is(mMetaInFiles.front());
92 std::istream_iterator<std::string>
start(is);
93 std::istream_iterator<std::string>
end;
94 std::vector<std::string> fileNamesTmp(
start,
end);
95 mMetaInFiles = fileNamesTmp;
107 const auto dontCheckFileAccess = ic.
options().
get<
bool>(
"dont-check-file-access");
108 auto fileList = o2::RangeTokenizer::tokenize<std::string>(ic.
options().
get<std::string>(
"tpc-currents-infiles"));
111 if (fileList.size() == 1) {
112 if (boost::algorithm::ends_with(fileList.front(),
"txt")) {
113 LOGP(info,
"Reading files from input file {}", fileList.front());
114 std::ifstream is(fileList.front());
115 std::istream_iterator<std::string>
start(is);
116 std::istream_iterator<std::string>
end;
117 std::vector<std::string> fileNamesTmp(
start,
end);
118 fileList = fileNamesTmp;
123 for (
const auto&
file : fileList) {
124 if ((
file.find(
"alien://") == 0) && !gGrid && !TGrid::Connect(
"alien://")) {
125 LOG(fatal) <<
"Failed to open alien connection";
128 if (!dontCheckFileAccess) {
129 std::unique_ptr<TFile> filePtr(TFile::Open(fileDir.data()));
130 if (!filePtr || !filePtr->IsOpen() || filePtr->IsZombie()) {
131 LOGP(warning,
"Could not open file {}", fileDir);
135 mFileNames.emplace_back(fileDir);
138 if (mFileNames.size() == 0) {
139 LOGP(error,
"No input files to process");
147 if (mChainEntry == mLane) {
149 if (mMetaOutFileDir !=
"none") {
157 if (!mMetaInFiles.empty()) {
158 LOGP(info,
"Reading in meta data from files...");
159 fillIndicesFromMeta();
163 LOGP(info,
"Processing {} TFs out ouf {} TFs with first index in chain {}", mLastTF, mIndices.size(), mChainEntry);
173 if (mMetaInFiles.empty()) {
174 const int entry = std::get<1>(mIndices[mChainEntry]);
175 LOGP(info,
"Processing entry {}",
entry);
176 mChain->GetEntry(
entry);
178 const int iFile = std::get<1>(mIndices[mChainEntry]);
179 const int treeEntry = std::get<2>(mIndices[mChainEntry]);
180 LOGP(info,
"Processing file {} with TTree index {}", iFile, treeEntry);
181 TChainElement* chainEle = (TChainElement*)(mChain->GetListOfFiles()->At(iFile));
184 if (mTFile && (std::string(mTFile->GetName()) == std::string(chainEle->GetTitle()))) {
185 LOGP(info,
"File is same. Do not need to reload...");
188 mTFile = std::unique_ptr<TFile>{TFile::Open(chainEle->GetTitle())};
193 LOGP(warning,
"File {} is nullptr", chainEle->GetTitle());
198 LOGP(info,
"Setting up new TTree");
199 mTFile->GetObject(
"itpcc", mTree);
201 LOGP(warning,
"Tree for file {} is nullptr", chainEle->GetTitle());
205 mTree->SetBranchAddress(
"ITPCC", &mTPCCPtr);
206 mTree->SetBranchAddress(
"tfID", &mTFinfoPtr);
208 mTree->GetEntry(treeEntry);
210 mChainEntry += mNLanes;
215 timingInfo.tfCounter = mTFinfo.
tfCounter;
216 timingInfo.runNumber = mTFinfo.
runNumber;
217 timingInfo.creation = mTFinfo.
creation;
219 LOGP(info,
"Processed data for firstTForbit {} and tfCounter {}", timingInfo.firstTForbit, timingInfo.tfCounter);
224void IntegratedClusterReader::connectTrees()
226 mChain.reset(
new TChain(
"itpcc"));
227 for (
const auto&
file : mFileNames) {
228 LOGP(info,
"Adding file to chain: {}",
file);
229 mChain->AddFile(
file.data());
231 assert(mChain->GetEntries());
232 mChain->SetBranchAddress(
"ITPCC", &mTPCCPtr);
233 mChain->SetBranchAddress(
"tfID", &mTFinfoPtr);
238 const std::string outFileMeta = fmt::format(
"{}/tpc_meta_{}.root", mMetaOutFileDir, mLane);
239 LOGP(info,
"Producing meta file to: {}", outFileMeta);
241 const int nFilesInChain = mChain->GetListOfFiles()->GetEntries();
242 const int nFilesPerLane = nFilesInChain / mNLanes;
243 const int firstFile = mLane * nFilesPerLane;
244 const int lastFile = (mLane == mNLanes - 1) ? nFilesInChain : (firstFile + nFilesPerLane);
245 LOGP(info,
"Processing files {} to {}", firstFile, lastFile - 1);
248 for (
unsigned int iFile = firstFile; iFile < lastFile; ++iFile) {
249 TChainElement* chainEle = (TChainElement*)(mChain->GetListOfFiles()->At(iFile));
250 auto file = std::unique_ptr<TFile>{TFile::Open(chainEle->GetTitle())};
252 LOGP(warning,
"File {} is nullptr", chainEle->GetTitle());
255 TTree*
tree =
nullptr;
258 LOGP(warning,
"Tree for file {} is nullptr", chainEle->GetTitle());
263 tree->SetBranchAddress(
"tfID.firstTForbit", &firstTForbit);
264 tree->SetBranchAddress(
"tfID.tfCounter", &tfCounter);
265 tree->SetBranchStatus(
"*", 0);
266 tree->SetBranchStatus(
"firstTForbit", 1);
267 tree->SetBranchStatus(
"tfCounter", 1);
268 const int entries =
tree->GetEntries();
269 for (
unsigned int i = 0;
i < entries; ++
i) {
274 <<
"tree_entry=" <<
i
280 LOGP(info,
"Created meta data for input files {} to {} for a total of {} files", firstFile, lastFile - 1, nFilesInChain);
283void IntegratedClusterReader::fillIndices()
285 LOGP(info,
"Filling indices from input files");
287 mIndices.reserve(mChain->GetEntries());
289 mChain->SetBranchStatus(
"*", 0);
290 mChain->SetBranchStatus(
"firstTForbit", 1);
291 uint32_t countTFs = mChain->GetEntries();
295 mChain->SetBranchStatus(
"tfCounter", 1);
297 for (
unsigned long i = 0;
i < mChain->GetEntries();
i++) {
300 mIndices.emplace_back(std::make_tuple(mTFinfo.
firstTForbit,
i, -1));
302 if ((mLastTF != -1) && (mTFinfo.
tfCounter <= mLastTF)) {
312 mChain->SetBranchStatus(
"*", 1);
313 std::sort(mIndices.begin(), mIndices.end());
317void IntegratedClusterReader::fillIndicesFromMeta()
319 std::vector<int> indicesMeta(mMetaInFiles.size());
320 for (
int ifile = 0; ifile < mMetaInFiles.size(); ++ifile) {
321 auto file = std::unique_ptr<TFile>{TFile::Open(mMetaInFiles[ifile].
data())};
322 TTree*
tree =
nullptr;
325 LOGP(warning,
"Tree for file {} is nullptr", mMetaInFiles[ifile]);
329 tree->SetBranchAddress(
"lane", &fileIdx);
331 LOGP(info,
"Index of file {} is {}", ifile, fileIdx);
332 indicesMeta[fileIdx] = ifile;
334 TChain metaChain(
"meta");
335 for (
const auto idxFile : indicesMeta) {
336 LOGP(info,
"Adding to meta chain: {}", mMetaInFiles[idxFile]);
337 metaChain.Add(mMetaInFiles[idxFile].
data());
340 assert(metaChain.GetEntries());
344 unsigned int tree_entry = 0;
345 unsigned int file = 0;
346 metaChain.SetBranchAddress(
"firstTForbit", &firstTForbit);
347 metaChain.SetBranchAddress(
"tfCounter", &tfCounter);
348 metaChain.SetBranchAddress(
"tree_entry", &tree_entry);
349 metaChain.SetBranchAddress(
"file", &
file);
351 LOGP(info,
"Filling indices from local meta files");
353 mIndices.reserve(metaChain.GetEntries());
355 uint32_t countTFs = (mLastTF != -1) ? 0 : metaChain.GetEntries();
358 std::unordered_map<int, std::vector<std::tuple<unsigned long, int, int>>> indices_per_file;
361 for (
unsigned long i = 0;
i < metaChain.GetEntries();
i++) {
362 metaChain.GetEntry(
i);
364 mIndices.emplace_back(std::make_tuple(firstTForbit,
file, tree_entry));
365 if ((mLastTF != -1) && (tfCounter <= mLastTF)) {
366 if ((tfCounter >= mFirstTF)) {
368 indices_per_file[
file].emplace_back(firstTForbit,
file, tree_entry);
374 std::sort(mIndices.begin(), mIndices.end());
377 if (!indices_per_file.empty()) {
378 mChainEntry = mIndices.size();
380 for (
auto&
indices : indices_per_file) {
381 if ((mLane + counterFile++) % mNLanes) {
384 auto& vIndices =
indices.second;
385 std::sort(vIndices.begin(), vIndices.end());
386 const int firstIdx = mIndices.size();
387 const int maxSize = vIndices.size() * mNLanes + firstIdx;
388 mIndices.resize(maxSize);
389 LOGP(info,
"Adding file {} to indices by increasing max size to {}", counterFile - 1, maxSize);
390 for (
int i = 0;
i < vIndices.size(); ++
i) {
391 mIndices[firstIdx +
i * mNLanes] = vIndices[
i];
399 std::vector<OutputSpec> outputs;
403 "tpc-integrated-cluster-reader",
408 {
"tpc-currents-infiles", VariantType::String,
"o2currents_tpc.root", {
"comma-separated list of input files or .txt file containing list of input files"}},
409 {
"input-dir", VariantType::String,
"none", {
"Input directory"}},
410 {
"dont-check-file-access", VariantType::Bool,
false, {
"Deactivate check if all files are accessible before adding them to the list of files"}},
411 {
"firstTF", VariantType::Int, 0, {
"First TF to process"}},
412 {
"lastTF", VariantType::Int, -1, {
"Last TF to process"}},
413 {
"input-meta-files", VariantType::String,
"", {
"Input directory for meta data (TTree containing firstTForbit and tfCounter)"}},
414 {
"output-meta-dir", VariantType::String,
"none", {
"Output directory (TTree containing firstTForbit and tfCounter)"}},
calibrator class for accumulating integrated clusters
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
ServiceRegistryRef services()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
IntegratedClusterReader()=default
void init(InitContext &ic) final
void run(ProcessingContext &pc) final
~IntegratedClusterReader() override=default
GLsizei GLenum const void * indices
constexpr o2::header::DataOrigin gDataOriginTPC
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
o2::framework::DataProcessorSpec getTPCIntegrateClusterReaderSpec()
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
uint32_t tfCounter
the orbit the TF begins
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))