1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See for details of the copyright holders.
3// All rights not expressly granted are reserved.
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
24#include "Framework/Task.h"
30#include <fairmq/Device.h>
31#include <chrono>
33using namespace o2::framework;
36namespace o2
38namespace calibration
43 public:
44 ResidualAggregatorDevice(std::shared_ptr<o2::base::GRPGeomRequest> req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr<o2::globaltracking::DataRequest> dataRequest) : mCCDBRequest(req), mTrackInput(trackInput), mCTPInput(ctpInput), mWriteUnbinnedResiduals(writeUnbinnedResiduals), mWriteBinnedResiduals(writeBinnedResiduals), mWriteTrackData(writeTrackData), mDataRequest(dataRequest) {}
47 {
49 int minEnt = ic.options().get<int>("min-entries");
50 auto slotLength = ic.options().get<uint32_t>("sec-per-slot");
51 bool useInfiniteSlotLength = false;
52 if (slotLength == 0) {
53 useInfiniteSlotLength = true;
54 }
55 auto updateInterval = ic.options().get<uint32_t>("updateInterval");
56 auto delay = ic.options().get<uint32_t>("max-delay");
58 // should we write meta files for epn2eos?
59 bool storeMetaFile = false;
60 std::string metaFileDir = ic.options().get<std::string>("meta-output-dir");
61 if (metaFileDir != "/dev/null") {
62 metaFileDir = o2::utils::Str::rectifyDirectory(metaFileDir);
63 storeMetaFile = true;
64 }
66 // where should the ROOT output be written? in case its set to /dev/null
67 // we don't write anything, also no meta files of course
68 bool writeOutput = true;
69 std::string outputDir = ic.options().get<std::string>("output-dir");
70 if (outputDir != "/dev/null") {
71 outputDir = o2::utils::Str::rectifyDirectory(outputDir);
72 } else {
73 writeOutput = false;
74 storeMetaFile = false;
75 }
77 LOGP(info, "Creating aggregator with {} entries per voxel minimum. Output file writing enabled: {}, meta file writing enabled: {}",
78 minEnt, writeOutput, storeMetaFile);
79 mAggregator = std::make_unique<o2::tpc::ResidualAggregator>(minEnt);
80 if (writeOutput) {
81 mAggregator->setOutputDir(outputDir);
82 }
83 if (storeMetaFile) {
84 mAggregator->setMetaFileOutputDir(metaFileDir);
85 }
87 int autosave = ic.options().get<int>("autosave-interval");
88 mAggregator->setAutosaveInterval(autosave);
89 // TODO mAggregator should get an option to set the binning externally (expose TrackResiduals::setBinning methods to user? as command line option?)
90 mAggregator->setMaxSlotsDelay(delay);
91 if (useInfiniteSlotLength) {
92 mAggregator->setSlotLength(o2::calibration::INFINITE_TF);
93 mAggregator->setCheckIntervalInfiniteSlot(updateInterval);
94 } else {
95 mAggregator->setSlotLengthInSeconds(slotLength);
96 }
97 mAggregator->setWriteBinnedResiduals(mWriteBinnedResiduals);
98 mAggregator->setWriteUnbinnedResiduals(mWriteUnbinnedResiduals);
99 mAggregator->setWriteTrackData(mWriteTrackData);
100 mAggregator->setCompression(ic.options().get<int>("compression"));
101 }
103 void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
104 {
106 return;
107 }
108 if (mTPCVDriftHelper.accountCCDBInputs(matcher, obj)) {
109 return;
110 }
111 }
114 {
115 const auto& tinfo =<o2::framework::TimingInfo>();
116 if (tinfo.globalRunNumberChanged) {
117 // new run is starting
118 LOG(info) << "New run start detected";
119 mRunStopRequested = false;
120 mInitDone = false;
121 }
122 if (mRunStopRequested) {
123 return;
124 }
125 auto runStartTime = std::chrono::high_resolution_clock::now();
127 recoCont.collectData(pc, *mDataRequest);
128 updateTimeDependentParams(pc);
129 std::chrono::duration<double, std::milli> ccdbUpdateTime = std::chrono::high_resolution_clock::now() - runStartTime;
131 // we always require the unbinned residuals and the associated track references
132 auto residualsData = pc.inputs().get<gsl::span<o2::tpc::UnbinnedResid>>("unbinnedRes");
133 auto trackRefs = pc.inputs().get<gsl::span<o2::tpc::TrackDataCompact>>("trackRefs");
135 // track data input is optional
136 const gsl::span<const o2::tpc::TrackData>* trkDataPtr = nullptr;
137 using trkDataType = std::decay_t<decltype(pc.inputs().get<gsl::span<o2::tpc::TrackData>>(""))>;
138 std::optional<trkDataType> trkData;
139 if (mTrackInput) {
140 trkData.emplace(pc.inputs().get<gsl::span<o2::tpc::TrackData>>("trkData"));
141 trkDataPtr = &trkData.value();
142 }
143 // CTP lumi input (optional)
144 const o2::ctp::LumiInfo* lumi = nullptr;
145 using lumiDataType = std::decay_t<decltype(pc.inputs().get<o2::ctp::LumiInfo>(""))>;
146 std::optional<lumiDataType> lumiInput;
147 if (mCTPInput) {
148 recoCont.getCTPLumi();
149 lumiInput = recoCont.getCTPLumi();
150 lumi = &lumiInput.value();
151 }
153 o2::base::TFIDInfoHelper::fillTFIDInfo(pc, mAggregator->getCurrentTFInfo());
154 LOG(detail) << "Processing TF " << mAggregator->getCurrentTFInfo().tfCounter << " with " << trkData->size() << " tracks and " << residualsData.size() << " unbinned residuals associated to them";
155 mAggregator->process(residualsData, trackRefs, trkDataPtr, lumi);
156 std::chrono::duration<double, std::milli> runDuration = std::chrono::high_resolution_clock::now() - runStartTime;
157 LOGP(debug, "Duration for run method: {} ms. From this taken for time dependent param update: {} ms",
158 std::chrono::duration_cast<std::chrono::milliseconds>(runDuration).count(),
159 std::chrono::duration_cast<std::chrono::milliseconds>(ccdbUpdateTime).count());
160 if (pc.transitionState() == TransitionHandlingState::Requested) {
161 LOG(info) << "Run stop requested, finalizing";
162 mRunStopRequested = true;
163 mAggregator->checkSlotsToFinalize();
164 mAggregator.reset();
165 LOG(info) << "Finalizing done";
166 }
167 }
170 {
171 if (mRunStopRequested) {
172 return;
173 }
174 LOG(info) << "Finalizing calibration for end of stream";
175 mAggregator->checkSlotsToFinalize();
176 mAggregator.reset(); // must invoke destructor manually here, otherwise we get a segfault
177 LOG(info) << "Finalizing done for end of stream";
178 }
180 private:
181 void updateTimeDependentParams(ProcessingContext& pc)
182 {
184 mTPCVDriftHelper.extractCCDBInputs(pc);
185 if (mTPCVDriftHelper.isUpdated()) {
186 LOGP(info, "Updating TPC fast transform map with new VDrift factor of {} wrt reference {} and DriftTimeOffset correction {} wrt {} from source {}",
187 mTPCVDriftHelper.getVDriftObject().corrFact, mTPCVDriftHelper.getVDriftObject().refVDrift,
188 mTPCVDriftHelper.getVDriftObject().timeOffsetCorr, mTPCVDriftHelper.getVDriftObject().refTimeOffset,
189 mTPCVDriftHelper.getSourceName());
190 mAggregator->setTPCVDrift(mTPCVDriftHelper.getVDriftObject());
191 mTPCVDriftHelper.acknowledgeUpdate();
192 }
193 if (!mInitDone) {
194 mAggregator->setDataTakingContext(<DataTakingContext>());
195 mAggregator->setOrbitResetTime(o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS());
196 mInitDone = true;
197 }
198 }
199 o2::tpc::VDriftHelper mTPCVDriftHelper{};
200 std::unique_ptr<o2::tpc::ResidualAggregator> mAggregator;
201 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
202 std::shared_ptr<o2::globaltracking::DataRequest> mDataRequest;
203 bool mTrackInput{false};
204 bool mCTPInput{false};
205 bool mWriteBinnedResiduals{false};
206 bool mWriteUnbinnedResiduals{false};
207 bool mWriteTrackData{false};
208 bool mRunStopRequested{false};
209 bool mInitDone{false};
212} // namespace calibration
214namespace framework
217DataProcessorSpec getTPCResidualAggregatorSpec(bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData)
219 std::shared_ptr<o2::globaltracking::DataRequest> dataRequest = std::make_shared<o2::globaltracking::DataRequest>();
220 if (ctpInput) {
221 dataRequest->requestClusters(GID::getSourcesMask("CTP"), false);
222 }
223 auto& inputs = dataRequest->inputs;
225 inputs.emplace_back("unbinnedRes", "GLO", "UNBINNEDRES");
226 inputs.emplace_back("trackRefs", "GLO", "TRKREFS");
227 if (trackInput) {
228 inputs.emplace_back("trkData", "GLO", "TRKDATA");
229 }
230 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(true, // orbitResetTime
231 true, // GRPECS=true
232 false, // GRPLHCIF
233 false, // GRPMagField
234 false, // askMatLUT
236 inputs,
237 true);
238 return DataProcessorSpec{
239 "residual-aggregator",
240 inputs,
241 Outputs{},
242 AlgorithmSpec{adaptFromTask<o2::calibration::ResidualAggregatorDevice>(ccdbRequest, trackInput, ctpInput, writeUnbinnedResiduals, writeBinnedResiduals, writeTrackData, dataRequest)},
243 Options{
244 {"sec-per-slot", VariantType::UInt32, 600u, {"number of seconds per calibration time slot (put 0 for infinite slot length)"}},
245 {"updateInterval", VariantType::UInt32, 6'000u, {"update interval in number of TFs (only used in case slot length is infinite)"}},
246 {"max-delay", VariantType::UInt32, 1u, {"number of slots in past to consider"}},
247 {"min-entries", VariantType::Int, 0, {"minimum number of entries on average per voxel"}},
248 {"compression", VariantType::Int, 505, {"ROOT compression setting for output file (see TFile documentation for meaning of this number)"}},
249 {"output-dir", VariantType::String, "none", {"Output directory for residuals. Defaults to current working directory. Output is disabled in case set to /dev/null"}},
250 {"meta-output-dir", VariantType::String, "/dev/null", {"Residuals metadata output directory, must exist (if not /dev/null)"}},
251 {"autosave-interval", VariantType::Int, 0, {"Write output to file for every n-th TF. 0 means this feature is OFF"}}}};
254} // namespace framework
255} // namespace o2
