Project
Loading...
Searching...
No Matches
TOFIntegrateClusterReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include <vector>
15#include <boost/algorithm/string/predicate.hpp>
16
18#include "Framework/Task.h"
24#include "TChain.h"
25#include "TGrid.h"
26
27using namespace o2::framework;
28
29namespace o2
30{
31namespace tof
32{
33
35{
36 public:
38 ~IntegratedClusterReader() override = default;
39 void init(InitContext& ic) final;
40 void run(ProcessingContext& pc) final;
41
42 private:
43 void connectTrees();
44
45 int mChainEntry = 0;
46 std::unique_ptr<TChain> mChain;
47 std::vector<std::string> mFileNames;
48 std::vector<float> mTOFCNCl, *mTOFCNClPtr = &mTOFCNCl;
49 std::vector<float> mTOFCqTot, *mTOFCqTotPtr = &mTOFCqTot;
50 o2::dataformats::TFIDInfo mTFinfo, *mTFinfoPtr = &mTFinfo;
51 std::vector<std::pair<unsigned long, int>> mIndices;
52};
53
55{
56 const auto dontCheckFileAccess = ic.options().get<bool>("dont-check-file-access");
57 auto fileList = o2::RangeTokenizer::tokenize<std::string>(ic.options().get<std::string>("tof-currents-infiles"));
58
59 // check if only one input file (a txt file contaning a list of files is provided)
60 if (fileList.size() == 1) {
61 if (boost::algorithm::ends_with(fileList.front(), "txt")) {
62 LOGP(info, "Reading files from input file {}", fileList.front());
63 std::ifstream is(fileList.front());
64 std::istream_iterator<std::string> start(is);
65 std::istream_iterator<std::string> end;
66 std::vector<std::string> fileNamesTmp(start, end);
67 fileList = fileNamesTmp;
68 }
69 }
70
71 const std::string inpDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("input-dir"));
72 for (const auto& file : fileList) {
73 if ((file.find("alien://") == 0) && !gGrid && !TGrid::Connect("alien://")) {
74 LOG(fatal) << "Failed to open alien connection";
75 }
76 const auto fileDir = o2::utils::Str::concat_string(inpDir, file);
77 if (!dontCheckFileAccess) {
78 std::unique_ptr<TFile> filePtr(TFile::Open(fileDir.data()));
79 if (!filePtr || !filePtr->IsOpen() || filePtr->IsZombie()) {
80 LOGP(warning, "Could not open file {}", fileDir);
81 continue;
82 }
83 }
84 mFileNames.emplace_back(fileDir);
85 }
86
87 if (mFileNames.size() == 0) {
88 LOGP(error, "No input files to process");
89 }
90 connectTrees();
91}
92
94{
95 // check time order inside the TChain
96 if (mChainEntry == 0) {
97 mIndices.clear();
98 mIndices.reserve(mChain->GetEntries());
99 // disable all branches except the firstTForbit branch to significantly speed up the loop over the TTree
100 mChain->SetBranchStatus("*", 0);
101 mChain->SetBranchStatus("firstTForbit", 1);
102 for (unsigned long i = 0; i < mChain->GetEntries(); i++) {
103 mChain->GetEntry(i);
104 mIndices.emplace_back(std::make_pair(mTFinfo.firstTForbit, i));
105 }
106 mChain->SetBranchStatus("*", 1);
107 std::sort(mIndices.begin(), mIndices.end());
108 }
109
110 LOGP(debug, "Processing entry {}", mIndices[mChainEntry].second);
111 mChain->GetEntry(mIndices[mChainEntry++].second);
112
113 // inject correct timing informations
114 auto& timingInfo = pc.services().get<o2::framework::TimingInfo>();
115 timingInfo.firstTForbit = mTFinfo.firstTForbit;
116 timingInfo.tfCounter = mTFinfo.tfCounter;
117 timingInfo.runNumber = mTFinfo.runNumber;
118 timingInfo.creation = mTFinfo.creation;
119
120 pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFCN"}, mTOFCNCl);
121 pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFCQ"}, mTOFCqTot);
122 usleep(100);
123
124 if (mChainEntry >= mChain->GetEntries()) {
126 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
127 }
128}
129
130void IntegratedClusterReader::connectTrees()
131{
132 mChain.reset(new TChain("itofc"));
133 for (const auto& file : mFileNames) {
134 LOGP(info, "Adding file to chain: {}", file);
135 mChain->AddFile(file.data());
136 }
137 assert(mChain->GetEntries());
138 mChain->SetBranchAddress("ITOFCN", &mTOFCNClPtr);
139 mChain->SetBranchAddress("ITOFCQ", &mTOFCqTotPtr);
140 mChain->SetBranchAddress("tfID", &mTFinfoPtr);
141}
142
144{
145 std::vector<OutputSpec> outputs;
146 outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCN", 0, Lifetime::Sporadic);
147 outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCQ", 0, Lifetime::Sporadic);
148
149 return DataProcessorSpec{
150 "tof-integrated-cluster-reader",
151 Inputs{},
152 outputs,
153 AlgorithmSpec{adaptFromTask<IntegratedClusterReader>()},
154 Options{
155 {"tof-currents-infiles", VariantType::String, "o2currents_tof.root", {"comma-separated list of input files or .txt file containing list of input files"}},
156 {"input-dir", VariantType::String, "none", {"Input directory"}},
157 {"dont-check-file-access", VariantType::Bool, false, {"Deactivate check if all files are accessible before adding them to the list of files"}},
158 }};
159}
160
161} // namespace tof
162} // namespace o2
int32_t i
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::ostringstream debug
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
Definition Task.h:43
~IntegratedClusterReader() override=default
GLuint GLuint end
Definition glcorearb.h:469
GLuint start
Definition glcorearb.h:469
constexpr o2::header::DataOrigin gDataOriginTOF
Definition DataHeader.h:575
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
o2::framework::DataProcessorSpec getTOFIntegrateClusterReaderSpec()
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"