Project
Loading...
Searching...
No Matches
FITIntegrateClusterReaderSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_FIT_FITINTEGRATECLUSTERREADER_SPEC
13#define O2_FIT_FITINTEGRATECLUSTERREADER_SPEC
14
15#include <vector>
16#include <boost/algorithm/string/predicate.hpp>
17
20#include "Framework/Task.h"
26#include "TChain.h"
27#include "TGrid.h"
28
29using namespace o2::framework;
30
31namespace o2
32{
33namespace fit
34{
35
// Reader task that replays stored FIT current entries from ROOT files: the entries are read from a
// TChain and published one per run() call (see run()).
// NOTE(review): this rendering of the file skips listing lines 37 and 40 (the class-head line and one
// declaration), so the base class and any constructor are not visible here — check the real header.
36template <typename DataT>
38{
39 public:
41 ~IntegratedClusterReader() override = default;
// reads the options, assembles mFileNames and calls connectTrees()
42 void init(InitContext& ic) final;
// publishes one chain entry per call and signals end-of-stream after the last one
43 void run(ProcessingContext& pc) final;
44
45 private:
// creates mChain from mFileNames and binds the "IFITC" and "tfID" branches to the buffers below
46 void connectTrees();
47
// position in mIndices of the next entry to publish; run() post-increments it on every call
48 int mChainEntry = 0;
// chain over all accepted input files, (re)created in connectTrees()
49 std::unique_ptr<TChain> mChain;
// input files (input directory already prepended) that are added to the chain
50 std::vector<std::string> mFileNames;
// buffer bound to branch "IFITC"; the address of the pointer member is what is handed to SetBranchAddress
51 typename DataDescriptionFITCurrents<DataT>::DataTStruct mFITC, *mFITCPtr = &mFITC;
// buffer bound to branch "tfID"; source of the timing information injected in run()
52 o2::dataformats::TFIDInfo mTFinfo, *mTFinfoPtr = &mTFinfo;
// (firstTForbit, chain entry index) pairs, filled and sorted on the first run() call
// NOTE(review): the entry index is stored as int while run() counts entries with unsigned long —
// confirm that the number of chain entries always fits into an int
53 std::vector<std::pair<unsigned long, int>> mIndices;
54};
55
// init(InitContext& ic) as declared in the class above (the signature line itself, listing line 57,
// is missing from this rendering).
// Reads the options "dont-check-file-access", "<name>-currents-infiles" and "input-dir", builds the
// list of input files in mFileNames and finally calls connectTrees().
56template <typename DataT>
58{
59 const auto dontCheckFileAccess = ic.options().get<bool>("dont-check-file-access");
60 auto fileList = o2::RangeTokenizer::tokenize<std::string>(ic.options().get<std::string>(fmt::format("{}-currents-infiles", DataDescriptionFITCurrents<DataT>::getName()).data()));
61
62 // check if only one input file (a txt file containing a list of files is provided)
63 if (fileList.size() == 1) {
64 if (boost::algorithm::ends_with(fileList.front(), "txt")) {
65 LOGP(info, "Reading files from input file {}", fileList.front());
// NOTE(review): the stream is not checked for being open; if the txt file cannot be read,
// fileNamesTmp stays empty and fileList is silently replaced by an empty list
66 std::ifstream is(fileList.front());
// whitespace-separated tokens of the txt file become the new file list
67 std::istream_iterator<std::string> start(is);
68 std::istream_iterator<std::string> end;
69 std::vector<std::string> fileNamesTmp(start, end);
70 fileList = fileNamesTmp;
71 }
72 }
73
74 const std::string inpDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("input-dir"));
75 for (const auto& file : fileList) {
// a grid connection is only attempted for names starting with "alien://" and only while gGrid is
// still null; the prefix test is done on the name before the input directory is prepended
76 if ((file.find("alien://") == 0) && !gGrid && !TGrid::Connect("alien://")) {
77 LOG(fatal) << "Failed to open alien connection";
78 }
79 const auto fileDir = o2::utils::Str::concat_string(inpDir, file);
// unless disabled by option, every file is opened once and skipped with a warning if that fails
80 if (!dontCheckFileAccess) {
81 std::unique_ptr<TFile> filePtr(TFile::Open(fileDir.data()));
82 if (!filePtr || !filePtr->IsOpen() || filePtr->IsZombie()) {
83 LOGP(warning, "Could not open file {}", fileDir);
84 continue;
85 }
86 }
87 mFileNames.emplace_back(fileDir);
88 }
89
// NOTE(review): an empty file list is only reported; connectTrees() is still called afterwards
90 if (mFileNames.size() == 0) {
91 LOGP(error, "No input files to process");
92 }
93 connectTrees();
94}
95
// run(ProcessingContext& pc) as declared in the class above (the signature line itself, listing
// line 97, is missing from this rendering).
// On the first call an index of all chain entries ordered by firstTForbit is built. Every call then
// reads the next entry in that order, copies its timing information into the TimingInfo service and
// publishes mFITC. After the last entry end-of-stream is signalled and the device asks to quit.
96template <typename DataT>
98{
99 // check time order inside the TChain
100 if (mChainEntry == 0) {
101 mIndices.clear();
102 mIndices.reserve(mChain->GetEntries());
103 // disable all branches except the firstTForbit branch to significantly speed up the loop over the TTree
104 mChain->SetBranchStatus("*", 0);
105 mChain->SetBranchStatus("firstTForbit", 1);
106 for (unsigned long i = 0; i < mChain->GetEntries(); i++) {
107 mChain->GetEntry(i);
108 mIndices.emplace_back(std::make_pair(mTFinfo.firstTForbit, i));
109 }
// all branches are switched back on before the entries are read in full below
110 mChain->SetBranchStatus("*", 1);
// pairs compare by firstTForbit first and by entry index second
111 std::sort(mIndices.begin(), mIndices.end());
112 }
113
// NOTE(review): if the chain holds no entries, mIndices is empty here and the two accesses below
// index it out of range; nothing in this function guards against that case
114 LOGP(debug, "Processing entry {}", mIndices[mChainEntry].second);
115 mChain->GetEntry(mIndices[mChainEntry++].second);
116
117 // inject correct timing information
118 auto& timingInfo = pc.services().get<o2::framework::TimingInfo>();
119 timingInfo.firstTForbit = mTFinfo.firstTForbit;
120 timingInfo.tfCounter = mTFinfo.tfCounter;
121 timingInfo.runNumber = mTFinfo.runNumber;
122 timingInfo.creation = mTFinfo.creation;
123
124 using FitType = DataDescriptionFITCurrents<DataT>;
125 pc.outputs().snapshot(Output{FitType::getDataOrigin(), FitType::getDataDescriptionFITC()}, mFITC);
// NOTE(review): 100 microsecond pause after every published entry; the reason is not visible in
// this file — presumably throttling, confirm before changing
126 usleep(100);
127
// all entries published: announce end-of-stream and request that this device quits
128 if (mChainEntry >= mChain->GetEntries()) {
129 pc.services().get<ControlService>().endOfStream();
130 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
131 }
132}
133
// connectTrees() as declared in the class above (the signature line itself, listing line 135, is
// missing from this rendering).
// Replaces mChain by a new TChain named DataDescriptionFITCurrents<DataT>::getName(), adds every
// file of mFileNames to it and binds the branches "IFITC" and "tfID" to the member buffers.
134template <typename DataT>
136{
137 const std::string treeName = DataDescriptionFITCurrents<DataT>::getName();
138 mChain.reset(new TChain(treeName.data()));
139 for (const auto& file : mFileNames) {
140 LOGP(info, "Adding file to chain: {}", file);
141 mChain->AddFile(file.data());
142 }
// NOTE(review): assert is removed when NDEBUG is defined, so in such builds an empty chain is
// not detected here
143 assert(mChain->GetEntries());
// the addresses of the pointer members are passed, not the objects themselves
// NOTE(review): the return values of SetBranchAddress are not checked, so a missing or mismatching
// branch goes unnoticed here
144 mChain->SetBranchAddress("IFITC", &mFITCPtr);
145 mChain->SetBranchAddress("tfID", &mTFinfoPtr);
146}
147
// Factory for the reader device (the signature line, listing line 149, is missing from this
// rendering; presumably DataProcessorSpec getFITIntegrateClusterReaderSpec() — confirm against the
// real header).
// Returns a DataProcessorSpec named "<name>-integrated-cluster-reader" without inputs, with a single
// output of Lifetime::Sporadic, running IntegratedClusterReader<DataT> and declaring the three
// options that the task reads in init().
148template <typename DataT>
150{
151 using FitType = DataDescriptionFITCurrents<DataT>;
152 std::vector<OutputSpec> outputs;
153 outputs.emplace_back(FitType::getDataOrigin(), FitType::getDataDescriptionFITC(), 0, Lifetime::Sporadic);
154
155 return DataProcessorSpec{
156 fmt::format("{}-integrated-cluster-reader", FitType::getName()),
157 Inputs{},
158 outputs,
159 AlgorithmSpec{adaptFromTask<IntegratedClusterReader<DataT>>()},
160 Options{
// option names must stay in sync with the strings queried in init()
161 {fmt::format("{}-currents-infiles", FitType::getName()), VariantType::String, fmt::format("o2currents_{}.root", FitType::getName()), {"comma-separated list of input files or .txt file containing list of input files"}},
162 {"input-dir", VariantType::String, "none", {"Input directory"}},
163 {"dont-check-file-access", VariantType::Bool, false, {"Deactivate check if all files are accessible before adding them to the list of files"}},
164 }};
165}
166
167} // end namespace fit
168} // end namespace o2
169
170#endif
int32_t i
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::ostringstream debug
~IntegratedClusterReader() override=default
void run(ProcessingContext &pc) final
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLuint GLuint end
Definition glcorearb.h:469
GLuint start
Definition glcorearb.h:469
DataProcessorSpec getFITIntegrateClusterReaderSpec()
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
TFitResultPtr fit(const size_t nBins, const T *arr, const T xMin, const T xMax, TF1 &func, std::string_view option="")
Definition fit.h:59
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"