Project
Loading...
Searching...
No Matches
TPCResidualAggregatorSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_TPC_RESIDUALAGGREGATORSPEC_H
13#define O2_TPC_RESIDUALAGGREGATORSPEC_H
14
18
24#include "Framework/Task.h"
30#include <fairmq/Device.h>
31#include <chrono>
32
33using namespace o2::framework;
35
36namespace o2
37{
38namespace calibration
39{
40
42{
43 public:
// Constructor: only stores its arguments in the corresponding members (CCDB request, track/CTP input
// switches, the three output-writing switches and the data request); the body is empty.
44 ResidualAggregatorDevice(std::shared_ptr<o2::base::GRPGeomRequest> req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr<o2::globaltracking::DataRequest> dataRequest) : mCCDBRequest(req), mTrackInput(trackInput), mCTPInput(ctpInput), mWriteUnbinnedResiduals(writeUnbinnedResiduals), mWriteBinnedResiduals(writeBinnedResiduals), mWriteTrackData(writeTrackData), mDataRequest(dataRequest) {}
45
// init(): reads the device options from the InitContext `ic`, creates mAggregator and configures it.
// NOTE(review): the signature (listing line 46) and listing line 48 are absent from this extract; the
// member index at the end of the page gives `void init(o2::framework::InitContext &ic) final`.
47 {
49 int minEnt = ic.options().get<int>("min-entries");
50 auto slotLength = ic.options().get<uint32_t>("sec-per-slot");
// a slot length of 0 seconds selects one slot of infinite length (see the branch on useInfiniteSlotLength below)
51 bool useInfiniteSlotLength = false;
52 if (slotLength == 0) {
53 useInfiniteSlotLength = true;
54 }
55 auto updateInterval = ic.options().get<uint32_t>("updateInterval");
56 auto delay = ic.options().get<uint32_t>("max-delay");
57
58 // should we write meta files for epn2eos? (enabled for any directory other than /dev/null)
59 bool storeMetaFile = false;
60 std::string metaFileDir = ic.options().get<std::string>("meta-output-dir");
61 if (metaFileDir != "/dev/null") {
62 metaFileDir = o2::utils::Str::rectifyDirectory(metaFileDir);
63 storeMetaFile = true;
64 }
65
66 // where should the ROOT output be written? in case it is set to /dev/null
67 // we don't write anything, also no meta files of course
68 bool writeOutput = true;
69 std::string outputDir = ic.options().get<std::string>("output-dir");
70 if (outputDir != "/dev/null") {
71 outputDir = o2::utils::Str::rectifyDirectory(outputDir);
72 } else {
73 writeOutput = false;
74 storeMetaFile = false;
75 }
76
77 LOGP(info, "Creating aggregator with {} entries per voxel minimum. Output file writing enabled: {}, meta file writing enabled: {}",
78 minEnt, writeOutput, storeMetaFile);
79 mAggregator = std::make_unique<o2::tpc::ResidualAggregator>(minEnt);
// output directories are only passed on when the respective writing is enabled
80 if (writeOutput) {
81 mAggregator->setOutputDir(outputDir);
82 }
83 if (storeMetaFile) {
84 mAggregator->setMetaFileOutputDir(metaFileDir);
85 }
86
87 int autosave = ic.options().get<int>("autosave-interval");
88 mAggregator->setAutosaveInterval(autosave);
89 // TODO mAggregator should get an option to set the binning externally (expose TrackResiduals::setBinning methods to user? as command line option?)
90 mAggregator->setMaxSlotsDelay(delay);
// infinite slot: use INFINITE_TF as slot length and updateInterval as check interval; otherwise the length in seconds
91 if (useInfiniteSlotLength) {
92 mAggregator->setSlotLength(o2::calibration::INFINITE_TF);
93 mAggregator->setCheckIntervalInfiniteSlot(updateInterval);
94 } else {
95 mAggregator->setSlotLengthInSeconds(slotLength);
96 }
// forward the output switches received in the constructor and the compression option
97 mAggregator->setWriteBinnedResiduals(mWriteBinnedResiduals);
98 mAggregator->setWriteUnbinnedResiduals(mWriteUnbinnedResiduals);
99 mAggregator->setWriteTrackData(mWriteTrackData);
100 mAggregator->setCompression(ic.options().get<int>("compression"));
101 }
102
// finaliseCCDB(): receives a CCDB object (`obj`) together with its `matcher` and offers it to the
// helpers; returns as soon as one of them reports that it accounted for the object.
// NOTE(review): listing line 105 (the opening `if` of the first check) is absent from this extract;
// presumably it is a GRPGeomHelper call analogous to the VDrift check below — confirm against the original file.
103 void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
104 {
106 return;
107 }
// let the VDrift helper take the object if it belongs to it
108 if (mTPCVDriftHelper.accountCCDBInputs(matcher, obj)) {
109 return;
110 }
111 }
112
// run(): processes one time frame. Resets the stop/init flags when a new run number is seen, collects the
// inputs, refreshes the time-dependent parameters, hands residuals (plus optional track data and CTP lumi)
// to mAggregator and finalizes the slots once a state transition has been requested.
// NOTE(review): the signature (listing line 113) and listing line 126 (declaration of `recoCont`) are absent
// from this extract; the member index at the end of the page gives `void run(o2::framework::ProcessingContext &pc) final`.
// NOTE(review): after a run stop mAggregator is reset; a later run clears mRunStopRequested but nothing visible
// here re-creates the aggregator — confirm that init() is invoked again before the next run() call.
114 {
115 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
116 if (tinfo.globalRunNumberChanged) {
117 // new run is starting
118 LOG(info) << "New run start detected";
119 mRunStopRequested = false;
120 mInitDone = false;
121 }
// nothing to do any more once the slots were finalized for a requested run stop
122 if (mRunStopRequested) {
123 return;
124 }
125 auto runStartTime = std::chrono::high_resolution_clock::now();
127 recoCont.collectData(pc, *mDataRequest);
128 updateTimeDependentParams(pc);
// time spent so far, i.e. for data collection and the time-dependent parameter update
129 std::chrono::duration<double, std::milli> ccdbUpdateTime = std::chrono::high_resolution_clock::now() - runStartTime;
130
131 // we always require the unbinned residuals and the associated track references
132 auto residualsData = pc.inputs().get<gsl::span<o2::tpc::UnbinnedResid>>("unbinnedRes");
133 auto trackRefs = pc.inputs().get<gsl::span<o2::tpc::TrackDataCompact>>("trackRefs");
134
135 // track data input is optional; trkDataPtr stays nullptr (and trkData empty) when it is disabled
136 const gsl::span<const o2::tpc::TrackData>* trkDataPtr = nullptr;
137 using trkDataType = std::decay_t<decltype(pc.inputs().get<gsl::span<o2::tpc::TrackData>>(""))>;
138 std::optional<trkDataType> trkData;
139 if (mTrackInput) {
140 trkData.emplace(pc.inputs().get<gsl::span<o2::tpc::TrackData>>("trkData"));
141 trkDataPtr = &trkData.value();
142 }
143 // CTP lumi input (optional); lumi stays nullptr when it is disabled
144 const o2::ctp::LumiInfo* lumi = nullptr;
145 using lumiDataType = std::decay_t<decltype(pc.inputs().get<o2::ctp::LumiInfo>(""))>;
146 std::optional<lumiDataType> lumiInput;
147 if (mCTPInput) {
148 lumiInput = recoCont.getCTPLumi();
149 lumi = &lumiInput.value();
150 }
151
152 o2::base::TFIDInfoHelper::fillTFIDInfo(pc, mAggregator->getCurrentTFInfo());
// trkData is an empty optional without track input, so it must not be dereferenced here: report 0 tracks instead
153 LOG(detail) << "Processing TF " << mAggregator->getCurrentTFInfo().tfCounter << " with " << (trkDataPtr ? trkDataPtr->size() : 0) << " tracks and " << residualsData.size() << " unbinned residuals associated to them";
154 mAggregator->process(residualsData, trackRefs, trkDataPtr, lumi);
155 std::chrono::duration<double, std::milli> runDuration = std::chrono::high_resolution_clock::now() - runStartTime;
156 LOGP(debug, "Duration for run method: {} ms. From this taken for time dependent param update: {} ms",
157 std::chrono::duration_cast<std::chrono::milliseconds>(runDuration).count(),
158 std::chrono::duration_cast<std::chrono::milliseconds>(ccdbUpdateTime).count());
// a requested state transition is treated as run stop: finalize the slots now and destroy the aggregator
159 if (pc.transitionState() == TransitionHandlingState::Requested) {
160 LOG(info) << "Run stop requested, finalizing";
161 mRunStopRequested = true;
162 mAggregator->checkSlotsToFinalize();
163 mAggregator.reset();
164 LOG(info) << "Finalizing done";
165 }
166 }
167
// endOfStream(): finalizes the remaining slots and destroys the aggregator, unless this was already done
// in run() for a requested run stop (mRunStopRequested).
// NOTE(review): the signature (listing line 168) is absent from this extract; the member index at the end
// of the page gives `void endOfStream(o2::framework::EndOfStreamContext &ec) final`.
169 {
170 if (mRunStopRequested) {
171 return;
172 }
173 LOG(info) << "Finalizing calibration for end of stream";
174 mAggregator->checkSlotsToFinalize();
175 mAggregator.reset(); // must invoke destructor manually here, otherwise we get a segfault
176 LOG(info) << "Finalizing done for end of stream";
177 }
178
179 private:
// updateTimeDependentParams(): pulls the VDrift CCDB inputs for this time frame, forwards an updated VDrift
// object to the aggregator, and performs the once-per-run setup of the aggregator (guarded by mInitDone).
// NOTE(review): listing line 182 is absent from this extract; presumably it is the
// GRPGeomHelper update check listed in the page's member index — confirm against the original file.
180 void updateTimeDependentParams(ProcessingContext& pc)
181 {
183 mTPCVDriftHelper.extractCCDBInputs(pc);
184 if (mTPCVDriftHelper.isUpdated()) {
185 LOGP(info, "Updating TPC fast transform map with new VDrift factor of {} wrt reference {} and DriftTimeOffset correction {} wrt {} from source {}",
186 mTPCVDriftHelper.getVDriftObject().corrFact, mTPCVDriftHelper.getVDriftObject().refVDrift,
187 mTPCVDriftHelper.getVDriftObject().timeOffsetCorr, mTPCVDriftHelper.getVDriftObject().refTimeOffset,
188 mTPCVDriftHelper.getSourceName());
189 mAggregator->setTPCVDrift(mTPCVDriftHelper.getVDriftObject());
// mark the update as consumed so that it is not applied again for the next time frame
190 mTPCVDriftHelper.acknowledgeUpdate();
191 }
// executed for the first time frame and again after run() has cleared mInitDone for a new run
192 if (!mInitDone) {
193 mAggregator->setDataTakingContext(pc.services().get<DataTakingContext>());
194 mAggregator->setOrbitResetTime(o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS());
195 mInitDone = true;
196 }
197 }
// helper used in finaliseCCDB() and updateTimeDependentParams() to obtain the TPC VDrift object
198 o2::tpc::VDriftHelper mTPCVDriftHelper{};
// created in init(), destroyed after finalization in run() / endOfStream()
199 std::unique_ptr<o2::tpc::ResidualAggregator> mAggregator;
// CCDB request handed over in the constructor (its use is not visible in this extract)
200 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
// data request handed over in the constructor, used in run() to collect the inputs
201 std::shared_ptr<o2::globaltracking::DataRequest> mDataRequest;
// read the optional "trkData" input in run()
202 bool mTrackInput{false};
// read the optional CTP lumi in run()
203 bool mCTPInput{false};
// output switches forwarded to the aggregator in init()
204 bool mWriteBinnedResiduals{false};
205 bool mWriteUnbinnedResiduals{false};
206 bool mWriteTrackData{false};
// set once the slots were finalized for a requested run stop; cleared when a new run number is seen
207 bool mRunStopRequested{false};
// set after the once-per-run aggregator setup in updateTimeDependentParams()
208 bool mInitDone{false};
209};
210
211} // namespace calibration
212
213namespace framework
214{
215
// Builds the DataProcessorSpec of the "residual-aggregator" device.
//   trackInput: additionally subscribe to the GLO/TRKDATA input ("trkData")
//   ctpInput:   additionally request the CTP source through the DataRequest
//   writeUnbinnedResiduals, writeBinnedResiduals, writeTrackData: passed unchanged to the device constructor
// The device declares no outputs; its command line options are listed in the Options block below.
// NOTE(review): listing lines 223 and 234 are absent from this extract, so one statement before the input
// declarations and one GRPGeomRequest constructor argument are not visible here.
216DataProcessorSpec getTPCResidualAggregatorSpec(bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData)
217{
218 std::shared_ptr<o2::globaltracking::DataRequest> dataRequest = std::make_shared<o2::globaltracking::DataRequest>();
219 if (ctpInput) {
220 dataRequest->requestClusters(GID::getSourcesMask("CTP"), false);
221 }
222 auto& inputs = dataRequest->inputs;
// unbinned residuals and track references are always required, the track data only on request
224 inputs.emplace_back("unbinnedRes", "GLO", "UNBINNEDRES");
225 inputs.emplace_back("trackRefs", "GLO", "TRKREFS");
226 if (trackInput) {
227 inputs.emplace_back("trkData", "GLO", "TRKDATA");
228 }
229 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(true, // orbitResetTime
230 true, // GRPECS=true
231 false, // GRPLHCIF
232 false, // GRPMagField
233 false, // askMatLUT
235 inputs,
236 true);
237 return DataProcessorSpec{
238 "residual-aggregator",
239 inputs,
240 Outputs{},
241 AlgorithmSpec{adaptFromTask<o2::calibration::ResidualAggregatorDevice>(ccdbRequest, trackInput, ctpInput, writeUnbinnedResiduals, writeBinnedResiduals, writeTrackData, dataRequest)},
242 Options{
243 {"sec-per-slot", VariantType::UInt32, 600u, {"number of seconds per calibration time slot (put 0 for infinite slot length)"}},
244 {"updateInterval", VariantType::UInt32, 6'000u, {"update interval in number of TFs (only used in case slot length is infinite)"}},
245 {"max-delay", VariantType::UInt32, 1u, {"number of slots in past to consider"}},
246 {"min-entries", VariantType::Int, 0, {"minimum number of entries on average per voxel"}},
247 {"compression", VariantType::Int, 505, {"ROOT compression setting for output file (see TFile documentation for meaning of this number)"}},
// NOTE(review): the default is "none" while the help text promises the current working directory; presumably
// rectifyDirectory() maps "none" accordingly — confirm
248 {"output-dir", VariantType::String, "none", {"Output directory for residuals. Defaults to current working directory. Output is disabled in case set to /dev/null"}},
249 {"meta-output-dir", VariantType::String, "/dev/null", {"Residuals metadata output directory, must exist (if not /dev/null)"}},
250 {"autosave-interval", VariantType::Int, 0, {"Write output to file for every n-th TF. 0 means this feature is OFF"}}}};
251}
252
253} // namespace framework
254} // namespace o2
255
256#endif // O2_TPC_RESIDUALAGGREGATORSPEC_H
Utils and constants for calibration and related workflows.
Helper for geometry and GRP related CCDB requests.
Collects local TPC cluster residuals from EPNs.
Definition of the TrackResiduals class.
Helper class to extract VDrift from different sources.
std::ostringstream debug
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
ResidualAggregatorDevice(std::shared_ptr< o2::base::GRPGeomRequest > req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr< o2::globaltracking::DataRequest > dataRequest)
static mask_t getSourcesMask(const std::string_view srcList)
ServiceRegistryRef services()
The services registry associated with this processing context.
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
void extractCCDBInputs(o2::framework::ProcessingContext &pc, bool laser=true, bool itstpcTgl=true)
const VDriftCorrFact & getVDriftObject() const
bool accountCCDBInputs(const o2::framework::ConcreteDataMatcher &matcher, void *obj)
static std::string_view getSourceName(Source s)
bool isUpdated() const
GLint GLsizei count
Definition glcorearb.h:399
constexpr TFType INFINITE_TF
Definition TimeSlot.h:30
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< OutputSpec > Outputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void fillTFIDInfo(o2::framework::ProcessingContext &pc, o2::dataformats::TFIDInfo &ti)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
const o2::ctp::LumiInfo & getCTPLumi() const
float refTimeOffset
additive time offset reference (\mus)
float refVDrift
reference vdrift for which factor was extracted
float timeOffsetCorr
additive time offset correction (\mus)
float corrFact
drift velocity correction factor (multiplicative)
static std::string rectifyDirectory(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
LumiInfo lumi