Project
Loading...
Searching...
No Matches
TPCMergeIntegrateClusterSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
20#include "Framework/Task.h"
23#include "CCDB/CcdbApi.h"
29#include "TPCBase/CalDet.h"
30
31#include <numeric>
32
33using namespace o2::framework;
34
35namespace o2
36{
37namespace tpc
38{
39
41{
42 public:
/// Constructor: keeps the GRP/geometry CCDB request that is handed in when the task is created.
/// \param req shared request object, stored in mCCDBRequest
44 TPCMergeIntegrateClusters(std::shared_ptr<o2::base::GRPGeomRequest> req) : mCCDBRequest(req){};
45
47 {
// Body of the initialisation: creates the calibrator and configures the task from the workflow options.
// NOTE(review): the signature line and listing lines 48 and 55 are absent from this extract - confirm against the original file.
49 mCalibrator = std::make_unique<o2::calibration::IntegratedClusterCalibrator<ITPCC>>();
50 mEnableWritingPadStatusMap = ic.options().get<bool>("enableWritingPadStatusMap");
51 const auto slotLength = ic.options().get<uint32_t>("tf-per-slot");
52 const auto maxDelay = ic.options().get<uint32_t>("max-delay");
53 const auto debug = ic.options().get<bool>("debug");
54 mNthreads = ic.options().get<int>("nthreads");
56 mProcess3D = ic.options().get<bool>("process-3D-currents");
// slot length and maximum slot delay are forwarded to the calibrator
57 mCalibrator->setSlotLength(slotLength);
58 mCalibrator->setMaxSlotsDelay(maxDelay);
59 // in case of 3D output set special debug output
// (the calibrator debug flag is switched off and the factorization objects are dumped in sendOutput() instead)
60 if (debug && mProcess3D) {
61 mDump3D = true;
62 mCalibrator->setDebug(false);
63 } else {
64 mCalibrator->setDebug(debug);
65 }
66
// directories are only rectified when they differ from the literal "/dev/null"
67 mCalibFileDir = ic.options().get<std::string>("output-dir");
68 if (mCalibFileDir != "/dev/null") {
69 mCalibFileDir = o2::utils::Str::rectifyDirectory(mCalibFileDir);
70 }
71 mMetaFileDir = ic.options().get<std::string>("meta-output-dir");
72 if (mMetaFileDir != "/dev/null") {
73 mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir);
74 }
75 mDumpCalibData = ic.options().get<bool>("dump-calib-data");
76 }
77
/// Processes one time frame: finalizes slots that are complete, sends available calibration data
/// and afterwards accumulates the currents received on the "itpcc" input.
/// \param pc processing context giving access to inputs, services and outputs
78 void run(ProcessingContext& pc) final
79 {
// NOTE(review): listing line 80 is absent from this extract - confirm against the original file.
// fill the TF information object held by the calibrator for the current time frame
81 o2::base::TFIDInfoHelper::fillTFIDInfo(pc, mCalibrator->getCurrentTFInfo());
82
83 // set data taking context only once
84 if (mSetDataTakingCont) {
85 mDataTakingContext = pc.services().get<DataTakingContext>();
86 mSetDataTakingCont = false;
87 }
88
89 // check slots to finalize at the beginning to avoid creation of a second slot and thus allocating new memory
90 mCalibrator->checkSlotsToFinalize(mCalibrator->getCurrentTFInfo().tfCounter, mCalibrator->getMaxSlotsDelay() * mCalibrator->getSlotLength());
91
92 LOGP(debug, "Created {} objects for {} slots with current TF {} and time stamp {}", mCalibrator->getTFinterval().size(), mCalibrator->getNSlots(), mCalibrator->getCurrentTFInfo().tfCounter, mCalibrator->getCurrentTFInfo().creation);
// output is sent before the currents of the current TF are added to the calibrator
93 if (mCalibrator->hasCalibrationData()) {
94 sendOutput(pc.outputs());
95 }
96
// fetch the ITPCC object from the input bound to "itpcc"
97 const auto currents = pc.inputs().get<ITPCC*>(pc.inputs().get("itpcc"));
98
99 // accumulate the currents
100 mCalibrator->process(mCalibrator->getCurrentTFInfo().tfCounter, *currents);
101 }
102
104 {
// Body of the end-of-stream handling: every slot still held by the calibrator is finalized
// and the resulting objects are passed to sendOutput().
// NOTE(review): the signature line (listing line 103) is absent from this extract.
105 LOGP(info, "Finalizing calibration. Dumping all objects");
106 // just write everything out
107 for (int i = 0; i < mCalibrator->getNSlots(); ++i) {
108 mCalibrator->finalizeSlot(mCalibrator->getSlot(i));
109 }
110 sendOutput(eos.outputs());
111 }
112
114
118
119 private:
// calibrator accumulating the ITPCC objects in time slots
120 std::unique_ptr<o2::calibration::IntegratedClusterCalibrator<ITPCC>> mCalibrator;
// GRP/geometry request received in the constructor
121 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
// value of the "meta-output-dir" option (rectified unless it is "/dev/null")
122 std::string mMetaFileDir{};
// value of the "output-dir" option (rectified unless it is "/dev/null")
123 std::string mCalibFileDir{};
// data taking context, fetched from the services on the first call of run()
124 o2::framework::DataTakingContext mDataTakingContext{};
// true until the data taking context has been fetched once
125 bool mSetDataTakingCont{true};
// value of the "dump-calib-data" option
126 bool mDumpCalibData{false};
// value of the "process-3D-currents" option
127 bool mProcess3D{false};
// set when both "debug" and "process-3D-currents" are enabled: dump the factorization objects to file
128 bool mDump3D{false};
// value of the "nthreads" option
129 int mNthreads{1};
// pad status map buffered in sendOutput() to detect changes between calibration objects
130 std::unique_ptr<CalDet<PadFlags>> mPadFlagsMap;
// value of the "enableWritingPadStatusMap" option
131 bool mEnableWritingPadStatusMap{false};
132
/// Builds the CCDB images for all calibration objects currently available in the calibrator and
/// resets the calibrator output afterwards.
/// When 3D currents are processed, the currents of each object are factorized first (I0, I1, pad status map),
/// the object is overwritten with the 1D result and extra images are created for ITPC0 and, if enabled and
/// new or changed, for the pad status map. With mDumpCalibData the images are additionally written to
/// mCalibFileDir, together with a meta data file in mMetaFileDir unless that is "/dev/null".
/// \param output data allocator of the calling context
/// NOTE(review): listing lines 226-227, 235-236, 254-255, 260-261 and 286 are absent from this extract
/// (presumably the calls using `output` and the declaration of calMetaData) - confirm against the original file.
133 void sendOutput(DataAllocator& output)
134 {
135 auto calibrations = std::move(*mCalibrator).getCalibs();
136 const auto& intervals = mCalibrator->getTimeIntervals();
137 assert(calibrations.size() == intervals.size());
138 for (unsigned int i = 0; i < calibrations.size(); i++) {
139 auto& object = calibrations[i];
140 o2::ccdb::CcdbObjectInfo info(CDBTypeMap.at(CDBType::CalITPC1), std::string{}, std::string{}, std::map<std::string, std::string>{}, intervals[i].first, intervals[i].second);
141
142 // perform factorization in case of 3D currents used as input
// both images stay empty (nullptr) unless they are created below; this is checked again when dumping to file
143 std::unique_ptr<std::vector<char>> imageFlagMap;
144 std::unique_ptr<std::vector<char>> imageITPC0;
145 if (mProcess3D) {
146 std::vector<uint32_t> crus(o2::tpc::CRU::MaxCRU);
147 std::iota(crus.begin(), crus.end(), 0);
148 const unsigned int nIntegrationIntervals = object.getEntries() / Mapper::getNumberOfPadsPerSide();
149 IDCFactorization factorizeqMax(nIntegrationIntervals, 1, crus);
150 IDCFactorization factorizeqTot(nIntegrationIntervals, 1, crus);
151 IDCFactorization factorizeNCl(nIntegrationIntervals, 1, crus);
152 LOGP(info, "Processing {} integration intervals", nIntegrationIntervals);
153
154 for (int cru = 0; cru < o2::tpc::CRU::MaxCRU; ++cru) {
155 CRU cruTmp(cru);
156 const Side side = cruTmp.side();
157 const unsigned int region = cruTmp.region();
158 const unsigned int sector = cruTmp.sector();
159 for (int interval = 0; interval < nIntegrationIntervals; ++interval) {
// index range of the pads of this CRU inside the per-side vector for the given integration interval
160 const unsigned int indexStart = interval * Mapper::getNumberOfPadsPerSide() + (sector % SECTORSPERSIDE) * Mapper::getPadsInSector() + Mapper::GLOBALPADOFFSET[region];
161 const unsigned int indexEnd = indexStart + Mapper::PADSPERREGION[region];
162 const auto& currqMax = (side == Side::A) ? object.mIQMaxA : object.mIQMaxC;
163 const auto& currqTot = (side == Side::A) ? object.mIQTotA : object.mIQTotC;
164 const auto& currNCl = (side == Side::A) ? object.mINClA : object.mINClC;
165
166 // check if values are empty -> dummy input
167 if (std::all_of(currNCl.begin() + indexStart, currNCl.begin() + indexEnd, [](float x) { return x == 0; })) {
168 continue;
169 }
170
171 // copy currents for factorization (ToDo: optimize the class for factorization such that no copy is required)
172 factorizeqMax.setIDCs(std::vector<float>(currqMax.begin() + indexStart, currqMax.begin() + indexEnd), cru, interval);
173 factorizeqTot.setIDCs(std::vector<float>(currqTot.begin() + indexStart, currqTot.begin() + indexEnd), cru, interval);
174 factorizeNCl.setIDCs(std::vector<float>(currNCl.begin() + indexStart, currNCl.begin() + indexEnd), cru, interval);
175 }
176
177 // once side changes deallocate the memory for A side
178 if (cru == o2::tpc::CRU::MaxCRU / 2) {
179 object.mIQMaxA = std::vector<float>();
180 object.mIQTotA = std::vector<float>();
181 object.mINClA = std::vector<float>();
182 }
183 }
184
185 // deallocate the memory for C side
186 object.mIQMaxC = std::vector<float>();
187 object.mIQTotC = std::vector<float>();
188 object.mINClC = std::vector<float>();
189
190 if (mDump3D) {
191 LOGP(info, "Writing debug output to file");
192 factorizeqMax.setTimeStamp(info.getStartValidityTimestamp());
193 factorizeqTot.setTimeStamp(info.getStartValidityTimestamp());
194 factorizeNCl.setTimeStamp(info.getStartValidityTimestamp());
195 const int run = std::stoi(mDataTakingContext.runNumber);
196 factorizeqMax.setRun(run);
197 factorizeqTot.setRun(run);
198 factorizeNCl.setRun(run);
199 factorizeqMax.dumpLargeObjectToFile(fmt::format("IDCFactorization_qMax_{}_{}.root", info.getStartValidityTimestamp(), info.getEndValidityTimestamp()).data());
200 factorizeqTot.dumpLargeObjectToFile(fmt::format("IDCFactorization_qTot_{}_{}.root", info.getStartValidityTimestamp(), info.getEndValidityTimestamp()).data());
201 factorizeNCl.dumpLargeObjectToFile(fmt::format("IDCFactorization_NCl_{}_{}.root", info.getStartValidityTimestamp(), info.getEndValidityTimestamp()).data());
202 }
203
204 // perform factorization (I0,I1,outlier map)
205 factorizeqMax.factorizeIDCs(true, false); // normalize to pad size
206 factorizeqTot.factorizeIDCs(true, false); // normalize to pad size
207 factorizeNCl.factorizeIDCs(false, false); // do not normalize to pad size
208
209 // copy calibration data
210 object.mIQMaxA = factorizeqMax.getIDCOneVec(Side::A);
211 object.mIQMaxC = factorizeqMax.getIDCOneVec(Side::C);
212 object.mIQTotA = factorizeqTot.getIDCOneVec(Side::A);
213 object.mIQTotC = factorizeqTot.getIDCOneVec(Side::C);
214 object.mINClA = factorizeNCl.getIDCOneVec(Side::A);
215 object.mINClC = factorizeNCl.getIDCOneVec(Side::C);
216
217 // storing pad status map in CCDB
218 auto padStatusMap = factorizeNCl.getPadStatusMap();
219 if (mEnableWritingPadStatusMap) {
220 // always store the first map
221 if (!mPadFlagsMap) {
222 mPadFlagsMap = std::move(padStatusMap);
223 o2::ccdb::CcdbObjectInfo ccdbInfoPadFlags(CDBTypeMap.at(CDBType::CalIDCPadStatusMapA), std::string{}, std::string{}, std::map<std::string, std::string>{}, intervals[i].first, intervals[i].second);
224 imageFlagMap = o2::ccdb::CcdbApi::createObjectImage(mPadFlagsMap.get(), &ccdbInfoPadFlags);
225 LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", ccdbInfoPadFlags.getPath(), ccdbInfoPadFlags.getFileName(), imageFlagMap->size(), ccdbInfoPadFlags.getStartValidityTimestamp(), ccdbInfoPadFlags.getEndValidityTimestamp());
228 } else {
229 // check if map changed. if it changed update the map in the CCDB and store new map in buffer
230 if (!(*padStatusMap.get() == *mPadFlagsMap.get())) {
231 LOGP(info, "Pad status map changed");
232 o2::ccdb::CcdbObjectInfo ccdbInfoPadFlags(CDBTypeMap.at(CDBType::CalIDCPadStatusMapA), std::string{}, std::string{}, std::map<std::string, std::string>{}, intervals[i].first, intervals[i].second);
// serialize the NEW map: mPadFlagsMap still holds the previous map until the move below,
// so using it here would publish the outdated map with the new validity range
233 imageFlagMap = o2::ccdb::CcdbApi::createObjectImage(padStatusMap.get(), &ccdbInfoPadFlags);
234 LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", ccdbInfoPadFlags.getPath(), ccdbInfoPadFlags.getFileName(), imageFlagMap->size(), ccdbInfoPadFlags.getStartValidityTimestamp(), ccdbInfoPadFlags.getEndValidityTimestamp());
237 mPadFlagsMap = std::move(padStatusMap);
238 }
239 }
240 }
241
242 // moving I0 to calibration object
243 ITPCZero itpczero;
244 itpczero.mIQMaxA = std::move(factorizeqMax).getIDCZero(Side::A);
245 itpczero.mIQMaxC = std::move(factorizeqMax).getIDCZero(Side::C);
246 itpczero.mIQTotA = std::move(factorizeqTot).getIDCZero(Side::A);
247 itpczero.mIQTotC = std::move(factorizeqTot).getIDCZero(Side::C);
248 itpczero.mINClA = std::move(factorizeNCl).getIDCZero(Side::A);
249 itpczero.mINClC = std::move(factorizeNCl).getIDCZero(Side::C);
250
251 o2::ccdb::CcdbObjectInfo ccdbInfoITPC0(CDBTypeMap.at(CDBType::CalITPC0), std::string{}, std::string{}, std::map<std::string, std::string>{}, intervals[i].first, intervals[i].second);
252 imageITPC0 = o2::ccdb::CcdbApi::createObjectImage(&itpczero, &ccdbInfoITPC0);
253 LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", ccdbInfoITPC0.getPath(), ccdbInfoITPC0.getFileName(), imageITPC0->size(), ccdbInfoITPC0.getStartValidityTimestamp(), ccdbInfoITPC0.getEndValidityTimestamp());
256 }
257
// image of the (possibly factorized) ITPCC object itself
258 auto image = o2::ccdb::CcdbApi::createObjectImage(&object, &info);
259 LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", info.getPath(), info.getFileName(), image->size(), info.getStartValidityTimestamp(), info.getEndValidityTimestamp());
262
// optional local copy of the images; the meta data file is only written inside this branch
263 if (mDumpCalibData && mCalibFileDir != "/dev/null") {
264 std::string calibFName = fmt::format("itpc_cal_data_{}_{}.root", info.getStartValidityTimestamp(), info.getEndValidityTimestamp());
265 try {
266 std::ofstream calFile(fmt::format("{}{}", mCalibFileDir, calibFName), std::ios::out | std::ios::binary);
267 calFile.write(image->data(), image->size());
268 calFile.close();
269 if (imageFlagMap) {
270 std::string calibFNameMap = fmt::format("itpc_cal_map_data_{}_{}.root", info.getStartValidityTimestamp(), info.getEndValidityTimestamp());
271 std::ofstream calFileMap(fmt::format("{}{}", mCalibFileDir, calibFNameMap), std::ios::out | std::ios::binary);
272 calFileMap.write(imageFlagMap->data(), imageFlagMap->size());
273 calFileMap.close();
274 }
275 if (imageITPC0) {
276 std::string calibFNameI0 = fmt::format("itpc_cal0_data_{}_{}.root", info.getStartValidityTimestamp(), info.getEndValidityTimestamp());
277 std::ofstream calFileI0(fmt::format("{}{}", mCalibFileDir, calibFNameI0), std::ios::out | std::ios::binary);
278 calFileI0.write(imageITPC0->data(), imageITPC0->size());
279 calFileI0.close();
280 }
281 } catch (std::exception const& e) {
282 LOG(error) << "Failed to store ITPCC calibration data file " << calibFName << ", reason: " << e.what();
283 }
284
285 if (mMetaFileDir != "/dev/null") {
287 calMetaData.fillFileData(calibFName);
288 calMetaData.setDataTakingContext(mDataTakingContext);
289 calMetaData.type = "calib";
290 calMetaData.priority = "low";
// write to a temporary name first and rename afterwards to the final ".done" file
291 auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, calibFName);
292 auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, calibFName);
293 try {
294 std::ofstream metaFileOut(metaFileNameTmp);
295 metaFileOut << calMetaData;
296 metaFileOut.close();
297 std::filesystem::rename(metaFileNameTmp, metaFileName);
298 } catch (std::exception const& e) {
299 LOG(error) << "Failed to store CTF meta data file " << metaFileName << ", reason: " << e.what();
300 }
301 }
302 }
303 }
304 mCalibrator->initOutput(); // empty the outputs after they are sent
305 }
306};
307
309{
// Body of the factory building the DataProcessorSpec of the "tpc-merge-integrated-clusters" device.
// NOTE(review): the signature line (listing line 308) and listing lines 317 and 321-326 are absent from
// this extract; `outputs` is therefore returned empty in the visible lines - confirm against the original file.
310 std::vector<InputSpec> inputs;
// single input "itpcc", declared with sporadic lifetime
311 inputs.emplace_back("itpcc", o2::header::gDataOriginTPC, getDataDescriptionTPCC(), 0, Lifetime::Sporadic);
// the request receives the inputs vector as its last argument
312 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(true, // orbitResetTime
313 true, // GRPECS=true for nHBF per TF
314 false, // GRPLHCIF
315 false, // GRPMagField
316 false, // askMatLUT
318 inputs);
319
320 std::vector<OutputSpec> outputs;
327
// the option names listed here are the ones read back in the task initialisation
328 return DataProcessorSpec{
329 "tpc-merge-integrated-clusters",
330 inputs,
331 outputs,
332 AlgorithmSpec{adaptFromTask<TPCMergeIntegrateClusters>(ccdbRequest)},
333 Options{
334 {"debug", VariantType::Bool, false, {"Write debug output files"}},
335 {"tf-per-slot", VariantType::UInt32, 1000u, {"number of TFs per calibration time slot"}},
336 {"max-delay", VariantType::UInt32, 3u, {"number of slots in past to consider"}},
337 {"output-dir", VariantType::String, "none", {"calibration files output directory, must exist"}},
338 {"meta-output-dir", VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
339 {"dump-calib-data", VariantType::Bool, false, {"Dump ITPCC calibration data to file"}},
340 {"process-3D-currents", VariantType::Bool, false, {"Process full 3D currents instead of 1D integrated only currents"}},
341 {"enableWritingPadStatusMap", VariantType::Bool, false, {"Write the pad status map to CCDB"}},
342 {"nthreads", VariantType::Int, 1, {"Number of threads used for factorization"}}}};
343}
344
345} // end namespace tpc
346} // end namespace o2
Simple interface to the CDB manager.
Utils and constants for calibration and related workflows.
int32_t i
Helper for geometry and GRP related CCDB requests.
class for aggregating IDCs for the full TPC (all sectors) and factorization of aggregated IDCs
calibrator class for accumulating integrated clusters
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t side
Definition RawData.h:0
std::ostringstream debug
void checkUpdates(o2::framework::ProcessingContext &pc)
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
Definition CcdbApi.h:103
@ MaxCRU
Definition CRU.h:31
static void setNThreads(const int nThreads)
static constexpr unsigned int GLOBALPADOFFSET[NREGIONS]
offset of number of pads for region
Definition Mapper.h:531
static constexpr unsigned short getPadsInSector()
Definition Mapper.h:414
static constexpr int getNumberOfPadsPerSide()
Definition Mapper.h:374
static constexpr unsigned int PADSPERREGION[NREGIONS]
number of pads per CRU
Definition Mapper.h:530
static constexpr header::DataDescription getDataDescriptionCCDB()
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
static constexpr header::DataDescription getDataDescriptionCCDBITPC0()
TPCMergeIntegrateClusters(std::shared_ptr< o2::base::GRPGeomRequest > req)
constructor
static constexpr header::DataDescription getDataDescriptionCCDBITPCPadFlag()
void endOfStream(EndOfStreamContext &eos) final
void init(framework::InitContext &ic) final
GLeglImageOES image
Definition glcorearb.h:4021
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint object
Definition glcorearb.h:4041
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
Interval< T > interval(const VerticalEdge< T > &edge)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
Definition CDBTypes.h:94
constexpr unsigned char SECTORSPERSIDE
Definition Defs.h:40
o2::framework::DataProcessorSpec getTPCMergeIntegrateClusterSpec()
Side
TPC readout side.
Definition Defs.h:35
@ A
Definition Defs.h:35
@ C
Definition Defs.h:36
@ CalIDCPadStatusMapA
Status map of the pads (dead etc. obtained from CalIDC0)
@ CalITPC0
2D average TPC clusters for longer time interval
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void fillTFIDInfo(o2::framework::ProcessingContext &pc, o2::dataformats::TFIDInfo &ti)
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43
bool fillFileData(const std::string &fname, bool fillmd5=false, const std::string &tmpEnding="")
void setDataTakingContext(const o2::framework::DataTakingContext &dtc)
std::string runNumber
The current run number.
struct containing the integrated TPC currents
static std::string rectifyDirectory(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"