Project
Loading...
Searching...
No Matches
TPCResidualAggregatorSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_TPC_RESIDUALAGGREGATORSPEC_H
13#define O2_TPC_RESIDUALAGGREGATORSPEC_H
14
18
24#include "Framework/Task.h"
30#include <fairmq/Device.h>
31#include <chrono>
32
33using namespace o2::framework;
35
36namespace o2
37{
38namespace calibration
39{
40
42{
43 public:
// Stores the CCDB request, the data request and the input/output switches. The constructor body is empty;
// mAggregator is not created here.
// The initialisers are listed in member declaration order, which is the order in which they are executed.
44 ResidualAggregatorDevice(std::shared_ptr<o2::base::GRPGeomRequest> req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr<o2::globaltracking::DataRequest> dataRequest) : mCCDBRequest(req), mDataRequest(dataRequest), mTrackInput(trackInput), mCTPInput(ctpInput), mWriteBinnedResiduals(writeBinnedResiduals), mWriteUnbinnedResiduals(writeUnbinnedResiduals), mWriteTrackData(writeTrackData) {}
45
// Reads the device options from `ic`, creates mAggregator and configures it
// (output locations, autosave, slot length, what to write, compression).
// NOTE(review): the signature line of this body (source line 46) and source line 48 are absent
// from this listing; presumably this is init(o2::framework::InitContext& ic) - confirm against the original header.
47 {
49 int minEnt = ic.options().get<int>("min-entries");
50 auto slotLength = ic.options().get<uint32_t>("sec-per-slot");
// a slot length of 0 selects the infinite-slot configuration applied further below
51 bool useInfiniteSlotLength = false;
52 if (slotLength == 0) {
53 useInfiniteSlotLength = true;
54 }
55 auto updateInterval = ic.options().get<uint32_t>("updateInterval");
56 auto delay = ic.options().get<uint32_t>("max-delay");
57
58 // should we write meta files for epn2eos?
59 bool storeMetaFile = false;
60 std::string metaFileDir = ic.options().get<std::string>("meta-output-dir");
61 if (metaFileDir != "/dev/null") {
62 metaFileDir = o2::utils::Str::rectifyDirectory(metaFileDir);
63 storeMetaFile = true;
64 }
65
66 // where should the ROOT output be written? in case it is set to /dev/null
67 // we don't write anything, also no meta files of course
68 bool writeOutput = true;
69 std::string outputDir = ic.options().get<std::string>("output-dir");
70 if (outputDir != "/dev/null") {
71 outputDir = o2::utils::Str::rectifyDirectory(outputDir);
72 } else {
73 writeOutput = false;
// disabling the ROOT output also overrides a meta-file directory that was enabled above
74 storeMetaFile = false;
75 }
76
77 LOGP(info, "Creating aggregator with {} entries per voxel minimum. Output file writing enabled: {}, meta file writing enabled: {}",
78 minEnt, writeOutput, storeMetaFile);
79 mAggregator = std::make_unique<o2::tpc::ResidualAggregator>(minEnt);
// the output directories are only passed on when the corresponding writing is enabled
80 if (writeOutput) {
81 mAggregator->setOutputDir(outputDir);
82 }
83 if (storeMetaFile) {
84 mAggregator->setMetaFileOutputDir(metaFileDir);
85 }
86
87 int autosave = ic.options().get<int>("autosave-interval");
88 mAggregator->setAutosaveInterval(autosave);
89 // TODO mAggregator should get an option to set the binning externally (expose TrackResiduals::setBinning methods to user? as command line option?)
90 mAggregator->setMaxSlotsDelay(delay);
// updateInterval is only handed to the aggregator in the infinite-slot case
91 if (useInfiniteSlotLength) {
92 mAggregator->setSlotLength(o2::calibration::INFINITE_TF);
93 mAggregator->setCheckIntervalInfiniteSlot(updateInterval);
94 } else {
95 mAggregator->setSlotLengthInSeconds(slotLength);
96 }
97 mAggregator->setWriteBinnedResiduals(mWriteBinnedResiduals);
98 mAggregator->setWriteUnbinnedResiduals(mWriteUnbinnedResiduals);
99 mAggregator->setWriteTrackData(mWriteTrackData);
100 mAggregator->setCompression(ic.options().get<int>("compression"));
101 }
102
// Offers a CCDB object (matcher, obj) to the helpers used by this device and returns as soon as one accepts it.
// NOTE(review): source line 105 (the condition opening the first early-return branch) is absent from this
// listing, which is why the braces below look unbalanced - restore it from the original header.
103 void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
104 {
106 return;
107 }
// second consumer: the TPC VDrift helper
108 if (mTPCVDriftHelper.accountCCDBInputs(matcher, obj)) {
109 return;
110 }
111 }
112
// Per-invocation processing body: resets the run state when the run number changed, collects the inputs,
// refreshes the time-dependent parameters, passes residuals / track references (and optionally track data
// and CTP lumi) to mAggregator, and finalises once a state transition has been requested.
// NOTE(review): the signature line of this body (source line 113) and source line 126, where `recoCont`
// is declared, are absent from this listing - restore them from the original header.
// NOTE(review): the stop branch at the end resets mAggregator; a later run-number change clears
// mRunStopRequested and this body dereferences mAggregator again - confirm it is re-created before that.
114 {
115 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
116 if (tinfo.globalRunNumberChanged) {
117 // new run is starting
118 LOG(info) << "New run start detected";
119 mRunStopRequested = false;
120 mInitDone = false;
121 }
// once a stop was requested nothing is processed until a new run is detected
122 if (mRunStopRequested) {
123 return;
124 }
125 auto runStartTime = std::chrono::high_resolution_clock::now();
127 recoCont.collectData(pc, *mDataRequest);
128 updateTimeDependentParams(pc);
129 std::chrono::duration<double, std::milli> ccdbUpdateTime = std::chrono::high_resolution_clock::now() - runStartTime;
130
131 // we always require the unbinned residuals and the associated track references
132 auto residualsData = pc.inputs().get<gsl::span<o2::tpc::UnbinnedResid>>("unbinnedRes");
133 auto trackRefs = pc.inputs().get<gsl::span<o2::tpc::TrackDataCompact>>("trackRefs");
134
135 // track data input is optional
136 const gsl::span<const o2::tpc::TrackData>* trkDataPtr = nullptr;
137 using trkDataType = std::decay_t<decltype(pc.inputs().get<gsl::span<o2::tpc::TrackData>>(""))>;
138 std::optional<trkDataType> trkData;
139 if (mTrackInput) {
140 trkData.emplace(pc.inputs().get<gsl::span<o2::tpc::TrackData>>("trkData"));
141 trkDataPtr = &trkData.value();
142 }
143 // CTP lumi input (optional)
144 const o2::ctp::LumiInfo* lumi = nullptr;
145 using lumiDataType = std::decay_t<decltype(pc.inputs().get<o2::ctp::LumiInfo>(""))>;
146 std::optional<lumiDataType> lumiInput;
147 if (mCTPInput) {
149 lumiInput = recoCont.getCTPLumi();
150 lumi = &lumiInput.value();
151 }
152
153 o2::base::TFIDInfoHelper::fillTFIDInfo(pc, mAggregator->getCurrentTFInfo());
// trkData stays empty when track input is disabled, so the track count is read through the nullable pointer
154 LOG(detail) << "Processing TF " << mAggregator->getCurrentTFInfo().tfCounter << " with " << (trkDataPtr ? trkDataPtr->size() : 0) << " tracks and " << residualsData.size() << " unbinned residuals associated to them";
155 mAggregator->process(residualsData, trackRefs, trkDataPtr, lumi);
156 std::chrono::duration<double, std::milli> runDuration = std::chrono::high_resolution_clock::now() - runStartTime;
157 LOGP(debug, "Duration for run method: {} ms. From this taken for time dependent param update: {} ms",
158 std::chrono::duration_cast<std::chrono::milliseconds>(runDuration).count(),
159 std::chrono::duration_cast<std::chrono::milliseconds>(ccdbUpdateTime).count());
// a requested state transition triggers finalisation here; the flag set below makes the end-of-stream body skip it
160 if (pc.transitionState() == TransitionHandlingState::Requested) {
161 LOG(info) << "Run stop requested, finalizing";
162 mRunStopRequested = true;
163 mAggregator->checkSlotsToFinalize();
164 mAggregator.reset();
165 LOG(info) << "Finalizing done";
166 }
167 }
168
// End-of-stream body: finalises the aggregator unless that already happened after a run-stop request.
// NOTE(review): the signature line of this body (source line 169) is absent from this listing;
// presumably endOfStream(o2::framework::EndOfStreamContext& ec) - confirm against the original header.
170 {
// when mRunStopRequested is set, the aggregator was already finalised and reset, so there is nothing left to do
171 if (mRunStopRequested) {
172 return;
173 }
174 LOG(info) << "Finalizing calibration for end of stream";
175 mAggregator->checkSlotsToFinalize();
176 mAggregator.reset(); // must invoke destructor manually here, otherwise we get a segfault
177 LOG(info) << "Finalizing done for end of stream";
178 }
179
180 private:
// Pulls the VDrift CCDB inputs for this processing context, forwards an updated VDrift object to mAggregator
// and, once per run (guarded by mInitDone), passes the data-taking context and orbit reset time to it.
// NOTE(review): source line 183 (first statement of the body) is absent from this listing.
181 void updateTimeDependentParams(ProcessingContext& pc)
182 {
184 mTPCVDriftHelper.extractCCDBInputs(pc);
// only act when the helper reports a new VDrift object; the update is acknowledged after it was forwarded
185 if (mTPCVDriftHelper.isUpdated()) {
186 LOGP(info, "Updating TPC fast transform map with new VDrift factor of {} wrt reference {} and DriftTimeOffset correction {} wrt {} from source {}",
187 mTPCVDriftHelper.getVDriftObject().corrFact, mTPCVDriftHelper.getVDriftObject().refVDrift,
188 mTPCVDriftHelper.getVDriftObject().timeOffsetCorr, mTPCVDriftHelper.getVDriftObject().refTimeOffset,
189 mTPCVDriftHelper.getSourceName());
190 mAggregator->setTPCVDrift(mTPCVDriftHelper.getVDriftObject());
191 mTPCVDriftHelper.acknowledgeUpdate();
192 }
// one-time setup; mInitDone is cleared again when a run-number change is detected
193 if (!mInitDone) {
194 mAggregator->setDataTakingContext(pc.services().get<DataTakingContext>());
195 mAggregator->setOrbitResetTime(o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS());
196 mInitDone = true;
197 }
198 }
// Data members of the device
199 o2::tpc::VDriftHelper mTPCVDriftHelper{}; // consumes the VDrift CCDB inputs and reports updates
200 std::unique_ptr<o2::tpc::ResidualAggregator> mAggregator; // reset after finalisation (run stop request or end of stream)
201 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest; // CCDB request handed in via the constructor
202 std::shared_ptr<o2::globaltracking::DataRequest> mDataRequest; // data request used when collecting the inputs
203 bool mTrackInput{false}; // if true, the "trkData" input is read
204 bool mCTPInput{false}; // if true, the CTP lumi is fetched and passed to the aggregator
205 bool mWriteBinnedResiduals{false}; // forwarded to mAggregator->setWriteBinnedResiduals()
206 bool mWriteUnbinnedResiduals{false}; // forwarded to mAggregator->setWriteUnbinnedResiduals()
207 bool mWriteTrackData{false}; // forwarded to mAggregator->setWriteTrackData()
208 bool mRunStopRequested{false}; // set once a state transition was requested; cleared when a new run is detected
209 bool mInitDone{false}; // guards the one-time setup in updateTimeDependentParams()
210};
211
212} // namespace calibration
213
214namespace framework
215{
216
// Builds the DataProcessorSpec of the "residual-aggregator" device: declares its inputs (unbinned residuals,
// track references, optionally track data and CTP), creates the CCDB request and registers the device options.
// The boolean arguments are forwarded unchanged to the ResidualAggregatorDevice constructor.
// NOTE(review): source lines 224 and 235 are absent from this listing; line 235 falls inside the
// GRPGeomRequest argument list below, so that list is incomplete as shown - verify against the original header.
217 DataProcessorSpec getTPCResidualAggregatorSpec(bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData)
218 {
219 std::shared_ptr<o2::globaltracking::DataRequest> dataRequest = std::make_shared<o2::globaltracking::DataRequest>();
220 if (ctpInput) {
221 dataRequest->requestClusters(GID::getSourcesMask("CTP"), false);
222 }
// further inputs are appended to the request's own input list, which is also what the spec is built from
223 auto& inputs = dataRequest->inputs;
225 inputs.emplace_back("unbinnedRes", "GLO", "UNBINNEDRES");
226 inputs.emplace_back("trackRefs", "GLO", "TRKREFS");
227 if (trackInput) {
228 inputs.emplace_back("trkData", "GLO", "TRKDATA");
229 }
230 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(true, // orbitResetTime
231 true, // GRPECS=true
232 false, // GRPLHCIF
233 false, // GRPMagField
234 false, // askMatLUT
236 inputs,
237 true);
// the device declares no outputs (Outputs{})
238 return DataProcessorSpec{
239 "residual-aggregator",
240 inputs,
241 Outputs{},
242 AlgorithmSpec{adaptFromTask<o2::calibration::ResidualAggregatorDevice>(ccdbRequest, trackInput, ctpInput, writeUnbinnedResiduals, writeBinnedResiduals, writeTrackData, dataRequest)},
243 Options{
244 {"sec-per-slot", VariantType::UInt32, 600u, {"number of seconds per calibration time slot (put 0 for infinite slot length)"}},
245 {"updateInterval", VariantType::UInt32, 6'000u, {"update interval in number of TFs (only used in case slot length is infinite)"}},
246 {"max-delay", VariantType::UInt32, 1u, {"number of slots in past to consider"}},
247 {"min-entries", VariantType::Int, 0, {"minimum number of entries on average per voxel"}},
248 {"compression", VariantType::Int, 505, {"ROOT compression setting for output file (see TFile documentation for meaning of this number)"}},
249 {"output-dir", VariantType::String, "none", {"Output directory for residuals. Defaults to current working directory. Output is disabled in case set to /dev/null"}},
250 {"meta-output-dir", VariantType::String, "/dev/null", {"Residuals metadata output directory, must exist (if not /dev/null)"}},
251 {"autosave-interval", VariantType::Int, 0, {"Write output to file for every n-th TF. 0 means this feature is OFF"}}}};
252 }
253
254} // namespace framework
255} // namespace o2
256
257#endif // O2_TPC_RESIDUALAGGREGATORSPEC_H
Utils and constants for calibration and related workflows.
Helper for geometry and GRP related CCDB requests.
Collects local TPC cluster residuals from EPNs.
Definition of the TrackResiduals class.
Helper class to extract VDrift from different sources.
std::ostringstream debug
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
ResidualAggregatorDevice(std::shared_ptr< o2::base::GRPGeomRequest > req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr< o2::globaltracking::DataRequest > dataRequest)
static mask_t getSourcesMask(const std::string_view srcList)
ServiceRegistryRef services()
The services registry associated with this processing context.
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
void extractCCDBInputs(o2::framework::ProcessingContext &pc, bool laser=true, bool itstpcTgl=true)
const VDriftCorrFact & getVDriftObject() const
bool accountCCDBInputs(const o2::framework::ConcreteDataMatcher &matcher, void *obj)
static std::string_view getSourceName(Source s)
bool isUpdated() const
GLint GLsizei count
Definition glcorearb.h:399
constexpr TFType INFINITE_TF
Definition TimeSlot.h:30
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< OutputSpec > Outputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void fillTFIDInfo(o2::framework::ProcessingContext &pc, o2::dataformats::TFIDInfo &ti)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
const o2::ctp::LumiInfo & getCTPLumi() const
float refTimeOffset
additive time offset reference (\mus)
float refVDrift
reference vdrift for which factor was extracted
float timeOffsetCorr
additive time offset correction (\mus)
float corrFact
drift velocity correction factor (multiplicative)
static std::string rectifyDirectory(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
LumiInfo lumi