Project
Loading...
Searching...
No Matches
TPCAggregateCMVSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#ifndef O2_TPCAGGREGATECMVSPEC_H
17#define O2_TPCAGGREGATECMVSPEC_H
18
19#include <algorithm>
20#include <chrono>
21#include <limits>
22#include <thread>
23#include <unordered_map>
24#include <vector>
25#include <fmt/format.h>
26#include <filesystem>
27#include <fstream>
28#include "TMemFile.h"
29#include "TParameter.h"
30#include "Framework/Task.h"
32#include "Framework/Logger.h"
37#include "Headers/DataHeader.h"
40#include "CCDB/CcdbApi.h"
41#include "CCDB/CcdbObjectInfo.h"
46#include "DataFormatsTPC/CMV.h"
51
52namespace o2::tpc
53{
54
56{
57 public:
58 TPCAggregateCMVDevice(const int lane,
59 const std::vector<uint32_t>& crus,
60 const unsigned int timeframes,
61 const bool sendCCDB,
62 const bool usePreciseTimestamp,
63 const int nTFsBuffer,
64 std::shared_ptr<o2::base::GRPGeomRequest> req)
65 : mLaneId{lane},
66 mCRUs{crus},
67 mTimeFrames{timeframes},
68 mSendCCDB{sendCCDB},
69 mUsePreciseTimestamp{usePreciseTimestamp},
70 mNTFsBuffer{nTFsBuffer},
71 mProcessedCRU(timeframes),
72 mProcessedCRUs(timeframes),
73 mRawCMVs(timeframes),
74 mOrbitInfo(timeframes),
75 mOrbitStep(timeframes),
76 mOrbitInfoSeen(timeframes, false),
77 mTFCompleted(timeframes, false),
78 mCCDBRequest(req)
79 {
80 std::sort(mCRUs.begin(), mCRUs.end());
81 for (auto& crusMap : mProcessedCRUs) {
82 crusMap.reserve(mCRUs.size());
83 for (const auto cruID : mCRUs) {
84 crusMap.emplace(cruID, false);
85 }
86 }
87 initIntervalTree();
88 }
89
91 {
// Body of init(): caches the device options in members and logs them.
// NOTE(review): the signature line and listing line 92 are not visible in this extract.
// A directory option equal to the "/dev/null" sentinel is kept verbatim; any other
// value is normalised through rectifyDirectory().
93 mOutputDir = ic.options().get<std::string>("output-dir");
94 if (mOutputDir != "/dev/null") {
95 mOutputDir = o2::utils::Str::rectifyDirectory(mOutputDir);
96 }
97 mMetaFileDir = ic.options().get<std::string>("meta-output-dir");
98 if (mMetaFileDir != "/dev/null") {
99 mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir);
100 }
// Encoding selection and value-conditioning thresholds.
101 mUseCompressionVarint = ic.options().get<bool>("use-compression-varint");
102 mUseSparse = ic.options().get<bool>("use-sparse");
103 mUseCompressionHuffman = ic.options().get<bool>("use-compression-huffman");
// The int option is narrowed to uint16_t without a range check.
104 mRoundIntegersThreshold = static_cast<uint16_t>(ic.options().get<int>("cmv-round-integers-threshold"));
105 mZeroThreshold = ic.options().get<float>("cmv-zero-threshold");
106 mDynamicPrecisionMean = ic.options().get<float>("cmv-dynamic-precision-mean");
107 mDynamicPrecisionSigma = ic.options().get<float>("cmv-dynamic-precision-sigma");
// Values below 1 are raised to 1 so at least one thread is always used.
108 mThreads = std::max(1, ic.options().get<int>("nthreads-compression"));
109 LOGP(info, "CMV aggregation settings: output-dir={}, use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}, nthreads-compression={}",
110 mOutputDir, mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma, mThreads);
// Recreate the tree now that the options are known: the branch chosen by
// initIntervalTree() depends on buildCompressionFlags().
111 initIntervalTree();
112 }
113
// Callback taking a CCDB matcher and the fetched object.
// NOTE(review): the single body line (listing line 116) is not visible in this extract;
// presumably it forwards both arguments to the GRP/geometry helper - confirm against the repository.
114 void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
115 {
117 }
118
120 {
// Body of run(): buffers the CMV payloads of one invocation into its relTF slot and
// writes out the interval once all mTimeFrames slots are complete.
// NOTE(review): the signature line and listing line 141 are not visible in this extract.
121 // Consume CCDB inputs; return early when they are the only valid inputs in this slot
122 int nCCDBInputs = 0;
123 if (pc.inputs().isValid("grpecs")) {
124 pc.inputs().get<o2::parameters::GRPECSObject*>("grpecs");
125 ++nCCDBInputs;
126 }
127 if (mUsePreciseTimestamp && pc.inputs().isValid("orbitreset")) {
128 mTFInfo = pc.inputs().get<dataformats::Pair<long, int>>("orbitreset");
129 ++nCCDBInputs;
130 }
131 if (nCCDBInputs > 0 && pc.inputs().countValidInputs() == nCCDBInputs) {
132 return;
133 }
134
// The data-taking context is fetched only once, on the first invocation that gets here.
135 if (mSetDataTakingCont) {
136 mDataTakingContext = pc.services().get<o2::framework::DataTakingContext>();
137 mSetDataTakingCont = false;
138 }
139
// NOTE(review): the statement inside this branch is elided; presumably it initialises mRun - confirm.
140 if (!mRun) {
142 }
143
144 const auto currTF = processing_helpers::getCurrentTF(pc);
145
// Take the interval's first TF from the "firstTF" input when one is present (first match only).
146 if (mTFFirst == -1) {
147 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFirstTFFilter)) {
148 mTFFirst = pc.inputs().get<long>(ref);
149 mIntervalFirstTF = mTFFirst;
150 mHasIntervalFirstTF = true;
151 break;
152 }
153 }
154
155 // EOS sentinel forwarded by the distribute lane for partial batches (n-TFs-buffer > actual TFs delivered)
156 if (currTF == std::numeric_limits<uint32_t>::max()) {
157 if (mTimestampStart == 0) {
158 mTimestampStart = pc.services().get<o2::framework::TimingInfo>().creation;
159 }
160 collectEOSInputs(pc);
161 return;
162 }
163
// No "firstTF" input was found above: fall back to the current TF as interval start.
164 if (mTFFirst == -1) {
165 mTFFirst = currTF;
166 mIntervalFirstTF = mTFFirst;
167 mHasIntervalFirstTF = true;
168 LOGP(warning, "firstTF not found. Setting {} as first TF for aggregate lane {}", mTFFirst, mLaneId);
169 }
170
// Slot index of this batch inside the interval; mTFFirst is long, so the subtraction
// is signed and a TF older than mTFFirst can produce a negative value.
171 const long relTF = (currTF - mTFFirst) / mNTFsBuffer;
172 if (relTF < 0) {
173 LOGP(warning, "relTF={} < 0 for TF {}, skipping", relTF, currTF);
174 return;
175 }
176 if (relTF >= static_cast<long>(mTimeFrames)) {
177 // The distribute has advanced past this interval (empty CRU placeholders sent by checkMissingData
178 // arrive with the triggering TF's context, not the missing batch's context).
179 // Force-complete whatever was buffered so the next TF starts a fresh interval.
180 LOGP(warning, "relTF={} out of range [0, {}) for TF {}: force-completing stale interval and resetting", relTF, mTimeFrames, currTF);
181 if (mTimestampStart == 0) {
182 mTimestampStart = static_cast<long>(pc.services().get<o2::framework::TimingInfo>().creation);
183 }
184 materializeBufferedTFs(true);
185 sendOutput(pc.outputs());
186 // Advance mTFFirst to the interval containing currTF so that after reset() clears it to -1
187 // we can restore a valid value. Without this, the distribute won't resend CMVFIRSTTF (it was
188 // already sent for the current interval), causing "firstTF not found" and further bad relTFs.
189 long nextFirst = mIntervalFirstTF + static_cast<long>(mTimeFrames) * mNTFsBuffer;
190 while (static_cast<long>(currTF) >= nextFirst + static_cast<long>(mTimeFrames) * mNTFsBuffer) {
191 nextFirst += static_cast<long>(mTimeFrames) * mNTFsBuffer;
192 }
193 reset();
194 mTFFirst = nextFirst;
195 mIntervalFirstTF = nextFirst;
196 mHasIntervalFirstTF = true;
// NOTE(review): this return precedes the "cmvagg" loop below, so any payload carried by
// this invocation is not buffered into the new interval - confirm this is intended.
197 return;
198 }
199
200 // Capture orbit info first so setTimestampCCDB can use the measured stride
201 if (!mOrbitInfoSeen[relTF]) {
202 // all CRUs within a batch carry identical timing, so the first one is sufficient
203 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
204 mOrbitInfo[relTF] = pc.inputs().get<uint64_t>(ref);
205 const auto batchFirstOrbit = static_cast<uint32_t>(mOrbitInfo[relTF] >> 32);
206 // TimingInfo.firstTForbit is the orbit of the last real TF in the batch (the TF that triggered the FLP to send).
207 // The FLP provides the orbit of the first real TF. Interpolating between the two gives the true stride,
208 // independent of the GRPECS/config nHBFPerTF value.
209 const auto batchLastOrbit = static_cast<uint32_t>(pc.services().get<o2::framework::TimingInfo>().firstTForbit);
210 const auto defaultOrbitStep = static_cast<uint32_t>(o2::base::GRPGeomHelper::instance().getNHBFPerTF());
// The measured stride is used only when it can be computed (non-zero first orbit, more than
// one TF per batch, increasing orbits); otherwise the GRP default is stored.
211 mOrbitStep[relTF] = ((batchFirstOrbit > 0) && (mNTFsBuffer > 1) && (batchLastOrbit > batchFirstOrbit)) ? (batchLastOrbit - batchFirstOrbit) / static_cast<uint32_t>(mNTFsBuffer - 1) : defaultOrbitStep;
212 mLastOrbitStep = mOrbitStep[relTF];
213 mOrbitInfoSeen[relTF] = true;
214 break;
215 }
216 }
217
// The interval start timestamp is set once per interval (0 means "not set yet").
218 if (mTimestampStart == 0) {
219 setTimestampCCDB(relTF, mOrbitStep[relTF], pc);
220 }
221
// Copy each CRU payload of this lane into the slot; a CRU already stored for this slot is ignored.
222 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
223 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
224 const unsigned int cru = hdr->subSpecification;
225 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
226 LOGP(debug, "Received CMV data from CRU {} which is not part of this aggregate lane", cru);
227 continue;
228 }
229 if (mProcessedCRUs[relTF][cru]) {
230 continue;
231 }
232
233 auto cmvVec = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
234 mRawCMVs[relTF][cru] = std::vector<uint16_t>(cmvVec.begin(), cmvVec.end());
235 mProcessedCRUs[relTF][cru] = true;
236 ++mProcessedCRU[relTF];
237 }
238
// A slot counts as completed exactly once, when all CRUs of the lane have been stored.
239 if (mProcessedCRU[relTF] == mCRUs.size() && !mTFCompleted[relTF]) {
240 mTFCompleted[relTF] = true;
241 ++mProcessedTFs;
242 mLastSeenTF = currTF;
243 }
244
// All slots complete: build the tree entries, write/send them and start a new interval.
245 if (mProcessedTFs == mTimeFrames) {
246 materializeBufferedTFs(false);
247 sendOutput(pc.outputs());
248 reset();
249 }
250 }
251
253 {
// Body of endOfStream(): flush everything still buffered, then ask to quit.
// NOTE(review): the signature line is not visible in this extract.
// Order matters: regular slots (including incomplete ones) go into the tree first,
// then the samples collected from EOS-sentinel messages, then the tree is written/sent.
254 materializeBufferedTFs(true);
255 materializeEOSBuffer();
256 sendOutput(ec.outputs());
257 ec.services().get<o2::framework::ControlService>().readyToQuit(o2::framework::QuitRequest::Me);
258 }
259
261
262 private:
// Scratch pair produced per TF by the workers in appendBatchToTree(): the uncompressed
// object and, when a compression flag is set, its compressed counterpart.
263 struct PreparedTF {
264 CMVPerTF tf{};
265 CMVPerTFCompressed compressed{};
266 };
267
268 const int mLaneId{0};
269 std::vector<uint32_t> mCRUs{};
270 const unsigned int mTimeFrames{};
271 const bool mSendCCDB{false};
272 const bool mUsePreciseTimestamp{false};
273 const int mNTFsBuffer{1};
274 std::string mOutputDir{};
275 std::string mMetaFileDir{};
276 o2::framework::DataTakingContext mDataTakingContext{};
277 bool mSetDataTakingCont{true};
278 bool mUseCompressionVarint{false};
279 bool mUseSparse{false};
280 bool mUseCompressionHuffman{false};
281 uint16_t mRoundIntegersThreshold{0};
282 float mZeroThreshold{0.f};
283 float mDynamicPrecisionMean{1.f};
284 float mDynamicPrecisionSigma{0.f};
285 int mThreads{1};
286 long mTFFirst{-1};
287 long mTimestampStart{0};
288 long mIntervalFirstTF{0};
289 bool mHasIntervalFirstTF{false};
290 unsigned int mProcessedTFs{0};
291 std::vector<unsigned int> mProcessedCRU{};
292 std::vector<std::unordered_map<unsigned int, bool>> mProcessedCRUs{};
293 std::vector<std::unordered_map<uint32_t, std::vector<uint16_t>>> mRawCMVs{};
294 std::vector<uint64_t> mOrbitInfo{};
295 std::vector<uint32_t> mOrbitStep{};
296 std::vector<bool> mOrbitInfoSeen{};
297 std::vector<bool> mTFCompleted{};
298 std::unordered_map<uint32_t, std::vector<uint16_t>> mEOSRawCMVs{};
299 uint32_t mEOSFirstOrbit{0};
300 uint16_t mEOSFirstBC{0};
301 uint32_t mLastOrbitStep{0};
302 uint32_t mLastSeenTF{0};
303 unsigned int mIntervalTFCount{0};
304 uint64_t mRun{0};
305 uint32_t mIntervalFirstOrbit{0};
306 uint32_t mIntervalLastOrbit{0};
307 uint32_t mFirstOrbitDPL{0};
308 bool mIntervalOrbitSet{false};
309 dataformats::Pair<long, int> mTFInfo{};
310 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
311 std::unique_ptr<TTree> mIntervalTree{};
312 CMVPerTF mCurrentTF{};
313 CMVPerTFCompressed mCurrentCompressedTF{};
314 const std::vector<o2::framework::InputSpec> mFilter{
315 {"cmvagg",
317 o2::framework::Lifetime::Sporadic}};
318 const std::vector<o2::framework::InputSpec> mOrbitFilter{
319 {"cmvorbit",
321 o2::framework::Lifetime::Sporadic}};
322 const std::vector<o2::framework::InputSpec> mFirstTFFilter{
323 {"firstTF",
325 o2::framework::Lifetime::Sporadic}};
326
// Translate the encoding options (mUseSparse, mUseCompressionHuffman, mUseCompressionVarint)
// into the uint8_t flag word that is compared against CMVEncoding::kNone and passed to compress().
// Huffman takes precedence over varint (else-if below).
// NOTE(review): the declaration of `flags` and the body of every branch (listing lines
// 329, 331, 334, 336) are not visible in this extract, so the exact bits set cannot be stated here.
327 uint8_t buildCompressionFlags() const
328 {
330 if (mUseSparse) {
332 }
333 if (mUseCompressionHuffman) {
335 } else if (mUseCompressionVarint) {
337 }
338 return flags;
339 }
340
343 void initIntervalTree()
344 {
345 mIntervalTree = std::make_unique<TTree>("ccdb_object", "ccdb_object");
346 mIntervalTree->SetAutoSave(0);
347 mIntervalTree->SetDirectory(nullptr);
348 if (buildCompressionFlags() != CMVEncoding::kNone) {
349 mIntervalTree->Branch("CMVPerTFCompressed", &mCurrentCompressedTF);
350 } else {
351 mIntervalTree->Branch("CMVPerTF", &mCurrentTF);
352 }
353 }
354
357 void collectEOSInputs(o2::framework::ProcessingContext& pc)
358 {
359 if (mEOSFirstOrbit == 0) {
360 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
361 const auto orbitBC = pc.inputs().get<uint64_t>(ref);
362 mEOSFirstOrbit = static_cast<uint32_t>(orbitBC >> 32);
363 mEOSFirstBC = static_cast<uint16_t>(orbitBC & 0xFFFFu);
364 break;
365 }
366 }
367
368 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
369 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
370 const unsigned int cru = hdr->subSpecification;
371 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
372 continue;
373 }
374 auto cmvVec = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
375 auto& buffer = mEOSRawCMVs[cru];
376 buffer.insert(buffer.end(), cmvVec.begin(), cmvVec.end());
377 }
378 }
379
383 void setTimestampCCDB(const long relTF, const uint32_t orbitStep, o2::framework::ProcessingContext& pc)
384 {
385 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
386 if (mUsePreciseTimestamp && !mTFInfo.second) {
387 // Orbit-reset info (NHBFPerTF) not yet received from the distribute lane.
388 // Fall back to DPL wall-clock creation time so mTimestampStart is never
389 // left at 0, which would cause successive intervals to overwrite each other.
390 mTimestampStart = tinfo.creation;
391 LOGP(warning, "Orbit reset info not yet received; using DPL creation time {} ms as fallback timestamp for interval starting at TF {}", mTimestampStart, mTFFirst);
392 return;
393 }
394 // prefer the measured stride; fall back to NHBFPerTF from GRPECS
395 const int nHBFPerTF = (orbitStep > 0) ? static_cast<int>(orbitStep) : o2::base::GRPGeomHelper::instance().getNHBFPerTF();
396 const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * nHBFPerTF;
397 mFirstOrbitDPL = tinfo.firstTForbit - nOrbitsOffset;
398 mTimestampStart = mUsePreciseTimestamp ? (mTFInfo.first + (tinfo.firstTForbit - nOrbitsOffset) * o2::constants::lhc::LHCOrbitMUS * 0.001) : tinfo.creation;
399 LOGP(info, "Setting timestamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}",
400 mTFInfo.first, tinfo.tfCounter, tinfo.firstTForbit, nHBFPerTF, relTF, nOrbitsOffset);
401 }
402
406 void materializeBufferedTFs(const bool includeIncomplete)
407 {
408 for (unsigned int relTF = 0; relTF < mTimeFrames; ++relTF) {
409 if (mProcessedCRU[relTF] == 0) {
410 continue;
411 }
412
413 if ((mProcessedCRU[relTF] != mCRUs.size()) && !includeIncomplete) {
414 continue;
415 }
416
417 if ((mProcessedCRU[relTF] != mCRUs.size()) && includeIncomplete) {
418 LOGP(warning, "Aggregate lane {} flushing incomplete CMV batch relTF {} at EOS: received {} CRUs out of {}", mLaneId, relTF, mProcessedCRU[relTF], mCRUs.size());
419 }
420
421 if (!mHasIntervalFirstTF) {
422 mIntervalFirstTF = mTFFirst == -1 ? 0 : mTFFirst;
423 mHasIntervalFirstTF = true;
424 }
425
426 // derive the actual number of sub-TFs from the buffer size; fall back to mNTFsBuffer if empty
427 const auto maxBufferSize = getMaxBufferSize(mRawCMVs[relTF]);
428 const int nTFsInBatch = maxBufferSize ? std::max(1, static_cast<int>(maxBufferSize / cmv::NTimeBinsPerTF)) : mNTFsBuffer;
429 // fall back to GRP NHBFPerTF only if no orbit stride was measured for this relTF
430 const auto orbitStep = mOrbitStep[relTF] ? mOrbitStep[relTF] : static_cast<uint32_t>(o2::base::GRPGeomHelper::instance().getNHBFPerTF());
431 appendBatchToTree(mRawCMVs[relTF], mOrbitInfo[relTF], orbitStep, nTFsInBatch);
432 }
433 }
434
438 void materializeEOSBuffer()
439 {
440 if (mEOSRawCMVs.empty()) {
441 return;
442 }
443
444 const auto maxBufferSize = getMaxBufferSize(mEOSRawCMVs);
445 const int nTFsInBatch = static_cast<int>(maxBufferSize / cmv::NTimeBinsPerTF);
446 if (nTFsInBatch <= 0) {
447 return;
448 }
449
450 if (!mHasIntervalFirstTF) {
451 mIntervalFirstTF = mLastSeenTF + 1;
452 mHasIntervalFirstTF = true;
453 }
454
455 const uint64_t orbitInfo = (static_cast<uint64_t>(mEOSFirstOrbit) << 32) | static_cast<uint64_t>(mEOSFirstBC);
456 // use the actual stride seen in run(); fall back to GRP only if no complete batch was seen
457 const auto orbitStep = mLastOrbitStep ? mLastOrbitStep : static_cast<uint32_t>(o2::base::GRPGeomHelper::instance().getNHBFPerTF());
458 appendBatchToTree(mEOSRawCMVs, orbitInfo, orbitStep, nTFsInBatch);
459 mLastSeenTF += static_cast<uint32_t>(nTFsInBatch);
460 }
461
/// Size of the longest per-CRU sample buffer.
///
/// \param rawCMVs map from CRU id to its sample vector
/// \return the largest vector size found, or 0 for an empty map
[[nodiscard]] static size_t getMaxBufferSize(const std::unordered_map<uint32_t, std::vector<uint16_t>>& rawCMVs) noexcept
{
  size_t maxBufferSize = 0;
  for (const auto& entry : rawCMVs) {
    // only the buffer length matters; the CRU id (entry.first) is irrelevant here
    maxBufferSize = std::max(maxBufferSize, entry.second.size());
  }
  return maxBufferSize;
}
470
474 void appendBatchToTree(const std::unordered_map<uint32_t, std::vector<uint16_t>>& rawCMVs, const uint64_t orbitInfo, const uint32_t orbitStep, const int nTFsInBatch)
475 {
476 if (nTFsInBatch <= 0) {
477 return;
478 }
479
480 const auto firstOrbit = static_cast<uint32_t>(orbitInfo >> 32);
481 const auto firstBC = static_cast<uint16_t>(orbitInfo & 0xFFFFu);
482 // Use the DPL-derived orbit as fallback when the FLP orbit info is missing (firstOrbit == 0)
483 const auto batchFirstOrbitDPL = (firstOrbit > 0) ? firstOrbit : mFirstOrbitDPL;
484 if (!mIntervalOrbitSet) {
485 mIntervalFirstOrbit = batchFirstOrbitDPL;
486 mIntervalOrbitSet = true;
487 }
488 mIntervalLastOrbit = batchFirstOrbitDPL + static_cast<uint32_t>(nTFsInBatch - 1) * orbitStep;
489 const uint8_t flags = buildCompressionFlags();
490 std::vector<PreparedTF> prepared(nTFsInBatch);
491 const int nThreads = std::max(1, std::min(mThreads, nTFsInBatch));
492 const int chunkSize = (nTFsInBatch + nThreads - 1) / nThreads;
493
494 auto worker = [&](const int iThread) {
495 const int beginTF = iThread * chunkSize;
496 const int endTF = std::min(nTFsInBatch, beginTF + chunkSize);
497 for (int tfIndex = beginTF; tfIndex < endTF; ++tfIndex) {
498
499 auto& preparedTF = prepared[tfIndex];
500 preparedTF.tf.firstOrbit = firstOrbit + static_cast<uint32_t>(tfIndex) * orbitStep;
501 preparedTF.tf.firstOrbitDPL = batchFirstOrbitDPL + static_cast<uint32_t>(tfIndex) * orbitStep;
502
503 for (const auto& [cru, values] : rawCMVs) {
504 const uint32_t offset = static_cast<uint32_t>(tfIndex) * cmv::NTimeBinsPerTF;
505 if (offset >= static_cast<uint32_t>(values.size())) {
506 continue;
507 }
508 const uint32_t nBins = std::min(static_cast<uint32_t>(values.size()) - offset, cmv::NTimeBinsPerTF);
509 for (uint32_t tb = 0; tb < nBins; ++tb) {
510 preparedTF.tf.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = values[offset + tb];
511 }
512 }
513
514 preparedTF.tf.roundToIntegers(mRoundIntegersThreshold);
515 if (mZeroThreshold > 0.f) {
516 preparedTF.tf.zeroSmallValues(mZeroThreshold);
517 }
518 if (mDynamicPrecisionSigma > 0.f) {
519 preparedTF.tf.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
520 }
521 if (flags != CMVEncoding::kNone) {
522 preparedTF.compressed = preparedTF.tf.compress(flags);
523 }
524 }
525 };
526
527 std::vector<std::thread> workers;
528 workers.reserve(nThreads - 1);
529 for (int iThread = 1; iThread < nThreads; ++iThread) {
530 workers.emplace_back(worker, iThread);
531 }
532 worker(0);
533 for (auto& thread : workers) {
534 thread.join();
535 }
536
537 for (int tfIndex = 0; tfIndex < nTFsInBatch; ++tfIndex) {
538 if (flags != CMVEncoding::kNone) {
539 mCurrentCompressedTF = std::move(prepared[tfIndex].compressed);
540 } else {
541 mCurrentTF = std::move(prepared[tfIndex].tf);
542 }
543 mIntervalTree->Fill();
544 ++mIntervalTFCount;
545 }
546 }
547
// Export the current interval tree: optionally write it to a ROOT file (plus a metadata
// file) and, when mSendCCDB is set, serialise it into a CCDB object image.
// Does nothing but warn when the interval holds no entries.
// NOTE(review): listing lines 579 (declaration of calMetaData) and 619-620 (between the
// "Sending object" log and the timer stop; presumably where `output` is used) are not
// visible in this extract.
548 void sendOutput(o2::framework::DataAllocator& output)
549 {
550 using timer = std::chrono::high_resolution_clock;
551
552 if (mIntervalTFCount == 0) {
553 LOGP(warning, "CMV interval is empty at sendOutput for lane {}, skipping", mLaneId);
554 return;
555 }
556
// The TF range of the interval is attached to the tree as user info.
557 const auto lastTF = mIntervalFirstTF + static_cast<long>(mIntervalTFCount) - 1;
558 mIntervalTree->GetUserInfo()->Clear();
559 mIntervalTree->GetUserInfo()->Add(new TParameter<long>("firstTF", mIntervalFirstTF));
560 mIntervalTree->GetUserInfo()->Add(new TParameter<long>("lastTF", lastTF));
561
562 LOGP(info, "CMVPerTF TTree lane {}: {} entries, firstTF={}, lastTF={}", mLaneId, mIntervalTFCount, mIntervalFirstTF, lastTF);
// NOTE(review): the timer starts before the file/metadata writing below, so the
// "CCDB serialisation time" logged at the end also includes that work.
563 auto start = timer::now();
564
// End of validity = start + entries * NHBFPerTF orbits, converted from microseconds to ms.
// NOTE(review): this uses the GRP NHBFPerTF value, not the per-batch measured orbit stride.
565 const int nHBFPerTF = o2::base::GRPGeomHelper::instance().getNHBFPerTF();
566 const long timeStampEnd = mTimestampStart + static_cast<long>(mIntervalTFCount * nHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-3);
567
568 if (mOutputDir != "/dev/null") {
569 const std::string calibFName = fmt::format("CMV_run_{}_orbit_{}_{}_timestamp_{}_{}.root",
570 mRun, mIntervalFirstOrbit, mIntervalLastOrbit, mTimestampStart, timeStampEnd);
// A failed file write is logged and does not abort the export.
571 try {
572 CMVPerTF::writeToFile(mOutputDir + calibFName, mIntervalTree);
573 LOGP(info, "CMV file written to {}", mOutputDir + calibFName);
574 } catch (const std::exception& e) {
575 LOGP(error, "Failed to write CMV file {}: {}", mOutputDir + calibFName, e.what());
576 }
577
578 if (mMetaFileDir != "/dev/null") {
580 calMetaData.fillFileData(mOutputDir + calibFName);
581 calMetaData.setDataTakingContext(mDataTakingContext);
582 calMetaData.type = "calib";
583 calMetaData.priority = "low";
// The metadata is written to "<name>.tmp" and renamed to "<name>.done" once closed.
584 auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, calibFName);
585 auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, calibFName);
586 try {
587 std::ofstream metaFileOut(metaFileNameTmp);
588 metaFileOut << calMetaData;
589 metaFileOut.close();
590 std::filesystem::rename(metaFileNameTmp, metaFileName);
591 } catch (std::exception const& e) {
592 LOG(error) << "Failed to store CMV meta data file " << metaFileName << ", reason: " << e.what();
593 }
594 }
595 }
596
597 if ((!mSendCCDB) && (mOutputDir == "/dev/null")) {
598 LOGP(warning, "Neither CCDB output nor output-dir is enabled for aggregate lane {}, skipping CMV export", mLaneId);
599 }
// Everything below is CCDB-only.
600 if (!mSendCCDB) {
601 return;
602 }
603
604 if (timeStampEnd <= mTimestampStart) {
605 LOGP(warning, "Invalid CCDB timestamp range start:{} end:{}, skipping upload", mTimestampStart, timeStampEnd);
606 return;
607 }
608
609 o2::ccdb::CcdbObjectInfo ccdbInfoCMV("TPC/Calib/CMV", "TTree", "CMV.root", {}, mTimestampStart, timeStampEnd);
610 auto image = o2::ccdb::CcdbApi::createObjectImage((mIntervalTree.get()), &ccdbInfoCMV);
611 // trim TMemFile zero-padding: GetSize() is block-rounded, GetEND() is the actual file end
612 {
613 TMemFile mf("trim", image->data(), static_cast<Long64_t>(image->size()), "READ");
614 image->resize(static_cast<size_t>(mf.GetEND()));
615 mf.Close();
616 }
617
618 LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {}", ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(), image->size(), ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp());
621
622 auto stop = timer::now();
623 std::chrono::duration<float> elapsed = stop - start;
624 LOGP(info, "CMV CCDB serialisation time: {:.3f} s", elapsed.count());
625 }
626
628 void reset()
629 {
630 mTFFirst = -1;
631 mTimestampStart = 0;
632 mIntervalFirstTF = 0;
633 mHasIntervalFirstTF = false;
634 mProcessedTFs = 0;
635 std::fill(mProcessedCRU.begin(), mProcessedCRU.end(), 0);
636 std::fill(mOrbitInfo.begin(), mOrbitInfo.end(), 0);
637 std::fill(mOrbitStep.begin(), mOrbitStep.end(), 0);
638 std::fill(mOrbitInfoSeen.begin(), mOrbitInfoSeen.end(), false);
639 std::fill(mTFCompleted.begin(), mTFCompleted.end(), false);
640 for (auto& processedMap : mProcessedCRUs) {
641 for (auto& [cru, seen] : processedMap) {
642 seen = false;
643 }
644 }
645 for (auto& rawPerTF : mRawCMVs) {
646 rawPerTF.clear();
647 }
648 mEOSRawCMVs.clear();
649 mEOSFirstOrbit = 0;
650 mEOSFirstBC = 0;
651 mLastOrbitStep = 0;
652 mLastSeenTF = 0;
653 mIntervalTFCount = 0;
654 mIntervalFirstOrbit = 0;
655 mIntervalLastOrbit = 0;
656 mFirstOrbitDPL = 0;
657 mIntervalOrbitSet = false;
658 mCurrentTF = CMVPerTF{};
659 mCurrentCompressedTF = CMVPerTFCompressed{};
660 initIntervalTree();
661 }
662};
663
667 const std::vector<uint32_t>& crus,
668 const unsigned int timeframes,
669 const bool sendCCDB,
670 const bool usePreciseTimestamp,
671 const int nTFsBuffer = 1)
672{
// Body of getTPCAggregateCMVSpec(): assembles the DataProcessorSpec of one aggregate lane.
// NOTE(review): the start of the signature and listing lines 675-676, 693 and 696 are not
// visible in this extract (output spec entries, one GRPGeomRequest argument and the
// declaration of `spec`).
673 std::vector<o2::framework::OutputSpec> outputSpecs;
674 if (sendCCDB) {
677 }
678
// Inputs: the per-lane CMV payloads (any sub-specification), plus orbit info, first TF and,
// only with precise timestamps, the orbit-reset pair - each addressed by sub-specification == lane.
679 std::vector<o2::framework::InputSpec> inputSpecs;
680 inputSpecs.emplace_back(o2::framework::InputSpec{"cmvagg", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, o2::framework::Lifetime::Sporadic});
681 inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbit", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
682 inputSpecs.emplace_back(o2::framework::InputSpec{"firstTF", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
683 if (usePreciseTimestamp) {
684 inputSpecs.emplace_back(o2::framework::InputSpec{"orbitreset", o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{static_cast<unsigned int>(lane)}, o2::framework::Lifetime::Sporadic});
685 }
686
687 // Request GRPECS from CCDB so that GRPGeomHelper::getNHBFPerTF() is valid in this (separate) process
688 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(false, // orbitResetTime
689 true, // GRPECS (NHBFPerTF)
690 false, // GRPLHCIF
691 false, // GRPMagField
692 false, // askMatLUT
694 inputSpecs);
695
// Device name is "tpc-aggregate-cmv-" followed by the zero-padded lane number; the option
// defaults below are the values read back in the device's init().
697 fmt::format("tpc-aggregate-cmv-{:02}", lane).data(),
698 inputSpecs,
699 outputSpecs,
700 o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<TPCAggregateCMVDevice>(lane, crus, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer, ccdbRequest)},
701 o2::framework::Options{{"output-dir", o2::framework::VariantType::String, "/dev/null", {"CMV output directory, must exist (if not /dev/null)"}},
702 {"meta-output-dir", o2::framework::VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
703 {"nthreads-compression", o2::framework::VariantType::Int, 1, {"Number of threads used for CMV per timeframe preprocessing and compression"}},
704 {"use-sparse", o2::framework::VariantType::Bool, false, {"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
705 {"use-compression-varint", o2::framework::VariantType::Bool, false, {"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},
706 {"use-compression-huffman", o2::framework::VariantType::Bool, false, {"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}},
707 {"cmv-zero-threshold", o2::framework::VariantType::Float, 0.f, {"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}},
708 {"cmv-round-integers-threshold", o2::framework::VariantType::Int, 0, {"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}},
709 {"cmv-dynamic-precision-mean", o2::framework::VariantType::Float, 1.f, {"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}},
710 {"cmv-dynamic-precision-sigma", o2::framework::VariantType::Float, 0.f, {"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}};
// The lane number is also stored as the spec's rank.
711 spec.rank = lane;
712 return spec;
713}
714
715} // namespace o2::tpc
716
717#endif
Structs for storing CMVs to the CCDB.
Common mode values data format definition.
Utils and constants for calibration and related workflows.
std::ostringstream debug
TPCZSHDR * hdr
Helper for geometry and GRP related CCDB requests.
A helper class to iteratate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
TPC distribution of grouped CMVs towards the CMV aggregation workflow.
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
Definition CcdbApi.h:103
A helper class to iteratate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void stop()
This is invoked on stop.
Definition Task.h:53
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
TPCAggregateCMVDevice(const int lane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const bool sendCCDB, const bool usePreciseTimestamp, const int nTFsBuffer, std::shared_ptr< o2::base::GRPGeomRequest > req)
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
static constexpr header::DataDescription getDataDescriptionCCDBCMV()
void run(o2::framework::ProcessingContext &pc) final
static constexpr header::DataDescription getDataDescriptionCMVFirstTF()
static header::DataDescription getDataDescriptionCMV(const unsigned int lane)
Return data description for aggregated CMVs for a given lane.
static constexpr header::DataDescription getDataDescriptionCMVOrbitReset()
static header::DataDescription getDataDescriptionCMVOrbitInfo(const unsigned int lane)
Return data description for orbit/BC info for a given output lane.
GLeglImageOES image
Definition glcorearb.h:4021
GLuint buffer
Definition glcorearb.h:655
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLintptr offset
Definition glcorearb.h:660
GLbitfield flags
Definition glcorearb.h:1570
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
uint8_t itsSharedClusterMap uint8_t
constexpr double LHCOrbitMUS
@ Me
Only quit this data processor.
std::vector< ConfigParamSpec > Options
std::tuple< uint32_t, uint16_t > orbitBC(uint20_t bunchCrossing, uint32_t firstOrbit)
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:168
o2::framework::DataProcessorSpec getTPCAggregateCMVSpec(const int lane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const bool sendCCDB, const bool usePreciseTimestamp, const int nTFsBuffer=1)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::unique_ptr< GPUReconstructionTimeframe > tf
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43
bool fillFileData(const std::string &fname, bool fillmd5=false, const std::string &tmpEnding="")
void setDataTakingContext(const o2::framework::DataTakingContext &dtc)
uint32_t SubSpecificationType
Definition DataHeader.h:622
static constexpr uint8_t kDelta
Delta coding between consecutive values (dense only)
static constexpr uint8_t kVarint
Varint compression of the value stream.
static constexpr uint8_t kHuffman
Canonical Huffman compression of the value stream.
static constexpr uint8_t kNone
No compression — raw uint16 values stored flat.
static constexpr uint8_t kSparse
Non-zero positions stored sparsely (varint-encoded deltas)
static constexpr uint8_t kZigzag
Zigzag encoding of deltas or signed values.
static void writeToFile(const std::string &filename, const std::unique_ptr< TTree > &tree)
Write the TTree to a ROOT file.
static std::string rectifyDirectory(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"