Project
Loading...
Searching...
No Matches
TPCResidualAggregatorSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_TPC_RESIDUALAGGREGATORSPEC_H
13#define O2_TPC_RESIDUALAGGREGATORSPEC_H
14
18
24#include "Framework/Task.h"
30#include <fairmq/Device.h>
31#include <chrono>
32
33using namespace o2::framework;
35
36namespace o2
37{
38namespace calibration
39{
40
42{
43 public:
// Stores the configuration handed over at construction: the GRP/geometry CCDB request, the
// switches selecting the optional inputs (track data, CTP) and outputs (unbinned residuals,
// binned residuals, track data), and the data request. The initialisers are written in member
// declaration order, which is the order in which they are executed.
44 ResidualAggregatorDevice(std::shared_ptr<o2::base::GRPGeomRequest> req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr<o2::globaltracking::DataRequest> dataRequest)
   : mCCDBRequest(req),
     mDataRequest(dataRequest),
     mTrackInput(trackInput),
     mCTPInput(ctpInput),
     mWriteBinnedResiduals(writeBinnedResiduals),
     mWriteUnbinnedResiduals(writeUnbinnedResiduals),
     mWriteTrackData(writeTrackData)
   {
   }
45
// Body of the task's init step (the signature line is not part of this listing).
// Reads the device options and creates and configures the ResidualAggregator:
// slot length, delay, output/meta directories, autosave interval, output selection, compression.
47 {
   // NOTE(review): source line 48 is missing from this listing - confirm against the original header
49 int minEnt = ic.options().get<int>("min-entries");
50 auto slotLength = ic.options().get<uint32_t>("sec-per-slot");
   // a slot length of 0 seconds selects a single slot of infinite length
51 bool useInfiniteSlotLength = false;
52 if (slotLength == 0) {
53 useInfiniteSlotLength = true;
54 }
55 auto updateInterval = ic.options().get<uint32_t>("updateInterval");
56 auto delay = ic.options().get<uint32_t>("max-delay");
57
58 // should we write meta files for epn2eos?
   // any directory other than /dev/null enables the meta files
59 bool storeMetaFile = false;
60 std::string metaFileDir = ic.options().get<std::string>("meta-output-dir");
61 if (metaFileDir != "/dev/null") {
62 metaFileDir = o2::utils::Str::rectifyDirectory(metaFileDir);
63 storeMetaFile = true;
64 }
65
66 // where should the ROOT output be written? in case it is set to /dev/null
67 // we don't write anything, also no meta files of course
68 bool writeOutput = true;
69 std::string outputDir = ic.options().get<std::string>("output-dir");
70 if (outputDir != "/dev/null") {
71 outputDir = o2::utils::Str::rectifyDirectory(outputDir);
72 } else {
73 writeOutput = false;
74 storeMetaFile = false;
75 }
76
77 LOGP(info, "Creating aggregator with {} entries per voxel minimum. Output file writing enabled: {}, meta file writing enabled: {}",
78 minEnt, writeOutput, storeMetaFile);
79 mAggregator = std::make_unique<o2::tpc::ResidualAggregator>(minEnt);
   // the directories are only forwarded when the corresponding output is enabled
80 if (writeOutput) {
81 mAggregator->setOutputDir(outputDir);
82 }
83 if (storeMetaFile) {
84 mAggregator->setMetaFileOutputDir(metaFileDir);
85 }
86
87 int autosave = ic.options().get<int>("autosave-interval");
88 mAggregator->setAutosaveInterval(autosave);
89 // TODO mAggregator should get an option to set the binning externally (expose TrackResiduals::setBinning methods to user? as command line option?)
90 mAggregator->setMaxSlotsDelay(delay);
   // updateInterval is only used together with the infinite slot; otherwise the slot length in seconds is set
91 if (useInfiniteSlotLength) {
92 mAggregator->setSlotLength(o2::calibration::INFINITE_TF);
93 mAggregator->setCheckIntervalInfiniteSlot(updateInterval);
94 } else {
95 mAggregator->setSlotLengthInSeconds(slotLength);
96 }
   // output selection as passed to the constructor
97 mAggregator->setWriteBinnedResiduals(mWriteBinnedResiduals);
98 mAggregator->setWriteUnbinnedResiduals(mWriteUnbinnedResiduals);
99 mAggregator->setWriteTrackData(mWriteTrackData);
100 mAggregator->setCompression(ic.options().get<int>("compression"));
101 }
102
// CCDB callback: offers the received object to the TPC VDrift helper, which reports
// through its return value whether it accounted for the object.
103 void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
104 {
   // NOTE(review): source line 105 is missing from this listing; it presumably opens the
   // `if` that the following `return; }` belongs to - confirm against the original header
106 return;
107 }
108 if (mTPCVDriftHelper.accountCCDBInputs(matcher, obj)) {
109 return;
110 }
111 }
112
// Body of the task's run step (the signature line is not part of this listing).
// For each invocation: clears the run-stop state when the run number changed, refreshes the
// time-dependent parameters, fetches the residual inputs plus the optional track data and
// CTP lumi, and passes them to the aggregator. When a state transition has been requested,
// the open slots are finalised and the aggregator is destroyed.
114 {
115 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
116 if (tinfo.globalRunNumberChanged) {
117 // new run is starting
118 LOG(info) << "New run start detected";
119 mRunStopRequested = false;
120 mInitDone = false;
   // NOTE(review): mAggregator was reset when the previous run stop was handled; the code
   // below relies on it having been re-created before the new run - confirm that the init
   // step is executed again in that case
121 }
   // after a stop request the aggregator no longer exists, so nothing more is processed
122 if (mRunStopRequested) {
123 return;
124 }
125 auto runStartTime = std::chrono::high_resolution_clock::now();
   // NOTE(review): recoCont is declared on source line 126, which is missing from this listing
127 recoCont.collectData(pc, *mDataRequest);
128 updateTimeDependentParams(pc);
   // time spent up to here (data collection and parameter update), reported in the debug message below
129 std::chrono::duration<double, std::milli> ccdbUpdateTime = std::chrono::high_resolution_clock::now() - runStartTime;
130
131 // we always require the unbinned residuals and the associated detector info and track references
132 auto residualsData = pc.inputs().get<gsl::span<o2::tpc::UnbinnedResid>>("unbinnedRes");
133 auto residualsDataDet = pc.inputs().get<gsl::span<o2::tpc::DetInfoResid>>("detinfoRes");
134 auto trackRefs = pc.inputs().get<gsl::span<o2::tpc::TrackDataCompact>>("trackRefs");
135
136 // track data input is optional
   // trkDataPtr stays nullptr (and trkData empty) when the device runs without track input
137 const gsl::span<const o2::tpc::TrackData>* trkDataPtr = nullptr;
138 using trkDataType = std::decay_t<decltype(pc.inputs().get<gsl::span<o2::tpc::TrackData>>(""))>;
139 std::optional<trkDataType> trkData;
140 if (mTrackInput) {
141 trkData.emplace(pc.inputs().get<gsl::span<o2::tpc::TrackData>>("trkData"));
142 trkDataPtr = &trkData.value();
143 }
144 // CTP lumi input (optional)
   // lumi stays nullptr when the device runs without CTP input
145 const o2::ctp::LumiInfo* lumi = nullptr;
146 using lumiDataType = std::decay_t<decltype(pc.inputs().get<o2::ctp::LumiInfo>(""))>;
147 std::optional<lumiDataType> lumiInput;
148 if (mCTPInput) {
149 lumiInput = recoCont.getCTPLumi();
150 lumi = &lumiInput.value();
151 }
152
153 o2::base::TFIDInfoHelper::fillTFIDInfo(pc, mAggregator->getCurrentTFInfo());
   // the track count is read through the nullable pointer: dereferencing the empty optional
   // (no track input) would be undefined behaviour, so 0 tracks are reported in that case
154 LOG(detail) << "Processing TF " << mAggregator->getCurrentTFInfo().tfCounter << " with " << (trkDataPtr ? trkDataPtr->size() : 0) << " tracks and " << residualsData.size() << " unbinned residuals associated to them";
155 mAggregator->process(residualsData, residualsDataDet, trackRefs, trkDataPtr, lumi);
156 std::chrono::duration<double, std::milli> runDuration = std::chrono::high_resolution_clock::now() - runStartTime;
157 LOGP(debug, "Duration for run method: {} ms. From this taken for time dependent param update: {} ms",
158 std::chrono::duration_cast<std::chrono::milliseconds>(runDuration).count(),
159 std::chrono::duration_cast<std::chrono::milliseconds>(ccdbUpdateTime).count());
   // on a requested state transition: finalise what has been collected and destroy the aggregator;
   // mRunStopRequested makes later run and end-of-stream invocations return early
160 if (pc.transitionState() == TransitionHandlingState::Requested) {
161 LOG(info) << "Run stop requested, finalizing";
162 mRunStopRequested = true;
163 mAggregator->checkSlotsToFinalize();
164 mAggregator.reset();
165 LOG(info) << "Finalizing done";
166 }
167 }
168
// Body of the end-of-stream handler (the signature line is not part of this listing).
// Finalises the open slots and destroys the aggregator, unless a run-stop request has
// already been handled (mRunStopRequested set), in which case nothing is done.
170 {
171 if (mRunStopRequested) {
172 return;
173 }
174 LOG(info) << "Finalizing calibration for end of stream";
175 mAggregator->checkSlotsToFinalize();
176 mAggregator.reset(); // must invoke destructor manually here, otherwise we get a segfault
177 LOG(info) << "Finalizing done for end of stream";
178 }
179
180 private:
// Refreshes the time-dependent inputs of the aggregator: forwards an updated VDrift object
// whenever the helper reports one, and passes the data taking context and the orbit reset
// time once per run (mInitDone is cleared again when a new run is detected).
181 void updateTimeDependentParams(ProcessingContext& pc)
182 {
   // NOTE(review): source line 183 is missing from this listing - confirm against the original header
184 mTPCVDriftHelper.extractCCDBInputs(pc);
185 if (mTPCVDriftHelper.isUpdated()) {
186 LOGP(info, "Updating TPC fast transform map with new VDrift factor of {} wrt reference {} and DriftTimeOffset correction {} wrt {} from source {}",
187 mTPCVDriftHelper.getVDriftObject().corrFact, mTPCVDriftHelper.getVDriftObject().refVDrift,
188 mTPCVDriftHelper.getVDriftObject().timeOffsetCorr, mTPCVDriftHelper.getVDriftObject().refTimeOffset,
189 mTPCVDriftHelper.getSourceName());
190 mAggregator->setTPCVDrift(mTPCVDriftHelper.getVDriftObject());
   // mark the update as consumed
191 mTPCVDriftHelper.acknowledgeUpdate();
192 }
   // one-time setup, repeated after a new run has been detected
193 if (!mInitDone) {
194 mAggregator->setDataTakingContext(pc.services().get<DataTakingContext>());
195 mAggregator->setOrbitResetTime(o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS());
196 mInitDone = true;
197 }
198 }
// Data members of the device.
199 o2::tpc::VDriftHelper mTPCVDriftHelper{}; ///< receives the VDrift CCDB inputs and provides the VDrift object passed to the aggregator
200 std::unique_ptr<o2::tpc::ResidualAggregator> mAggregator; ///< created in the init step, destroyed on run stop or end of stream
201 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest; ///< GRP/geometry CCDB request passed to the constructor
202 std::shared_ptr<o2::globaltracking::DataRequest> mDataRequest; ///< data request handed to collectData() in the run step
203 bool mTrackInput{false}; ///< if set, the "trkData" input is read and passed to the aggregator
204 bool mCTPInput{false}; ///< if set, the CTP lumi is retrieved and passed to the aggregator
205 bool mWriteBinnedResiduals{false}; ///< forwarded to the aggregator in the init step
206 bool mWriteUnbinnedResiduals{false}; ///< forwarded to the aggregator in the init step
207 bool mWriteTrackData{false}; ///< forwarded to the aggregator in the init step
208 bool mRunStopRequested{false}; ///< set once a requested state transition has been handled, cleared when a new run is detected
209 bool mInitDone{false}; ///< set after the data taking context and orbit reset time were passed to the aggregator, cleared when a new run is detected
210};
211
212} // namespace calibration
213
214namespace framework
215{
216
// Builds the DataProcessorSpec of the "residual-aggregator" device.
//  trackInput             - additionally subscribe to the "trkData" (GLO/TRKDATA) input
//  ctpInput               - additionally request the CTP data through the DataRequest
//  writeUnbinnedResiduals,
//  writeBinnedResiduals,
//  writeTrackData         - output selection, forwarded unchanged to the device
// The device has no DPL outputs (empty Outputs{}); its behaviour is configured via the options listed below.
// The function is defined in a header and is therefore declared inline, so that including
// the header from more than one translation unit does not produce multiple definitions.
217inline DataProcessorSpec getTPCResidualAggregatorSpec(bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData)
218{
219 std::shared_ptr<o2::globaltracking::DataRequest> dataRequest = std::make_shared<o2::globaltracking::DataRequest>();
220 if (ctpInput) {
221 dataRequest->requestClusters(GID::getSourcesMask("CTP"), false);
222 }
223 auto& inputs = dataRequest->inputs;
   // NOTE(review): source line 224 is missing from this listing - confirm against the original header
   // inputs that are always required
225 inputs.emplace_back("unbinnedRes", "GLO", "UNBINNEDRES");
226 inputs.emplace_back("detinfoRes", "GLO", "DETINFORES");
227 inputs.emplace_back("trackRefs", "GLO", "TRKREFS");
228 if (trackInput) {
229 inputs.emplace_back("trkData", "GLO", "TRKDATA");
230 }
231 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(true, // orbitResetTime
232 true, // GRPECS=true
233 false, // GRPLHCIF
234 false, // GRPMagField
235 false, // askMatLUT
   // NOTE(review): source line 236 (one more constructor argument) is missing from this listing - confirm against the original header
237 inputs,
238 true);
239 return DataProcessorSpec{
240 "residual-aggregator",
241 inputs,
242 Outputs{},
243 AlgorithmSpec{adaptFromTask<o2::calibration::ResidualAggregatorDevice>(ccdbRequest, trackInput, ctpInput, writeUnbinnedResiduals, writeBinnedResiduals, writeTrackData, dataRequest)},
244 Options{
245 {"sec-per-slot", VariantType::UInt32, 600u, {"number of seconds per calibration time slot (put 0 for infinite slot length)"}},
246 {"updateInterval", VariantType::UInt32, 6'000u, {"update interval in number of TFs (only used in case slot length is infinite)"}},
247 {"max-delay", VariantType::UInt32, 1u, {"number of slots in past to consider"}},
248 {"min-entries", VariantType::Int, 0, {"minimum number of entries on average per voxel"}},
249 {"compression", VariantType::Int, 505, {"ROOT compression setting for output file (see TFile documentation for meaning of this number)"}},
250 {"output-dir", VariantType::String, "none", {"Output directory for residuals. Defaults to current working directory. Output is disabled in case set to /dev/null"}},
251 {"meta-output-dir", VariantType::String, "/dev/null", {"Residuals metadata output directory, must exist (if not /dev/null)"}},
252 {"autosave-interval", VariantType::Int, 0, {"Write output to file for every n-th TF. 0 means this feature is OFF"}}}};
253}
254
255} // namespace framework
256} // namespace o2
257
258#endif // O2_TPC_RESIDUALAGGREGATORSPEC_H
Utils and constants for calibration and related workflows.
std::ostringstream debug
Helper for geometry and GRP related CCDB requests.
Collects local TPC cluster residuals from EPNs.
Definition of the TrackResiduals class.
Helper class to extract VDrift from different sources.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
ResidualAggregatorDevice(std::shared_ptr< o2::base::GRPGeomRequest > req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr< o2::globaltracking::DataRequest > dataRequest)
static mask_t getSourcesMask(const std::string_view srcList)
ServiceRegistryRef services()
The services registry associated with this processing context.
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
void extractCCDBInputs(o2::framework::ProcessingContext &pc, bool laser=true, bool itstpcTgl=true)
const VDriftCorrFact & getVDriftObject() const
bool accountCCDBInputs(const o2::framework::ConcreteDataMatcher &matcher, void *obj)
static std::string_view getSourceName(Source s)
bool isUpdated() const
GLint GLsizei count
Definition glcorearb.h:399
constexpr TFType INFINITE_TF
Definition TimeSlot.h:30
Defining PrimaryVertex explicitly as messageable.
std::vector< OutputSpec > Outputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void fillTFIDInfo(o2::framework::ProcessingContext &pc, o2::dataformats::TFIDInfo &ti)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
const o2::ctp::LumiInfo & getCTPLumi() const
float refTimeOffset
additive time offset reference (\mus)
float refVDrift
reference vdrift for which factor was extracted
float timeOffsetCorr
additive time offset correction (\mus)
float corrFact
drift velocity correction factor (multiplicative)
static std::string rectifyDirectory(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
LumiInfo lumi