Project
Loading...
Searching...
No Matches
TPCDistributeCMVSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#ifndef O2_TPCDISTRIBUTECMVSPEC_H
17#define O2_TPCDISTRIBUTECMVSPEC_H
18
19#include <vector>
20#include <chrono>
21#include <fmt/format.h>
22#include "TParameter.h"
23#include "Framework/Task.h"
25#include "Framework/Logger.h"
29#include "Headers/DataHeader.h"
36#include "TMemFile.h"
37#include "CCDB/CcdbApi.h"
38#include "CCDB/CcdbObjectInfo.h"
41#include "DataFormatsTPC/CMV.h"
42
43using namespace o2::framework;
45using namespace o2::tpc;
46
47namespace o2::tpc
48{
49
51{
52 public:
53 TPCDistributeCMVSpec(const std::vector<uint32_t>& crus, const unsigned int timeframes, const int nTFsBuffer, const int firstTF, const bool sendCCDB, const bool usePreciseTimestamp, std::shared_ptr<o2::base::GRPGeomRequest> req)
54 : mCRUs{crus},
55 mTimeFrames{timeframes},
56 mNTFsBuffer{nTFsBuffer},
57 mProcessedCRU{{std::vector<unsigned int>(timeframes), std::vector<unsigned int>(timeframes)}},
58 mTFStart{{firstTF, firstTF + timeframes}},
59 mTFEnd{{firstTF + timeframes - 1, mTFStart[1] + timeframes - 1}},
60 mCCDBRequest(req),
61 mSendCCDB{sendCCDB},
62 mUsePreciseTimestamp{usePreciseTimestamp},
63 mSendCCDBOutputOrbitReset(1),
64 mSendCCDBOutputGRPECS(1),
65 mOrbitInfoForwarded{{std::vector<bool>(timeframes, false), std::vector<bool>(timeframes, false)}}
66 {
67 // sort vector for binary_search
68 std::sort(mCRUs.begin(), mCRUs.end());
69
70 for (auto& processedCRUbuffer : mProcessedCRUs) {
71 processedCRUbuffer.resize(mTimeFrames);
72 for (auto& crusMap : processedCRUbuffer) {
73 crusMap.reserve(mCRUs.size());
74 for (const auto cruID : mCRUs) {
75 crusMap.emplace(cruID, false);
76 }
77 }
78 }
79
80 mFilter.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic});
81 mOrbitFilter.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic});
82
83 // pre-allocate the accumulator TTree for the current aggregation interval
84 initIntervalTree();
85 };
86
88 {
90 mNFactorTFs = ic.options().get<int>("nFactorTFs");
91 mNTFsDataDrop = ic.options().get<int>("drop-data-after-nTFs");
92 mCheckEveryNData = ic.options().get<int>("check-data-every-n");
93 if (mCheckEveryNData == 0) {
94 mCheckEveryNData = mTimeFrames / 2;
95 if (mCheckEveryNData == 0) {
96 mCheckEveryNData = 1;
97 }
98 mNTFsDataDrop = mCheckEveryNData;
99 }
100 mDumpCMVs = ic.options().get<bool>("dump-cmvs");
101 mUseCompressionVarint = ic.options().get<bool>("use-compression-varint");
102 mUseSparse = ic.options().get<bool>("use-sparse");
103 mUseCompressionHuffman = ic.options().get<bool>("use-compression-huffman");
104 mRoundIntegersThreshold = static_cast<uint16_t>(ic.options().get<int>("cmv-round-integers-threshold"));
105 mZeroThreshold = ic.options().get<float>("cmv-zero-threshold");
106 mDynamicPrecisionMean = ic.options().get<float>("cmv-dynamic-precision-mean");
107 mDynamicPrecisionSigma = ic.options().get<float>("cmv-dynamic-precision-sigma");
108 LOGP(info, "CMV compression settings: use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}",
109 mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma);
110 // re-initialise the interval tree now that compression options are known (constructor used the defaults)
111 initIntervalTree();
112 }
113
/// Callback receiving a CCDB object together with its matcher.
/// The visible part only records which time-related objects were refreshed: it sets every entry of
/// mSendCCDBOutputOrbitReset / mSendCCDBOutputGRPECS to true when CTP/ORBITRESET or a valid GLO/GRPECS arrives.
/// NOTE(review): the listing skips a line right after the opening brace (numbering jumps from 115 to 117);
/// confirm the missing statement against the original header.
/// \param matcher identifies the object that was updated
/// \param obj pointer to the updated object (not used in the visible statements)
114 void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final
115 {
117 if (matcher == ConcreteDataMatcher("CTP", "ORBITRESET", 0)) {
118 LOGP(info, "Updating ORBITRESET");
119 std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(), true);
120 } else if (matcher == ConcreteDataMatcher("GLO", "GRPECS", 0)) {
121 // check if received object is valid
// a run number of 0 is treated as the default (not yet valid) GRPECS object
122 if (o2::base::GRPGeomHelper::instance().getGRPECS()->getRun() != 0) {
123 LOGP(info, "Updating GRPECS");
124 std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(), true);
125 } else {
126 LOGP(info, "Detected default GRPECS object");
127 }
128 }
129 }
130
132 {
133 // capture orbit-reset info once for precise CCDB timestamp calculation
134 if (mCCDBRequest->askTime) {
135 const bool grpecsValid = pc.inputs().isValid("grpecs");
136 const bool orbitResetValid = pc.inputs().isValid("orbitReset");
137 if (grpecsValid) {
138 pc.inputs().get<o2::parameters::GRPECSObject*>("grpecs");
139 }
140 if (orbitResetValid) {
141 pc.inputs().get<std::vector<Long64_t>*>("orbitReset");
142 }
143 if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) {
144 return;
145 }
146 // update mTFInfo from GRPGeomHelper whenever orbit-reset or GRPECS objects are fresh
147 if (mSendCCDBOutputOrbitReset[0] && mSendCCDBOutputGRPECS[0]) {
148 mSendCCDBOutputOrbitReset[0] = false;
149 mSendCCDBOutputGRPECS[0] = false;
151 }
152 }
153
154 const auto tf = processing_helpers::getCurrentTF(pc);
155 mLastSeenTF = tf; // track for endOfStream flush
156
157 // automatically detect firstTF in case firstTF was not specified
158 if (mTFStart.front() <= -1) {
159 const auto firstTF = tf;
160 const long offsetTF = std::abs(mTFStart.front() + 1);
161 const auto nTotTFs = getNRealTFs();
162 mTFStart = {firstTF + offsetTF, firstTF + offsetTF + nTotTFs};
163 mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs};
164 LOGP(info, "Setting {} as first TF", mTFStart[0]);
165 LOGP(info, "Using offset of {} TFs for setting the first TF", offsetTF);
166 }
167
168 // check which buffer to use for current incoming data
169 const bool currentBuffer = (tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer;
170 if (mTFStart[currentBuffer] > tf) {
171 LOGP(info, "All CRUs for current TF {} already received. Skipping this TF", tf);
172 return;
173 }
174
175 const unsigned int relTF = (tf - mTFStart[currentBuffer]) / mNTFsBuffer;
176 LOGP(info, "Current TF: {}, relative TF: {}, current buffer: {}, mTFStart: {}", tf, relTF, currentBuffer, mTFStart[currentBuffer]);
177
178 if (relTF >= mProcessedCRU[currentBuffer].size()) {
179 LOGP(warning, "Skipping tf {}: relative tf {} is larger than size of buffer: {}", tf, relTF, mProcessedCRU[currentBuffer].size());
180
181 // check number of processed CRUs for previous TFs. If CRUs are missing for them, they are probably lost/not received
182 mProcessedTotalData = mCheckEveryNData;
183 checkIntervalsForMissingData(pc, currentBuffer, relTF, tf);
184 return;
185 }
186
187 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
188 return;
189 }
190
191 // record the absolute first TF of this aggregation interval
192 if (mIntervalTFCount == 0) {
193 mIntervalFirstTF = tf;
194 }
195
196 // set CCDB start timestamp once at the start of each aggregation interval
197 if (mTimestampStart == 0) {
198 setTimestampCCDB(relTF, pc);
199 }
200
201 // capture orbit/BC info into the interval once per relTF.
202 // all CRUs within a TF carry identical timing, so the first one is sufficient.
203 if (!mOrbitInfoForwarded[currentBuffer][relTF]) {
204 for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
205 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
206 const unsigned int cru = hdr->subSpecification >> 7;
207 if (std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
208 const auto orbitBC = pc.inputs().get<uint64_t>(ref);
209 if (mCurrentTF.firstOrbit == 0 && mCurrentTF.firstBC == 0) {
210 mCurrentTF.firstOrbit = static_cast<uint32_t>(orbitBC >> 32);
211 mCurrentTF.firstBC = static_cast<uint16_t>(orbitBC & 0xFFFFu);
212 }
213 mOrbitInfoForwarded[currentBuffer][relTF] = true;
214 break; // one per relTF is enough
215 }
216 }
217 }
218
219 for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
220 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
221 const unsigned int cru = tpcCRUHeader->subSpecification >> 7;
222
223 // check if cru is specified in input cru list
224 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
225 LOGP(info, "Received data from CRU: {} which was not specified as input. Skipping", cru);
226 continue;
227 }
228
229 if (mProcessedCRUs[currentBuffer][relTF][cru]) {
230 continue;
231 } else {
232 // count total number of processed CRUs for given TF
233 ++mProcessedCRU[currentBuffer][relTF];
234
235 // to keep track of processed CRUs
236 mProcessedCRUs[currentBuffer][relTF][cru] = true;
237 }
238
239 // accumulate raw 16-bit CMVs into the flat array for the current TF
240 auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
241 const uint32_t nTimeBins = std::min(static_cast<uint32_t>(cmvVec.size()), cmv::NTimeBinsPerTF);
242 for (uint32_t tb = 0; tb < nTimeBins; ++tb) {
243 mCurrentTF.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = cmvVec[tb];
244 }
245 }
246
247 LOGP(info, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf);
248
249 // check for missing data if specified
250 if (mNTFsDataDrop > 0) {
251 checkIntervalsForMissingData(pc, currentBuffer, relTF, tf);
252 }
253
254 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
255 ++mProcessedTFs[currentBuffer];
256
257 // Pre-processing: quantisation / rounding / zeroing (applied before compression)
258 mCurrentTF.roundToIntegers(mRoundIntegersThreshold);
259 if (mZeroThreshold > 0.f) {
260 mCurrentTF.zeroSmallValues(mZeroThreshold);
261 }
262 if (mDynamicPrecisionSigma > 0.f) {
263 mCurrentTF.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
264 }
265
266 // Compress; the raw CMVPerTF branch is used when all flags are zero
267 const uint8_t flags = buildCompressionFlags();
268 if (flags != CMVEncoding::kNone) {
269 mCurrentCompressedTF = mCurrentTF.compress(flags);
270 }
271
272 mIntervalTree->Fill();
273 ++mIntervalTFCount;
274 mCurrentTF = CMVPerTF{};
275 }
276
277 if (mProcessedTFs[currentBuffer] == mTimeFrames) {
278 sendOutput(pc.outputs(), tf);
279 finishInterval(pc, currentBuffer, tf);
280 }
281 }
282
284 {
285 LOGP(info, "End of stream, flushing CMV interval ({} TFs)", mIntervalTFCount);
286 // correct mTFEnd for the partial last interval so the CCDB validity end timestamp reflects the actual last TF, not the expected interval end
287 mTFEnd[mBuffer] = mLastSeenTF;
288 sendOutput(ec.outputs(), mLastSeenTF);
289 ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
290 }
291
293
295 static header::DataDescription getDataDescriptionCMV(const unsigned int lane)
296 {
297 const std::string name = fmt::format("CMVAGG{}", lane).data();
299 description.runtimeInit(name.substr(0, 16).c_str());
300 return description;
301 }
302
305 {
306 const std::string name = fmt::format("CMVORB{}", lane);
308 description.runtimeInit(name.substr(0, 16).c_str());
309 return description;
310 }
311
314
315 private:
316 std::vector<uint32_t> mCRUs{}; ///< CRUs accepted as input; sorted in the constructor for std::binary_search
317 const unsigned int mTimeFrames{}; ///< number of relative-TF slots per aggregation interval
318 const int mNTFsBuffer{1}; ///< divisor mapping a TF offset onto a relative-TF slot
319 std::array<unsigned int, 2> mProcessedTFs{{0, 0}}; ///< per buffer: number of relative TFs counted as complete
320 std::array<std::vector<unsigned int>, 2> mProcessedCRU{}; ///< per buffer and relative TF: number of CRUs received
321 std::array<std::vector<std::unordered_map<unsigned int, bool>>, 2> mProcessedCRUs{}; ///< per buffer and relative TF: CRU -> already received
322 std::array<long, 2> mTFStart{}; ///< per buffer: first TF of its aggregation interval
323 std::array<long, 2> mTFEnd{}; ///< per buffer: last TF of its aggregation interval
324 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest; ///< request object passed to the constructor
325 std::vector<bool> mSendCCDBOutputOrbitReset{}; ///< set to true in finaliseCCDB when ORBITRESET was updated
326 std::vector<bool> mSendCCDBOutputGRPECS{}; ///< set to true in finaliseCCDB when a valid GRPECS was updated
327 bool mBuffer{false}; ///< index of the buffer whose interval is currently active; toggled in clearBuffer
328 bool mSendCCDB{false}; ///< if false, sendOutput returns before creating the CCDB payload
329 bool mUsePreciseTimestamp{false}; ///< selects the orbit-based start timestamp instead of the TF creation time
330 bool mDumpCMVs{false}; ///< option "dump-cmvs": write the interval tree to a local ROOT file
331 bool mUseCompressionVarint{false}; ///< option "use-compression-varint"
332 bool mUseSparse{false}; ///< option "use-sparse"
333 bool mUseCompressionHuffman{false}; ///< option "use-compression-huffman"
334 uint16_t mRoundIntegersThreshold{0}; ///< option "cmv-round-integers-threshold"
335 float mZeroThreshold{0.f}; ///< option "cmv-zero-threshold"; zeroing is applied only if > 0
336 float mDynamicPrecisionMean{1.f}; ///< option "cmv-dynamic-precision-mean"
337 float mDynamicPrecisionSigma{0.f}; ///< option "cmv-dynamic-precision-sigma"; trimming is applied only if > 0
338 long mTimestampStart{0}; ///< CCDB validity start of the current interval; 0 means not set yet
// NOTE(review): the listing skips number 339 here; mTFInfo is used elsewhere in the class but its declaration is not visible
340 std::unique_ptr<TTree> mIntervalTree{}; ///< tree filled once per completed TF of the current interval
341 CMVPerTF mCurrentTF{}; ///< accumulator for the TF currently being received (branch "CMVPerTF")
342 CMVPerTFCompressed mCurrentCompressedTF{}; ///< compressed form of the current TF (branch "CMVPerTFCompressed")
343 long mIntervalFirstTF{0}; ///< TF seen when the current interval received its first data
344 unsigned int mIntervalTFCount{0}; ///< number of TFs filled into the tree for the current interval
345 int mNFactorTFs{0}; ///< option "nFactorTFs"; reset to 0 in finishInterval after first use
346 int mNTFsDataDrop{0}; ///< option "drop-data-after-nTFs"; missing-data checks in run() need > 0
347 std::array<int, 2> mStartNTFsDataDrop{0}; ///< per buffer: first relative TF not yet checked for missing data
348 long mProcessedTotalData{0}; ///< call counter used to trigger the missing-data check
349 int mCheckEveryNData{1}; ///< the missing-data check runs when the call counter is a multiple of this
350 std::vector<InputSpec> mFilter{}; ///< input filter for the "cmvsgroup" data
351 std::vector<InputSpec> mOrbitFilter{}; ///< input filter for the "cmvorbit" data
352 std::array<std::vector<bool>, 2> mOrbitInfoForwarded{}; ///< per buffer and relative TF: orbit info already handled
353 uint32_t mLastSeenTF{0}; ///< last TF seen in run(); used when flushing at end of stream
354
356 unsigned int getNRealTFs() const { return mNTFsBuffer * mTimeFrames; }
357
359 uint8_t buildCompressionFlags() const
360 {
362 if (mUseSparse) {
364 }
365 if (mUseCompressionHuffman) {
367 } else if (mUseCompressionVarint) {
369 }
370 // Delta coding is only applied for the dense (non-sparse) path with a value compressor
373 }
374 return flags;
375 }
376
380 void initIntervalTree()
381 {
382 mIntervalTree = std::make_unique<TTree>("ccdb_object", "ccdb_object");
383 mIntervalTree->SetAutoSave(0);
384 mIntervalTree->SetDirectory(nullptr);
385 if (buildCompressionFlags() != CMVEncoding::kNone) {
386 mIntervalTree->Branch("CMVPerTFCompressed", &mCurrentCompressedTF);
387 } else {
388 mIntervalTree->Branch("CMVPerTF", &mCurrentTF);
389 }
390 }
391
392 void clearBuffer(const bool currentBuffer)
393 {
394 // resetting received CRUs
395 for (auto& crusMap : mProcessedCRUs[currentBuffer]) {
396 for (auto& it : crusMap) {
397 it.second = false;
398 }
399 }
400
401 mProcessedTFs[currentBuffer] = 0; // reset processed TFs for next aggregation interval
402 std::fill(mProcessedCRU[currentBuffer].begin(), mProcessedCRU[currentBuffer].end(), 0);
403 std::fill(mOrbitInfoForwarded[currentBuffer].begin(), mOrbitInfoForwarded[currentBuffer].end(), false);
404
405 // set integration range for next integration interval
406 mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1;
407 mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1;
408
409 // switch buffer
410 mBuffer = !mBuffer;
411 }
412
413 void checkIntervalsForMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const long relTF, const uint32_t tf)
414 {
415 if (!(mProcessedTotalData++ % mCheckEveryNData)) {
416 LOGP(info, "Checking for dropped packages...");
417
418 // if last buffer has smaller time range check the whole last buffer
419 if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
420 LOGP(warning, "Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size());
421 checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size());
422 LOGP(info, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf);
423 sendOutput(pc.outputs(), tf);
424 finishInterval(pc, !currentBuffer, tf);
425 }
426
427 const int tfEndCheck = std::clamp(static_cast<int>(relTF) - mNTFsDataDrop, 0, static_cast<int>(mProcessedCRU[currentBuffer].size()));
428 LOGP(info, "Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
429 checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck);
430 mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
431 }
432 }
433
434 void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF)
435 {
436 for (int iTF = startTF; iTF < endTF; ++iTF) {
437 if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
438 LOGP(warning, "CRUs for rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", iTF, mTFStart[currentBuffer] + iTF, mProcessedCRU[currentBuffer][iTF], mCRUs.size());
439 ++mProcessedTFs[currentBuffer];
440 mProcessedCRU[currentBuffer][iTF] = mCRUs.size();
441
442 // find missing CRUs and leave their interval slots empty (zero-filled)
443 for (auto& it : mProcessedCRUs[currentBuffer][iTF]) {
444 if (!it.second) {
445 it.second = true;
446 }
447 }
448
449 // leave orbit/BC as zero placeholder for missing TFs
450 mOrbitInfoForwarded[currentBuffer][iTF] = true;
451 }
452 }
453 }
454
455 void finishInterval(o2::framework::ProcessingContext& pc, const bool buffer, const uint32_t tf)
456 {
457 if (mNFactorTFs > 0) {
458 mNFactorTFs = 0;
459 // ToDo: Find better fix
460 auto& deviceProxy = pc.services().get<FairMQDeviceProxy>();
461 if (deviceProxy.getNumOutputChannels() > 0) {
462 auto& state = deviceProxy.getOutputChannelState({0});
463 size_t oldest = std::numeric_limits<size_t>::max() - 1; // just set to really large value
464 state.oldestForChannel = {oldest};
465 }
466 }
467
468 LOGP(info, "All TFs {} for current buffer received. Clearing buffer", tf);
469 clearBuffer(buffer);
470 mStartNTFsDataDrop[buffer] = 0;
471
472 // reset per-interval state for the next aggregation interval
473 initIntervalTree();
474 mIntervalFirstTF = 0;
475 mIntervalTFCount = 0;
476 mCurrentTF = CMVPerTF{};
477 mCurrentCompressedTF = CMVPerTFCompressed{};
478 mTimestampStart = 0;
479 LOGP(info, "Everything cleared. Waiting for new data to arrive.");
480 }
481
482 void setTimestampCCDB(const long relTF, o2::framework::ProcessingContext& pc)
483 {
484 if (mUsePreciseTimestamp && !mTFInfo.second) {
485 return;
486 }
487 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
488 const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * mTFInfo.second;
489 mTimestampStart = mUsePreciseTimestamp
490 ? (mTFInfo.first + (tinfo.firstTForbit - nOrbitsOffset) * o2::constants::lhc::LHCOrbitMUS * 0.001)
491 : tinfo.creation;
492 LOGP(info, "Setting timestamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}",
493 mTFInfo.first, tinfo.tfCounter, tinfo.firstTForbit, mTFInfo.second, relTF, nOrbitsOffset);
494 }
495
496 void sendOutput(DataAllocator& output, const uint32_t tf)
497 {
498 using timer = std::chrono::high_resolution_clock;
499
500 if (mIntervalTFCount == 0) {
501 LOGP(warning, "CMV interval is empty at sendOutput, skipping");
502 return;
503 }
504
505 // attach interval metadata to the TTree (stored once per tree)
506 mIntervalTree->GetUserInfo()->Clear();
507 mIntervalTree->GetUserInfo()->Add(new TParameter<long>("firstTF", mIntervalFirstTF));
508 mIntervalTree->GetUserInfo()->Add(new TParameter<long>("lastTF", mLastSeenTF));
509
510 LOGP(info, "CMVPerTF TTree: {} entries, firstTF={}, lastTF={}", mIntervalTFCount, mIntervalFirstTF, mLastSeenTF);
511 auto start = timer::now();
512
513 // write local ROOT file for debugging
514 if (mDumpCMVs) {
515 const std::string fname = fmt::format("CMV_timestamp{}.root", mTimestampStart);
516 try {
517 mCurrentTF.writeToFile(fname, mIntervalTree);
518 LOGP(info, "CMV debug file written to {}", fname);
519 } catch (const std::exception& e) {
520 LOGP(error, "Failed to write CMV debug file: {}", e.what());
521 }
522 }
523
524 if (!mSendCCDB) {
525 LOGP(warning, "CCDB output disabled, skipping upload!");
526 return;
527 }
528
529 const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF();
530 // use the actual number of TFs in this interval (mIntervalTFCount) rather than mTimeFrames, so the CCDB validity end is correct for partial last intervals
531 const long timeStampEnd = mTimestampStart + static_cast<long>(mIntervalTFCount * mNTFsBuffer * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3);
532
533 if (timeStampEnd <= mTimestampStart) {
534 LOGP(warning, "Invalid CCDB timestamp range start:{} end:{}, skipping upload!",
535 mTimestampStart, timeStampEnd);
536 return;
537 }
538
539 LOGP(info, "CCDB timestamp range start:{} end:{}", mTimestampStart, timeStampEnd);
540
541 o2::ccdb::CcdbObjectInfo ccdbInfoCMV(
542 "TPC/Calib/CMV",
543 "TTree",
544 "CMV.root",
545 {},
546 mTimestampStart,
547 timeStampEnd);
548
549 auto image = o2::ccdb::CcdbApi::createObjectImage((mIntervalTree.get()), &ccdbInfoCMV);
550 // trim TMemFile zero-padding: GetSize() is block-rounded, GetEND() is the actual file end
551 {
552 TMemFile mf("trim", image->data(), static_cast<Long64_t>(image->size()), "READ");
553 image->resize(static_cast<size_t>(mf.GetEND()));
554 mf.Close();
555 }
556 LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}",
557 ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(),
558 ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp());
559
560 output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBCMV(), 0}, *image);
561 output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBCMV(), 0}, ccdbInfoCMV);
562
563 auto stop = timer::now();
564 std::chrono::duration<float> elapsed = stop - start;
565 LOGP(info, "CMV CCDB serialisation time: {:.3f} s", elapsed.count());
566 }
567};
568
569DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector<uint32_t>& crus, const unsigned int timeframes, const int firstTF, const bool sendCCDB = false, const bool usePreciseTimestamp = false, const int nTFsBuffer = 1)
570{
571 std::vector<InputSpec> inputSpecs;
572 inputSpecs.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic});
573 inputSpecs.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic});
574
575 std::vector<OutputSpec> outputSpecs;
576 if (sendCCDB) {
577 outputSpecs.emplace_back(
580 Lifetime::Sporadic);
581 outputSpecs.emplace_back(
584 Lifetime::Sporadic);
585 }
586
587 const bool fetchCCDB = usePreciseTimestamp;
588 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(fetchCCDB, // orbitResetTime
589 fetchCCDB, // GRPECS=true
590 false, // GRPLHCIF
591 false, // GRPMagField
592 false, // askMatLUT
594 inputSpecs);
595
596 const std::string type = "cmv";
597 const auto id = fmt::format("tpc-distribute-{}-{:02}", type, ilane);
599 id.data(),
600 inputSpecs,
601 outputSpecs,
602 AlgorithmSpec{adaptFromTask<TPCDistributeCMVSpec>(crus, timeframes, nTFsBuffer, firstTF, sendCCDB, usePreciseTimestamp, ccdbRequest)},
603 Options{{"drop-data-after-nTFs", VariantType::Int, 0, {"Number of TFs after which to drop the data"}},
604 {"check-data-every-n", VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)"}},
605 {"nFactorTFs", VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF"}},
606 {"dump-cmvs", VariantType::Bool, false, {"Dump CMVs to a local ROOT file for debugging"}},
607 {"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
608 {"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},
609 {"use-compression-huffman", VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}},
610 {"cmv-zero-threshold", VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}},
611 {"cmv-round-integers-threshold", VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}},
612 {"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}},
613 {"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}}; // end DataProcessorSpec
614
615 spec.rank = ilane;
616 return spec;
617}
618
619} // namespace o2::tpc
620
621#endif
header::DataDescription description
benchmark::State & state
Structs for storing CMVs to the CCDB.
Common mode values data format definition.
Utils and constants for calibration and related workflows.
Helper for geometry and GRP related CCDB requests.
A helper class to iteratate over all parts of all input routes.
int nTimeBins
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
TPC device for processing CMVs on FLPs.
auto getOrbitResetTimeMS() const
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
Definition CcdbApi.h:103
A helper class to iteratate over all parts of all input routes.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr header::DataDescription getDataDescriptionCMVFirstTF()
void init(o2::framework::InitContext &ic) final
void finaliseCCDB(ConcreteDataMatcher &matcher, void *obj) final
static header::DataDescription getDataDescriptionCMV(const unsigned int lane)
Return data description for aggregated CMVs for a given lane.
static constexpr header::DataDescription getDataDescriptionCCDBCMV()
static constexpr header::DataDescription getDataDescriptionCMVOrbitReset()
TPCDistributeCMVSpec(const std::vector< uint32_t > &crus, const unsigned int timeframes, const int nTFsBuffer, const int firstTF, const bool sendCCDB, const bool usePreciseTimestamp, std::shared_ptr< o2::base::GRPGeomRequest > req)
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
static header::DataDescription getDataDescriptionCMVOrbitInfo(const unsigned int lane)
Return data description for orbit/BC info for a given output lane.
void run(o2::framework::ProcessingContext &pc) final
static constexpr header::DataDescription getDataDescriptionCMVOrbitInfo()
Data description for the packed (orbit<<32|bc) scalar forwarded alongside each CRU's CMVGROUP.
static constexpr header::DataDescription getDataDescriptionCMVGroup()
GLeglImageOES image
Definition glcorearb.h:4021
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLbitfield flags
Definition glcorearb.h:1570
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
uint8_t itsSharedClusterMap uint8_t
constexpr double LHCOrbitMUS
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:168
DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const int firstTF, const bool sendCCDB=false, const bool usePreciseTimestamp=false, const int nTFsBuffer=1)
Enum< T >::Iterator begin(Enum< T >)
Definition Defs.h:156
std::unique_ptr< GPUReconstructionTimeframe > tf
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
static constexpr uint8_t kDelta
Delta coding between consecutive values (dense only)
static constexpr uint8_t kVarint
Varint compression of the value stream.
static constexpr uint8_t kHuffman
Canonical Huffman compression of the value stream.
static constexpr uint8_t kNone
No compression — raw uint16 values stored flat.
static constexpr uint8_t kSparse
Non-zero positions stored sparsely (varint-encoded deltas)
static constexpr uint8_t kZigzag
Zigzag encoding of deltas or signed values.