Project
Loading...
Searching...
No Matches
TPCAggregateCMVSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#ifndef O2_TPCAGGREGATECMVSPEC_H
17#define O2_TPCAGGREGATECMVSPEC_H
18
19#include <algorithm>
20#include <chrono>
21#include <limits>
22#include <thread>
23#include <unordered_map>
24#include <vector>
25#include <fmt/format.h>
26#include <filesystem>
27#include <fstream>
28#include "TMemFile.h"
29#include "TParameter.h"
30#include "Framework/Task.h"
32#include "Framework/Logger.h"
37#include "Headers/DataHeader.h"
40#include "CCDB/CcdbApi.h"
41#include "CCDB/CcdbObjectInfo.h"
46#include "DataFormatsTPC/CMV.h"
51
52using namespace o2::framework;
54
55namespace o2::tpc
56{
57
59{
60 public:
61 TPCAggregateCMVDevice(const int lane,
62 const std::vector<uint32_t>& crus,
63 const unsigned int timeframes,
64 const bool sendCCDB,
65 const bool usePreciseTimestamp,
66 const int nTFsBuffer,
67 std::shared_ptr<o2::base::GRPGeomRequest> req)
68 : mLaneId{lane},
69 mCRUs{crus},
70 mTimeFrames{timeframes},
71 mSendCCDB{sendCCDB},
72 mUsePreciseTimestamp{usePreciseTimestamp},
73 mNTFsBuffer{nTFsBuffer},
74 mProcessedCRU(timeframes),
75 mProcessedCRUs(timeframes),
76 mRawCMVs(timeframes),
77 mOrbitInfo(timeframes),
78 mOrbitStep(timeframes),
79 mOrbitInfoSeen(timeframes, false),
80 mTFCompleted(timeframes, false),
81 mCCDBRequest(req)
82 {
83 std::sort(mCRUs.begin(), mCRUs.end());
84 for (auto& crusMap : mProcessedCRUs) {
85 crusMap.reserve(mCRUs.size());
86 for (const auto cruID : mCRUs) {
87 crusMap.emplace(cruID, false);
88 }
89 }
90 initIntervalTree();
91 }
92
94 {
96 mOutputDir = ic.options().get<std::string>("output-dir");
97 if (mOutputDir != "/dev/null") {
98 mOutputDir = o2::utils::Str::rectifyDirectory(mOutputDir);
99 }
100 mMetaFileDir = ic.options().get<std::string>("meta-output-dir");
101 if (mMetaFileDir != "/dev/null") {
102 mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir);
103 }
104 mUseCompressionVarint = ic.options().get<bool>("use-compression-varint");
105 mUseSparse = ic.options().get<bool>("use-sparse");
106 mUseCompressionHuffman = ic.options().get<bool>("use-compression-huffman");
107 mRoundIntegersThreshold = static_cast<uint16_t>(ic.options().get<int>("cmv-round-integers-threshold"));
108 mZeroThreshold = ic.options().get<float>("cmv-zero-threshold");
109 mDynamicPrecisionMean = ic.options().get<float>("cmv-dynamic-precision-mean");
110 mDynamicPrecisionSigma = ic.options().get<float>("cmv-dynamic-precision-sigma");
111 mThreads = std::max(1, ic.options().get<int>("nthreads-compression"));
112 LOGP(info, "CMV aggregation settings: output-dir={}, use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}, nthreads-compression={}",
113 mOutputDir, mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma, mThreads);
114 initIntervalTree();
115 }
116
117 void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final
118 {
120 }
121
123 {
124 // Consume CCDB inputs; return early when they are the only valid inputs in this slot
125 int nCCDBInputs = 0;
126 if (pc.inputs().isValid("grpecs")) {
127 pc.inputs().get<o2::parameters::GRPECSObject*>("grpecs");
128 ++nCCDBInputs;
129 }
130 if (mUsePreciseTimestamp && pc.inputs().isValid("orbitreset")) {
131 mTFInfo = pc.inputs().get<dataformats::Pair<long, int>>("orbitreset");
132 ++nCCDBInputs;
133 }
134 if (nCCDBInputs > 0 && pc.inputs().countValidInputs() == nCCDBInputs) {
135 return;
136 }
137
138 if (mSetDataTakingCont) {
139 mDataTakingContext = pc.services().get<DataTakingContext>();
140 mSetDataTakingCont = false;
141 }
142
143 if (!mRun) {
145 }
146
147 const auto currTF = processing_helpers::getCurrentTF(pc);
148
149 if (mTFFirst == -1) {
150 for (auto& ref : InputRecordWalker(pc.inputs(), mFirstTFFilter)) {
151 mTFFirst = pc.inputs().get<long>(ref);
152 mIntervalFirstTF = mTFFirst;
153 mHasIntervalFirstTF = true;
154 break;
155 }
156 }
157
158 // EOS sentinel forwarded by the distribute lane for partial batches (n-TFs-buffer > actual TFs delivered)
159 if (currTF == std::numeric_limits<uint32_t>::max()) {
160 if (mTimestampStart == 0) {
161 mTimestampStart = pc.services().get<o2::framework::TimingInfo>().creation;
162 }
163 collectEOSInputs(pc);
164 return;
165 }
166
167 if (mTFFirst == -1) {
168 mTFFirst = currTF;
169 mIntervalFirstTF = mTFFirst;
170 mHasIntervalFirstTF = true;
171 LOGP(warning, "firstTF not found. Setting {} as first TF for aggregate lane {}", mTFFirst, mLaneId);
172 }
173
174 const long relTF = (currTF - mTFFirst) / mNTFsBuffer;
175 if (relTF < 0) {
176 LOGP(warning, "relTF={} < 0 for TF {}, skipping", relTF, currTF);
177 return;
178 }
179 if (relTF >= static_cast<long>(mTimeFrames)) {
180 // The distribute has advanced past this interval (empty CRU placeholders sent by checkMissingData
181 // arrive with the triggering TF's context, not the missing batch's context).
182 // Force-complete whatever was buffered so the next TF starts a fresh interval.
183 LOGP(warning, "relTF={} out of range [0, {}) for TF {}: force-completing stale interval and resetting", relTF, mTimeFrames, currTF);
184 if (mTimestampStart == 0) {
185 mTimestampStart = static_cast<long>(pc.services().get<o2::framework::TimingInfo>().creation);
186 }
187 materializeBufferedTFs(true);
188 sendOutput(pc.outputs());
189 // Advance mTFFirst to the interval containing currTF so that after reset() clears it to -1
190 // we can restore a valid value. Without this, the distribute won't resend CMVFIRSTTF (it was
191 // already sent for the current interval), causing "firstTF not found" and further bad relTFs.
192 long nextFirst = mIntervalFirstTF + static_cast<long>(mTimeFrames) * mNTFsBuffer;
193 while (static_cast<long>(currTF) >= nextFirst + static_cast<long>(mTimeFrames) * mNTFsBuffer) {
194 nextFirst += static_cast<long>(mTimeFrames) * mNTFsBuffer;
195 }
196 reset();
197 mTFFirst = nextFirst;
198 mIntervalFirstTF = nextFirst;
199 mHasIntervalFirstTF = true;
200 return;
201 }
202
203 // Capture orbit info first so setTimestampCCDB can use the measured stride
204 if (!mOrbitInfoSeen[relTF]) {
205 // all CRUs within a batch carry identical timing, so the first one is sufficient
206 for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
207 mOrbitInfo[relTF] = pc.inputs().get<uint64_t>(ref);
208 const auto batchFirstOrbit = static_cast<uint32_t>(mOrbitInfo[relTF] >> 32);
209 // TimingInfo.firstTForbit is the orbit of the last real TF in the batch (the TF that triggered the FLP to send).
210 // The FLP provides the orbit of the first real TF. Interpolating between the two gives the true stride,
211 // independent of the GRPECS/config nHBFPerTF value.
212 const auto batchLastOrbit = static_cast<uint32_t>(pc.services().get<o2::framework::TimingInfo>().firstTForbit);
213 const auto defaultOrbitStep = static_cast<uint32_t>(o2::base::GRPGeomHelper::instance().getNHBFPerTF());
214 mOrbitStep[relTF] = ((batchFirstOrbit > 0) && (mNTFsBuffer > 1) && (batchLastOrbit > batchFirstOrbit)) ? (batchLastOrbit - batchFirstOrbit) / static_cast<uint32_t>(mNTFsBuffer - 1) : defaultOrbitStep;
215 mLastOrbitStep = mOrbitStep[relTF];
216 mOrbitInfoSeen[relTF] = true;
217 break;
218 }
219 }
220
221 if (mTimestampStart == 0) {
222 setTimestampCCDB(relTF, mOrbitStep[relTF], pc);
223 }
224
225 for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
226 auto const* hdr = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
227 const unsigned int cru = hdr->subSpecification;
228 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
229 LOGP(debug, "Received CMV data from CRU {} which is not part of this aggregate lane", cru);
230 continue;
231 }
232 if (mProcessedCRUs[relTF][cru]) {
233 continue;
234 }
235
236 auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
237 mRawCMVs[relTF][cru] = std::vector<uint16_t>(cmvVec.begin(), cmvVec.end());
238 mProcessedCRUs[relTF][cru] = true;
239 ++mProcessedCRU[relTF];
240 }
241
242 if (mProcessedCRU[relTF] == mCRUs.size() && !mTFCompleted[relTF]) {
243 mTFCompleted[relTF] = true;
244 ++mProcessedTFs;
245 mLastSeenTF = currTF;
246 }
247
248 if (mProcessedTFs == mTimeFrames) {
249 materializeBufferedTFs(false);
250 sendOutput(pc.outputs());
251 reset();
252 }
253 }
254
256 {
257 materializeBufferedTFs(true);
258 materializeEOSBuffer();
259 sendOutput(ec.outputs());
260 ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
261 }
262
264
265 private:
266 struct PreparedTF {
267 CMVPerTF tf{};
268 CMVPerTFCompressed compressed{};
269 };
270
271 const int mLaneId{0};
272 std::vector<uint32_t> mCRUs{};
273 const unsigned int mTimeFrames{};
274 const bool mSendCCDB{false};
275 const bool mUsePreciseTimestamp{false};
276 const int mNTFsBuffer{1};
277 std::string mOutputDir{};
278 std::string mMetaFileDir{};
279 o2::framework::DataTakingContext mDataTakingContext{};
280 bool mSetDataTakingCont{true};
281 bool mUseCompressionVarint{false};
282 bool mUseSparse{false};
283 bool mUseCompressionHuffman{false};
284 uint16_t mRoundIntegersThreshold{0};
285 float mZeroThreshold{0.f};
286 float mDynamicPrecisionMean{1.f};
287 float mDynamicPrecisionSigma{0.f};
288 int mThreads{1};
289 long mTFFirst{-1};
290 long mTimestampStart{0};
291 long mIntervalFirstTF{0};
292 bool mHasIntervalFirstTF{false};
293 unsigned int mProcessedTFs{0};
294 std::vector<unsigned int> mProcessedCRU{};
295 std::vector<std::unordered_map<unsigned int, bool>> mProcessedCRUs{};
296 std::vector<std::unordered_map<uint32_t, std::vector<uint16_t>>> mRawCMVs{};
297 std::vector<uint64_t> mOrbitInfo{};
298 std::vector<uint32_t> mOrbitStep{};
299 std::vector<bool> mOrbitInfoSeen{};
300 std::vector<bool> mTFCompleted{};
301 std::unordered_map<uint32_t, std::vector<uint16_t>> mEOSRawCMVs{};
302 uint32_t mEOSFirstOrbit{0};
303 uint16_t mEOSFirstBC{0};
304 uint32_t mLastOrbitStep{0};
305 uint32_t mLastSeenTF{0};
306 unsigned int mIntervalTFCount{0};
307 uint64_t mRun{0};
308 uint32_t mIntervalFirstOrbit{0};
309 uint32_t mIntervalLastOrbit{0};
310 uint32_t mFirstOrbitDPL{0};
311 bool mIntervalOrbitSet{false};
313 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
314 std::unique_ptr<TTree> mIntervalTree{};
315 CMVPerTF mCurrentTF{};
316 CMVPerTFCompressed mCurrentCompressedTF{};
317 const std::vector<InputSpec> mFilter{
318 {"cmvagg",
320 Lifetime::Sporadic}};
321 const std::vector<InputSpec> mOrbitFilter{
322 {"cmvorbit",
324 Lifetime::Sporadic}};
325 const std::vector<InputSpec> mFirstTFFilter{
326 {"firstTF",
328 Lifetime::Sporadic}};
329
330 uint8_t buildCompressionFlags() const
331 {
333 if (mUseSparse) {
335 }
336 if (mUseCompressionHuffman) {
338 } else if (mUseCompressionVarint) {
340 }
341 return flags;
342 }
343
346 void initIntervalTree()
347 {
348 mIntervalTree = std::make_unique<TTree>("ccdb_object", "ccdb_object");
349 mIntervalTree->SetAutoSave(0);
350 mIntervalTree->SetDirectory(nullptr);
351 if (buildCompressionFlags() != CMVEncoding::kNone) {
352 mIntervalTree->Branch("CMVPerTFCompressed", &mCurrentCompressedTF);
353 } else {
354 mIntervalTree->Branch("CMVPerTF", &mCurrentTF);
355 }
356 }
357
360 void collectEOSInputs(o2::framework::ProcessingContext& pc)
361 {
362 if (mEOSFirstOrbit == 0) {
363 for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
364 const auto orbitBC = pc.inputs().get<uint64_t>(ref);
365 mEOSFirstOrbit = static_cast<uint32_t>(orbitBC >> 32);
366 mEOSFirstBC = static_cast<uint16_t>(orbitBC & 0xFFFFu);
367 break;
368 }
369 }
370
371 for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
372 auto const* hdr = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
373 const unsigned int cru = hdr->subSpecification;
374 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
375 continue;
376 }
377 auto cmvVec = pc.inputs().get<pmr::vector<uint16_t>>(ref);
378 auto& buffer = mEOSRawCMVs[cru];
379 buffer.insert(buffer.end(), cmvVec.begin(), cmvVec.end());
380 }
381 }
382
386 void setTimestampCCDB(const long relTF, const uint32_t orbitStep, o2::framework::ProcessingContext& pc)
387 {
388 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
389 if (mUsePreciseTimestamp && !mTFInfo.second) {
390 // Orbit-reset info (NHBFPerTF) not yet received from the distribute lane.
391 // Fall back to DPL wall-clock creation time so mTimestampStart is never
392 // left at 0, which would cause successive intervals to overwrite each other.
393 mTimestampStart = tinfo.creation;
394 LOGP(warning, "Orbit reset info not yet received; using DPL creation time {} ms as fallback timestamp for interval starting at TF {}", mTimestampStart, mTFFirst);
395 return;
396 }
397 // prefer the measured stride; fall back to NHBFPerTF from GRPECS
398 const int nHBFPerTF = (orbitStep > 0) ? static_cast<int>(orbitStep) : o2::base::GRPGeomHelper::instance().getNHBFPerTF();
399 const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * nHBFPerTF;
400 mFirstOrbitDPL = tinfo.firstTForbit - nOrbitsOffset;
401 mTimestampStart = mUsePreciseTimestamp ? (mTFInfo.first + (tinfo.firstTForbit - nOrbitsOffset) * o2::constants::lhc::LHCOrbitMUS * 0.001) : tinfo.creation;
402 LOGP(info, "Setting timestamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}",
403 mTFInfo.first, tinfo.tfCounter, tinfo.firstTForbit, nHBFPerTF, relTF, nOrbitsOffset);
404 }
405
409 void materializeBufferedTFs(const bool includeIncomplete)
410 {
411 for (unsigned int relTF = 0; relTF < mTimeFrames; ++relTF) {
412 if (mProcessedCRU[relTF] == 0) {
413 continue;
414 }
415
416 if ((mProcessedCRU[relTF] != mCRUs.size()) && !includeIncomplete) {
417 continue;
418 }
419
420 if ((mProcessedCRU[relTF] != mCRUs.size()) && includeIncomplete) {
421 LOGP(warning, "Aggregate lane {} flushing incomplete CMV batch relTF {} at EOS: received {} CRUs out of {}", mLaneId, relTF, mProcessedCRU[relTF], mCRUs.size());
422 }
423
424 if (!mHasIntervalFirstTF) {
425 mIntervalFirstTF = mTFFirst == -1 ? 0 : mTFFirst;
426 mHasIntervalFirstTF = true;
427 }
428
429 // derive the actual number of sub-TFs from the buffer size; fall back to mNTFsBuffer if empty
430 const auto maxBufferSize = getMaxBufferSize(mRawCMVs[relTF]);
431 const int nTFsInBatch = maxBufferSize ? std::max(1, static_cast<int>(maxBufferSize / cmv::NTimeBinsPerTF)) : mNTFsBuffer;
432 // fall back to GRP NHBFPerTF only if no orbit stride was measured for this relTF
433 const auto orbitStep = mOrbitStep[relTF] ? mOrbitStep[relTF] : static_cast<uint32_t>(o2::base::GRPGeomHelper::instance().getNHBFPerTF());
434 appendBatchToTree(mRawCMVs[relTF], mOrbitInfo[relTF], orbitStep, nTFsInBatch);
435 }
436 }
437
441 void materializeEOSBuffer()
442 {
443 if (mEOSRawCMVs.empty()) {
444 return;
445 }
446
447 const auto maxBufferSize = getMaxBufferSize(mEOSRawCMVs);
448 const int nTFsInBatch = static_cast<int>(maxBufferSize / cmv::NTimeBinsPerTF);
449 if (nTFsInBatch <= 0) {
450 return;
451 }
452
453 if (!mHasIntervalFirstTF) {
454 mIntervalFirstTF = mLastSeenTF + 1;
455 mHasIntervalFirstTF = true;
456 }
457
458 const uint64_t orbitInfo = (static_cast<uint64_t>(mEOSFirstOrbit) << 32) | static_cast<uint64_t>(mEOSFirstBC);
459 // use the actual stride seen in run(); fall back to GRP only if no complete batch was seen
460 const auto orbitStep = mLastOrbitStep ? mLastOrbitStep : static_cast<uint32_t>(o2::base::GRPGeomHelper::instance().getNHBFPerTF());
461 appendBatchToTree(mEOSRawCMVs, orbitInfo, orbitStep, nTFsInBatch);
462 mLastSeenTF += static_cast<uint32_t>(nTFsInBatch);
463 }
464
/// Size of the largest per-CRU buffer in the map.
/// \param rawCMVs raw CMV buffers keyed by CRU
/// \return number of elements of the longest vector, 0 for an empty map
static size_t getMaxBufferSize(const std::unordered_map<uint32_t, std::vector<uint16_t>>& rawCMVs)
{
  if (rawCMVs.empty()) {
    return 0;
  }
  const auto largest = std::max_element(rawCMVs.begin(), rawCMVs.end(),
                                        [](const auto& lhs, const auto& rhs) { return lhs.second.size() < rhs.second.size(); });
  return largest->second.size();
}
473
477 void appendBatchToTree(const std::unordered_map<uint32_t, std::vector<uint16_t>>& rawCMVs, const uint64_t orbitInfo, const uint32_t orbitStep, const int nTFsInBatch)
478 {
479 if (nTFsInBatch <= 0) {
480 return;
481 }
482
483 const auto firstOrbit = static_cast<uint32_t>(orbitInfo >> 32);
484 const auto firstBC = static_cast<uint16_t>(orbitInfo & 0xFFFFu);
485 // Use the DPL-derived orbit as fallback when the FLP orbit info is missing (firstOrbit == 0)
486 const auto batchFirstOrbitDPL = (firstOrbit > 0) ? firstOrbit : mFirstOrbitDPL;
487 if (!mIntervalOrbitSet) {
488 mIntervalFirstOrbit = batchFirstOrbitDPL;
489 mIntervalOrbitSet = true;
490 }
491 mIntervalLastOrbit = batchFirstOrbitDPL + static_cast<uint32_t>(nTFsInBatch - 1) * orbitStep;
492 const uint8_t flags = buildCompressionFlags();
493 std::vector<PreparedTF> prepared(nTFsInBatch);
494 const int nThreads = std::max(1, std::min(mThreads, nTFsInBatch));
495 const int chunkSize = (nTFsInBatch + nThreads - 1) / nThreads;
496
497 auto worker = [&](const int iThread) {
498 const int beginTF = iThread * chunkSize;
499 const int endTF = std::min(nTFsInBatch, beginTF + chunkSize);
500 for (int tfIndex = beginTF; tfIndex < endTF; ++tfIndex) {
501
502 auto& preparedTF = prepared[tfIndex];
503 preparedTF.tf.firstOrbit = firstOrbit + static_cast<uint32_t>(tfIndex) * orbitStep;
504 preparedTF.tf.firstOrbitDPL = batchFirstOrbitDPL + static_cast<uint32_t>(tfIndex) * orbitStep;
505
506 for (const auto& [cru, values] : rawCMVs) {
507 const uint32_t offset = static_cast<uint32_t>(tfIndex) * cmv::NTimeBinsPerTF;
508 if (offset >= static_cast<uint32_t>(values.size())) {
509 continue;
510 }
511 const uint32_t nBins = std::min(static_cast<uint32_t>(values.size()) - offset, cmv::NTimeBinsPerTF);
512 for (uint32_t tb = 0; tb < nBins; ++tb) {
513 preparedTF.tf.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = values[offset + tb];
514 }
515 }
516
517 preparedTF.tf.roundToIntegers(mRoundIntegersThreshold);
518 if (mZeroThreshold > 0.f) {
519 preparedTF.tf.zeroSmallValues(mZeroThreshold);
520 }
521 if (mDynamicPrecisionSigma > 0.f) {
522 preparedTF.tf.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
523 }
524 if (flags != CMVEncoding::kNone) {
525 preparedTF.compressed = preparedTF.tf.compress(flags);
526 }
527 }
528 };
529
530 std::vector<std::thread> workers;
531 workers.reserve(nThreads - 1);
532 for (int iThread = 1; iThread < nThreads; ++iThread) {
533 workers.emplace_back(worker, iThread);
534 }
535 worker(0);
536 for (auto& thread : workers) {
537 thread.join();
538 }
539
540 for (int tfIndex = 0; tfIndex < nTFsInBatch; ++tfIndex) {
541 if (flags != CMVEncoding::kNone) {
542 mCurrentCompressedTF = std::move(prepared[tfIndex].compressed);
543 } else {
544 mCurrentTF = std::move(prepared[tfIndex].tf);
545 }
546 mIntervalTree->Fill();
547 ++mIntervalTFCount;
548 }
549 }
550
551 void sendOutput(DataAllocator& output)
552 {
553 using timer = std::chrono::high_resolution_clock;
554
555 if (mIntervalTFCount == 0) {
556 LOGP(warning, "CMV interval is empty at sendOutput for lane {}, skipping", mLaneId);
557 return;
558 }
559
560 const auto lastTF = mIntervalFirstTF + static_cast<long>(mIntervalTFCount) - 1;
561 mIntervalTree->GetUserInfo()->Clear();
562 mIntervalTree->GetUserInfo()->Add(new TParameter<long>("firstTF", mIntervalFirstTF));
563 mIntervalTree->GetUserInfo()->Add(new TParameter<long>("lastTF", lastTF));
564
565 LOGP(info, "CMVPerTF TTree lane {}: {} entries, firstTF={}, lastTF={}", mLaneId, mIntervalTFCount, mIntervalFirstTF, lastTF);
566 auto start = timer::now();
567
568 const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF();
569 const long timeStampEnd = mTimestampStart + static_cast<long>(mIntervalTFCount * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3);
570
571 if (mOutputDir != "/dev/null") {
572 const std::string calibFName = fmt::format("CMV_run_{}_orbit_{}_{}_timestamp_{}_{}.root",
573 mRun, mIntervalFirstOrbit, mIntervalLastOrbit, mTimestampStart, timeStampEnd);
574 try {
575 CMVPerTF::writeToFile(mOutputDir + calibFName, mIntervalTree);
576 LOGP(info, "CMV file written to {}", mOutputDir + calibFName);
577 } catch (const std::exception& e) {
578 LOGP(error, "Failed to write CMV file {}: {}", mOutputDir + calibFName, e.what());
579 }
580
581 if (mMetaFileDir != "/dev/null") {
583 calMetaData.fillFileData(mOutputDir + calibFName);
584 calMetaData.setDataTakingContext(mDataTakingContext);
585 calMetaData.type = "calib";
586 calMetaData.priority = "low";
587 auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, calibFName);
588 auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, calibFName);
589 try {
590 std::ofstream metaFileOut(metaFileNameTmp);
591 metaFileOut << calMetaData;
592 metaFileOut.close();
593 std::filesystem::rename(metaFileNameTmp, metaFileName);
594 } catch (std::exception const& e) {
595 LOG(error) << "Failed to store CMV meta data file " << metaFileName << ", reason: " << e.what();
596 }
597 }
598 }
599
600 if ((!mSendCCDB) && (mOutputDir == "/dev/null")) {
601 LOGP(warning, "Neither CCDB output nor output-dir is enabled for aggregate lane {}, skipping CMV export", mLaneId);
602 }
603 if (!mSendCCDB) {
604 return;
605 }
606
607 if (timeStampEnd <= mTimestampStart) {
608 LOGP(warning, "Invalid CCDB timestamp range start:{} end:{}, skipping upload", mTimestampStart, timeStampEnd);
609 return;
610 }
611
612 o2::ccdb::CcdbObjectInfo ccdbInfoCMV("TPC/Calib/CMV", "TTree", "CMV.root", {}, mTimestampStart, timeStampEnd);
613 auto image = o2::ccdb::CcdbApi::createObjectImage((mIntervalTree.get()), &ccdbInfoCMV);
614 // trim TMemFile zero-padding: GetSize() is block-rounded, GetEND() is the actual file end
615 {
616 TMemFile mf("trim", image->data(), static_cast<Long64_t>(image->size()), "READ");
617 image->resize(static_cast<size_t>(mf.GetEND()));
618 mf.Close();
619 }
620
621 LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}", ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(), ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp());
624
625 auto stop = timer::now();
626 std::chrono::duration<float> elapsed = stop - start;
627 LOGP(info, "CMV CCDB serialisation time: {:.3f} s", elapsed.count());
628 }
629
631 void reset()
632 {
633 mTFFirst = -1;
634 mTimestampStart = 0;
635 mIntervalFirstTF = 0;
636 mHasIntervalFirstTF = false;
637 mProcessedTFs = 0;
638 std::fill(mProcessedCRU.begin(), mProcessedCRU.end(), 0);
639 std::fill(mOrbitInfo.begin(), mOrbitInfo.end(), 0);
640 std::fill(mOrbitStep.begin(), mOrbitStep.end(), 0);
641 std::fill(mOrbitInfoSeen.begin(), mOrbitInfoSeen.end(), false);
642 std::fill(mTFCompleted.begin(), mTFCompleted.end(), false);
643 for (auto& processedMap : mProcessedCRUs) {
644 for (auto& [cru, seen] : processedMap) {
645 seen = false;
646 }
647 }
648 for (auto& rawPerTF : mRawCMVs) {
649 rawPerTF.clear();
650 }
651 mEOSRawCMVs.clear();
652 mEOSFirstOrbit = 0;
653 mEOSFirstBC = 0;
654 mLastOrbitStep = 0;
655 mLastSeenTF = 0;
656 mIntervalTFCount = 0;
657 mIntervalFirstOrbit = 0;
658 mIntervalLastOrbit = 0;
659 mFirstOrbitDPL = 0;
660 mIntervalOrbitSet = false;
661 mCurrentTF = CMVPerTF{};
662 mCurrentCompressedTF = CMVPerTFCompressed{};
663 initIntervalTree();
664 }
665};
666
670 const std::vector<uint32_t>& crus,
671 const unsigned int timeframes,
672 const bool sendCCDB,
673 const bool usePreciseTimestamp,
674 const int nTFsBuffer = 1)
675{
676 std::vector<OutputSpec> outputSpecs;
677 if (sendCCDB) {
680 }
681
682 std::vector<InputSpec> inputSpecs;
683 inputSpecs.emplace_back(InputSpec{"cmvagg", ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic});
684 inputSpecs.emplace_back(InputSpec{"cmvorbit", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
685 inputSpecs.emplace_back(InputSpec{"firstTF", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
686 if (usePreciseTimestamp) {
687 inputSpecs.emplace_back(InputSpec{"orbitreset", gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, Lifetime::Sporadic});
688 }
689
690 // Request GRPECS from CCDB so that GRPGeomHelper::getNHBFPerTF() is valid in this (separate) process
691 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(false, // orbitResetTime
692 true, // GRPECS (NHBFPerTF)
693 false, // GRPLHCIF
694 false, // GRPMagField
695 false, // askMatLUT
697 inputSpecs);
698
700 fmt::format("tpc-aggregate-cmv-{:02}", lane).data(),
701 inputSpecs,
702 outputSpecs,
703 AlgorithmSpec{adaptFromTask<TPCAggregateCMVDevice>(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)},
704 Options{{"output-dir", VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}},
705 {"meta-output-dir", VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
706 {"nthreads-compression", VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}},
707 {"use-sparse", VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
708 {"use-compression-varint", VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},
709 {"use-compression-huffman", VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}},
710 {"cmv-zero-threshold", VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}},
711 {"cmv-round-integers-threshold", VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}},
712 {"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}},
713 {"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}};
714 spec.rank = lane;
715 return spec;
716}
717
718} // namespace o2::tpc
719
720#endif
Structs for storing CMVs to the CCDB.
Common mode values data format definition.
Utils and constants for calibration and related workflows.
std::ostringstream debug
TPCZSHDR * hdr
Helper for geometry and GRP related CCDB requests.
A helper class to iterate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
TPC distribution of grouped CMVs towards the CMV aggregation workflow.
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
Definition CcdbApi.h:103
A helper class to iterate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void stop()
This is invoked on stop.
Definition Task.h:53
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
TPCAggregateCMVDevice(const int lane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const bool sendCCDB, const bool usePreciseTimestamp, const int nTFsBuffer, std::shared_ptr< o2::base::GRPGeomRequest > req)
void finaliseCCDB(ConcreteDataMatcher &matcher, void *obj) final
static constexpr header::DataDescription getDataDescriptionCCDBCMV()
void run(o2::framework::ProcessingContext &pc) final
static constexpr header::DataDescription getDataDescriptionCMVFirstTF()
static header::DataDescription getDataDescriptionCMV(const unsigned int lane)
Return data description for aggregated CMVs for a given lane.
static constexpr header::DataDescription getDataDescriptionCMVOrbitReset()
static header::DataDescription getDataDescriptionCMVOrbitInfo(const unsigned int lane)
Return data description for orbit/BC info for a given output lane.
GLeglImageOES image
Definition glcorearb.h:4021
GLuint buffer
Definition glcorearb.h:655
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLintptr offset
Definition glcorearb.h:660
GLbitfield flags
Definition glcorearb.h:1570
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
uint8_t itsSharedClusterMap uint8_t
constexpr double LHCOrbitMUS
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::tuple< uint32_t, uint16_t > orbitBC(uint20_t bunchCrossing, uint32_t firstOrbit)
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:168
DataProcessorSpec getTPCAggregateCMVSpec(const int lane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const bool sendCCDB, const bool usePreciseTimestamp, const int nTFsBuffer=1)
std::unique_ptr< GPUReconstructionTimeframe > tf
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43
bool fillFileData(const std::string &fname, bool fillmd5=false, const std::string &tmpEnding="")
void setDataTakingContext(const o2::framework::DataTakingContext &dtc)
uint32_t SubSpecificationType
Definition DataHeader.h:622
static constexpr uint8_t kDelta
Delta coding between consecutive values (dense only)
static constexpr uint8_t kVarint
Varint compression of the value stream.
static constexpr uint8_t kHuffman
Canonical Huffman compression of the value stream.
static constexpr uint8_t kNone
No compression — raw uint16 values stored flat.
static constexpr uint8_t kSparse
Non-zero positions stored sparsely (varint-encoded deltas)
static constexpr uint8_t kZigzag
Zigzag encoding of deltas or signed values.
static void writeToFile(const std::string &filename, const std::unique_ptr< TTree > &tree)
Write the TTree to a ROOT file.
static std::string rectifyDirectory(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"