/// Constructor: sizes the two TF buffers and their per-TF bookkeeping.
/// NOTE(review): this excerpt elides several original lines (some member initialisers and the
/// closing braces), so only the visible initialisers are described here.
/// \param crus list of CRU ids; the initialiser consuming it is not visible here (presumably mCRUs - confirm)
/// \param timeframes number of entries per buffer; stored in mTimeFrames and used to size all per-TF vectors
/// \param nTFsBuffer stored in mNTFsBuffer
/// \param firstTF first TF of buffer 0 (stored in mTFStart[0])
/// \param sendCCDB initialiser not visible in this excerpt (presumably mSendCCDB - confirm)
/// \param usePreciseTimestamp stored in mUsePreciseTimestamp
/// \param req initialiser not visible in this excerpt (presumably mCCDBRequest - confirm)
53 TPCDistributeCMVSpec(
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const int nTFsBuffer,
const int firstTF,
const bool sendCCDB,
const bool usePreciseTimestamp, std::shared_ptr<o2::base::GRPGeomRequest> req)
55 mTimeFrames{timeframes},
56 mNTFsBuffer{nTFsBuffer},
// one received-CRU counter per relative TF, for each of the two buffers
57 mProcessedCRU{{std::vector<unsigned int>(timeframes), std::vector<unsigned int>(timeframes)}},
// buffer 0 starts at firstTF, buffer 1 starts `timeframes` later
// NOTE(review): `firstTF + timeframes` mixes int and unsigned int, so the sum is formed in unsigned int;
// a negative firstTF whose sum is still negative would wrap to a large value - confirm this is harmless
58 mTFStart{{firstTF, firstTF + timeframes}},
// the second element reads mTFStart[1]; this relies on mTFStart being declared before mTFEnd in the class
59 mTFEnd{{firstTF + timeframes - 1, mTFStart[1] + timeframes - 1}},
62 mUsePreciseTimestamp{usePreciseTimestamp},
// parentheses select the count constructor: each vector holds exactly one (false) element
63 mSendCCDBOutputOrbitReset(1),
64 mSendCCDBOutputGRPECS(1),
// one "orbit info forwarded" flag per relative TF, for each of the two buffers, all false initially
65 mOrbitInfoForwarded{{std::vector<bool>(timeframes,
false), std::vector<bool>(timeframes,
false)}}
// keep the CRU list sorted: other code in this file looks CRUs up with std::binary_search
68 std::sort(mCRUs.begin(), mCRUs.end());
// for every buffer and every relative TF, pre-populate a map holding each known CRU id with value false
70 for (
auto& processedCRUbuffer : mProcessedCRUs) {
71 processedCRUbuffer.resize(mTimeFrames);
72 for (
auto& crusMap : processedCRUbuffer) {
// reserve up front so inserting all CRU ids does not trigger rehashing
73 crusMap.reserve(mCRUs.size());
74 for (
const auto cruID : mCRUs) {
75 crusMap.emplace(cruID,
false);
90 mNFactorTFs = ic.options().get<
int>(
"nFactorTFs");
91 mNTFsDataDrop = ic.options().get<
int>(
"drop-data-after-nTFs");
92 mCheckEveryNData = ic.options().get<
int>(
"check-data-every-n");
93 if (mCheckEveryNData == 0) {
94 mCheckEveryNData = mTimeFrames / 2;
95 if (mCheckEveryNData == 0) {
98 mNTFsDataDrop = mCheckEveryNData;
100 mDumpCMVs = ic.options().get<
bool>(
"dump-cmvs");
101 mUseCompressionVarint = ic.options().get<
bool>(
"use-compression-varint");
102 mUseSparse = ic.options().get<
bool>(
"use-sparse");
103 mUseCompressionHuffman = ic.options().get<
bool>(
"use-compression-huffman");
104 mRoundIntegersThreshold =
static_cast<uint16_t
>(ic.options().get<
int>(
"cmv-round-integers-threshold"));
105 mZeroThreshold = ic.options().get<
float>(
"cmv-zero-threshold");
106 mDynamicPrecisionMean = ic.options().get<
float>(
"cmv-dynamic-precision-mean");
107 mDynamicPrecisionSigma = ic.options().get<
float>(
"cmv-dynamic-precision-sigma");
108 LOGP(info,
"CMV compression settings: use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}",
109 mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma);
118 LOGP(info,
"Updating ORBITRESET");
119 std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(),
true);
123 LOGP(info,
"Updating GRPECS");
124 std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(),
true);
126 LOGP(info,
"Detected default GRPECS object");
134 if (mCCDBRequest->askTime) {
135 const bool grpecsValid = pc.inputs().isValid(
"grpecs");
136 const bool orbitResetValid = pc.inputs().isValid(
"orbitReset");
140 if (orbitResetValid) {
141 pc.inputs().get<std::vector<Long64_t>*>(
"orbitReset");
143 if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) {
147 if (mSendCCDBOutputOrbitReset[0] && mSendCCDBOutputGRPECS[0]) {
148 mSendCCDBOutputOrbitReset[0] =
false;
149 mSendCCDBOutputGRPECS[0] =
false;
158 if (mTFStart.front() <= -1) {
159 const auto firstTF =
tf;
160 const long offsetTF = std::abs(mTFStart.front() + 1);
161 const auto nTotTFs = getNRealTFs();
162 mTFStart = {firstTF + offsetTF, firstTF + offsetTF + nTotTFs};
163 mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs};
164 LOGP(info,
"Setting {} as first TF", mTFStart[0]);
165 LOGP(info,
"Using offset of {} TFs for setting the first TF", offsetTF);
169 const bool currentBuffer = (
tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer;
170 if (mTFStart[currentBuffer] >
tf) {
171 LOGP(info,
"All CRUs for current TF {} already received. Skipping this TF",
tf);
175 const unsigned int relTF = (
tf - mTFStart[currentBuffer]) / mNTFsBuffer;
176 LOGP(info,
"Current TF: {}, relative TF: {}, current buffer: {}, mTFStart: {}",
tf, relTF, currentBuffer, mTFStart[currentBuffer]);
178 if (relTF >= mProcessedCRU[currentBuffer].
size()) {
179 LOGP(warning,
"Skipping tf {}: relative tf {} is larger than size of buffer: {}",
tf, relTF, mProcessedCRU[currentBuffer].
size());
182 mProcessedTotalData = mCheckEveryNData;
183 checkIntervalsForMissingData(pc, currentBuffer, relTF,
tf);
187 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
192 if (mIntervalTFCount == 0) {
193 mIntervalFirstTF =
tf;
197 if (mTimestampStart == 0) {
198 setTimestampCCDB(relTF, pc);
203 if (!mOrbitInfoForwarded[currentBuffer][relTF]) {
205 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
206 const unsigned int cru = hdr->subSpecification >> 7;
207 if (std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
208 const auto orbitBC = pc.inputs().get<uint64_t>(
ref);
209 if (mCurrentTF.firstOrbit == 0 && mCurrentTF.firstBC == 0) {
210 mCurrentTF.firstOrbit =
static_cast<uint32_t
>(orbitBC >> 32);
211 mCurrentTF.firstBC =
static_cast<uint16_t
>(orbitBC & 0xFFFFu);
213 mOrbitInfoForwarded[currentBuffer][relTF] =
true;
220 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
221 const unsigned int cru = tpcCRUHeader->subSpecification >> 7;
224 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
225 LOGP(info,
"Received data from CRU: {} which was not specified as input. Skipping", cru);
229 if (mProcessedCRUs[currentBuffer][relTF][cru]) {
233 ++mProcessedCRU[currentBuffer][relTF];
236 mProcessedCRUs[currentBuffer][relTF][cru] =
true;
241 const uint32_t
nTimeBins = std::min(
static_cast<uint32_t
>(cmvVec.size()), cmv::NTimeBinsPerTF);
242 for (uint32_t tb = 0; tb <
nTimeBins; ++tb) {
243 mCurrentTF.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] = cmvVec[tb];
247 LOGP(info,
"Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(),
tf);
250 if (mNTFsDataDrop > 0) {
251 checkIntervalsForMissingData(pc, currentBuffer, relTF,
tf);
254 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
255 ++mProcessedTFs[currentBuffer];
258 mCurrentTF.roundToIntegers(mRoundIntegersThreshold);
259 if (mZeroThreshold > 0.f) {
260 mCurrentTF.zeroSmallValues(mZeroThreshold);
262 if (mDynamicPrecisionSigma > 0.f) {
263 mCurrentTF.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
267 const uint8_t
flags = buildCompressionFlags();
269 mCurrentCompressedTF = mCurrentTF.compress(
flags);
272 mIntervalTree->Fill();
277 if (mProcessedTFs[currentBuffer] == mTimeFrames) {
278 sendOutput(pc.outputs(),
tf);
279 finishInterval(pc, currentBuffer,
tf);
285 LOGP(info,
"End of stream, flushing CMV interval ({} TFs)", mIntervalTFCount);
287 mTFEnd[mBuffer] = mLastSeenTF;
288 sendOutput(ec.outputs(), mLastSeenTF);
297 const std::string
name = fmt::format(
"CMVAGG{}", lane).data();
306 const std::string
name = fmt::format(
"CMVORB{}", lane);
// CRU ids accepted by this device; sorted in the constructor and queried with std::binary_search
316 std::vector<uint32_t> mCRUs{};
// number of entries per buffer; sizes the per-TF vectors and is the target count that triggers sendOutput
317 const unsigned int mTimeFrames{};
// divisor used to turn a TF offset into a relative TF index; also a factor in getNRealTFs()
318 const int mNTFsBuffer{1};
// per buffer: number of relative TFs counted as done (all CRUs received, or filled in as missing)
319 std::array<unsigned int, 2> mProcessedTFs{{0, 0}};
// per buffer and relative TF: number of CRUs received so far
320 std::array<std::vector<unsigned int>, 2> mProcessedCRU{};
// per buffer and relative TF: CRU id -> flag telling whether that CRU was already processed
321 std::array<std::vector<std::unordered_map<unsigned int, bool>>, 2> mProcessedCRUs{};
// per buffer: first TF of the range covered by the buffer
322 std::array<long, 2> mTFStart{};
// per buffer: last TF of the range covered by the buffer
323 std::array<long, 2> mTFEnd{};
// CCDB request object; its askTime member is consulted before looking at the "grpecs"/"orbitReset" inputs
324 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
// single-element flag vectors (sized to 1 in the constructor): set to true when the respective
// CCDB object is reported as updated, and both reset to false together once both are true
325 std::vector<bool> mSendCCDBOutputOrbitReset{};
326 std::vector<bool> mSendCCDBOutputGRPECS{};
// presumably set from the constructor's sendCCDB argument (initialiser not visible in this excerpt)
328 bool mSendCCDB{
false};
// set from the constructor's usePreciseTimestamp argument
329 bool mUsePreciseTimestamp{
false};
// value of the "dump-cmvs" option
330 bool mDumpCMVs{
false};
// value of the "use-compression-varint" option
331 bool mUseCompressionVarint{
false};
// value of the "use-sparse" option
332 bool mUseSparse{
false};
// value of the "use-compression-huffman" option
333 bool mUseCompressionHuffman{
false};
// value of the "cmv-round-integers-threshold" option (cast to uint16_t); passed to mCurrentTF.roundToIntegers()
334 uint16_t mRoundIntegersThreshold{0};
// value of the "cmv-zero-threshold" option; mCurrentTF.zeroSmallValues() is only called when it is > 0
335 float mZeroThreshold{0.f};
// value of the "cmv-dynamic-precision-mean" option; first argument of mCurrentTF.trimGaussianPrecision()
336 float mDynamicPrecisionMean{1.f};
// value of the "cmv-dynamic-precision-sigma" option; trimGaussianPrecision() is only called when it is > 0
337 float mDynamicPrecisionSigma{0.f};
// start timestamp of the current interval; 0 means "not set yet" (setTimestampCCDB is called while it is 0).
// Used as the start of the CCDB validity range and in the debug file name
338 long mTimestampStart{0};
// TTree named "ccdb_object" that collects one entry per completed TF of the current interval
340 std::unique_ptr<TTree> mIntervalTree{};
// TF recorded when mIntervalTFCount is 0; stored as "firstTF" in the tree's user info
343 long mIntervalFirstTF{0};
// number of TFs in the current interval; 0 is treated as "interval empty" in sendOutput
344 unsigned int mIntervalTFCount{0};
// value of the "drop-data-after-nTFs" option; a value > 0 enables the per-TF missing-data check
346 int mNTFsDataDrop{0};
// per buffer: relative TF from which the next missing-data check starts
347 std::array<int, 2> mStartNTFsDataDrop{0};
// running counter, post-incremented on each missing-data check and taken modulo mCheckEveryNData
348 long mProcessedTotalData{0};
// value of the "check-data-every-n" option; an option value of 0 is replaced by mTimeFrames / 2
349 int mCheckEveryNData{1};
// input filters; not used in the visible part of this file
// (presumably mFilter selects the CMV inputs and mOrbitFilter the orbit inputs - confirm)
350 std::vector<InputSpec> mFilter{};
351 std::vector<InputSpec> mOrbitFilter{};
// per buffer and relative TF: whether the orbit information for that TF was already handled
352 std::array<std::vector<bool>, 2> mOrbitInfoForwarded{};
// stored as "lastTF" in the tree's user info and used as the closing TF at end of stream
353 uint32_t mLastSeenTF{0};
356 unsigned int getNRealTFs()
const {
return mNTFsBuffer * mTimeFrames; }
/// Builds the flag byte that is handed to mCurrentTF.compress().
/// NOTE(review): the statements that actually set the flag bits are elided in this excerpt;
/// only the branch structure is visible.
359 uint8_t buildCompressionFlags()
const
// Huffman is tested first: the varint branch is only reached when Huffman is disabled
365 if (mUseCompressionHuffman) {
367 }
else if (mUseCompressionVarint) {
/// (Re)creates mIntervalTree, the TTree that accumulates the per-TF CMV objects of one interval.
/// NOTE(review): the condition choosing between the two Branch() calls below is elided in this
/// excerpt; presumably only one of the two branches is created per tree - confirm.
380 void initIntervalTree()
// a new tree replaces any previous one held by the unique_ptr; name and title are both "ccdb_object"
382 mIntervalTree = std::make_unique<TTree>(
"ccdb_object",
"ccdb_object");
// presumably turns off ROOT's periodic auto-save for this tree - confirm against the TTree documentation
383 mIntervalTree->SetAutoSave(0);
// detach the tree from any ROOT directory
384 mIntervalTree->SetDirectory(
nullptr);
// branch bound to the compressed per-TF object
386 mIntervalTree->Branch(
"CMVPerTFCompressed", &mCurrentCompressedTF);
// branch bound to the uncompressed per-TF object
388 mIntervalTree->Branch(
"CMVPerTF", &mCurrentTF);
/// Resets the bookkeeping of one buffer and assigns a new TF range.
/// \param currentBuffer index (0/1) of the buffer whose counters and flags are reset
/// NOTE(review): the body of the inner loop and the end of the function are elided in this excerpt.
392 void clearBuffer(
const bool currentBuffer)
// walk over every per-TF CRU map of this buffer (loop body elided; presumably resets each flag to false)
395 for (
auto& crusMap : mProcessedCRUs[currentBuffer]) {
396 for (
auto& it : crusMap) {
// no TF of this buffer counts as processed any more
401 mProcessedTFs[currentBuffer] = 0;
// zero the received-CRU counter of every relative TF
402 std::fill(mProcessedCRU[currentBuffer].
begin(), mProcessedCRU[currentBuffer].
end(), 0);
// orbit information has to be picked up again for every relative TF
403 std::fill(mOrbitInfoForwarded[currentBuffer].
begin(), mOrbitInfoForwarded[currentBuffer].
end(),
false);
// the new range starts right after the end of the other buffer and spans getNRealTFs() TFs
// NOTE(review): the range update is indexed with the member mBuffer, not with the currentBuffer
// parameter used above - confirm callers only pass a value for which both refer to the same buffer
406 mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1;
407 mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1;
415 if (!(mProcessedTotalData++ % mCheckEveryNData)) {
416 LOGP(info,
"Checking for dropped packages...");
419 if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
420 LOGP(warning,
"Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].
size());
421 checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].
size());
422 LOGP(info,
"All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer",
tf);
424 finishInterval(pc, !currentBuffer,
tf);
427 const int tfEndCheck = std::clamp(
static_cast<int>(relTF) - mNTFsDataDrop, 0,
static_cast<int>(mProcessedCRU[currentBuffer].
size()));
428 LOGP(info,
"Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
429 checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck);
430 mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
436 for (
int iTF = startTF; iTF < endTF; ++iTF) {
437 if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
438 LOGP(warning,
"CRUs for rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", iTF, mTFStart[currentBuffer] + iTF, mProcessedCRU[currentBuffer][iTF], mCRUs.size());
439 ++mProcessedTFs[currentBuffer];
440 mProcessedCRU[currentBuffer][iTF] = mCRUs.size();
443 for (
auto& it : mProcessedCRUs[currentBuffer][iTF]) {
450 mOrbitInfoForwarded[currentBuffer][iTF] =
true;
457 if (mNFactorTFs > 0) {
461 if (deviceProxy.getNumOutputChannels() > 0) {
462 auto&
state = deviceProxy.getOutputChannelState({0});
463 size_t oldest = std::numeric_limits<size_t>::max() - 1;
464 state.oldestForChannel = {oldest};
468 LOGP(info,
"All TFs {} for current buffer received. Clearing buffer",
tf);
470 mStartNTFsDataDrop[
buffer] = 0;
474 mIntervalFirstTF = 0;
475 mIntervalTFCount = 0;
479 LOGP(info,
"Everything cleared. Waiting for new data to arrive.");
484 if (mUsePreciseTimestamp && !mTFInfo.second) {
488 const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * mTFInfo.second;
489 mTimestampStart = mUsePreciseTimestamp
492 LOGP(info,
"Setting timestamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}",
493 mTFInfo.first, tinfo.tfCounter, tinfo.firstTForbit, mTFInfo.second, relTF, nOrbitsOffset);
498 using timer = std::chrono::high_resolution_clock;
500 if (mIntervalTFCount == 0) {
501 LOGP(warning,
"CMV interval is empty at sendOutput, skipping");
506 mIntervalTree->GetUserInfo()->Clear();
507 mIntervalTree->GetUserInfo()->Add(
new TParameter<long>(
"firstTF", mIntervalFirstTF));
508 mIntervalTree->GetUserInfo()->Add(
new TParameter<long>(
"lastTF", mLastSeenTF));
510 LOGP(info,
"CMVPerTF TTree: {} entries, firstTF={}, lastTF={}", mIntervalTFCount, mIntervalFirstTF, mLastSeenTF);
511 auto start = timer::now();
515 const std::string fname = fmt::format(
"CMV_timestamp{}.root", mTimestampStart);
517 mCurrentTF.writeToFile(fname, mIntervalTree);
518 LOGP(info,
"CMV debug file written to {}", fname);
519 }
catch (
const std::exception& e) {
520 LOGP(error,
"Failed to write CMV debug file: {}", e.what());
525 LOGP(warning,
"CCDB output disabled, skipping upload!");
533 if (timeStampEnd <= mTimestampStart) {
534 LOGP(warning,
"Invalid CCDB timestamp range start:{} end:{}, skipping upload!",
535 mTimestampStart, timeStampEnd);
539 LOGP(info,
"CCDB timestamp range start:{} end:{}", mTimestampStart, timeStampEnd);
552 TMemFile mf(
"trim",
image->data(),
static_cast<Long64_t
>(
image->size()),
"READ");
553 image->resize(
static_cast<size_t>(mf.GetEND()));
556 LOGP(info,
"Sending object {} / {} of size {} bytes, valid for {} : {}",
557 ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(),
image->size(),
558 ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp());
563 auto stop = timer::now();
564 std::chrono::duration<float> elapsed = stop -
start;
565 LOGP(info,
"CMV CCDB serialisation time: {:.3f} s", elapsed.count());
/// Creates the DataProcessorSpec of the TPC CMV distribute device.
/// NOTE(review): the input/output spec construction and the CCDB request arguments are partly
/// elided in this excerpt; only the visible statements are described.
/// \param ilane lane index; becomes part of the device name
/// \param crus forwarded to the TPCDistributeCMVSpec constructor
/// \param timeframes forwarded to the TPCDistributeCMVSpec constructor
/// \param firstTF forwarded to the TPCDistributeCMVSpec constructor
/// \param sendCCDB forwarded to the TPCDistributeCMVSpec constructor (default false)
/// \param usePreciseTimestamp forwarded to the task and also used as first argument of the GRPGeomRequest (default false)
/// \param nTFsBuffer forwarded to the TPCDistributeCMVSpec constructor (default 1)
569DataProcessorSpec getTPCDistributeCMVSpec(
const int ilane,
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const int firstTF,
const bool sendCCDB =
false,
const bool usePreciseTimestamp =
false,
const int nTFsBuffer = 1)
571 std::vector<InputSpec> inputSpecs;
575 std::vector<OutputSpec> outputSpecs;
// two output specs are appended (their arguments are elided in this excerpt)
577 outputSpecs.emplace_back(
581 outputSpecs.emplace_back(
// the first GRPGeomRequest argument follows the precise-timestamp setting
// (presumably the askTime flag that the task checks later - confirm)
587 const bool fetchCCDB = usePreciseTimestamp;
588 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(fetchCCDB,
// device name: "tpc-distribute-cmv-" followed by the lane number zero-padded to two digits
596 const std::string
type =
"cmv";
597 const auto id = fmt::format(
"tpc-distribute-{}-{:02}",
type, ilane);
// argument order follows the TPCDistributeCMVSpec constructor (nTFsBuffer is third), not this function's parameter order
602 AlgorithmSpec{adaptFromTask<TPCDistributeCMVSpec>(crus, timeframes, nTFsBuffer, firstTF, sendCCDB, usePreciseTimestamp, ccdbRequest)},
// runtime options; the task reads them back by name through ic.options().get<...>() elsewhere in this file.
// For "check-data-every-n" the default 0 is replaced there by half the number of time frames
603 Options{{
"drop-data-after-nTFs", VariantType::Int, 0, {
"Number of TFs after which to drop the data"}},
604 {
"check-data-every-n", VariantType::Int, 0, {
"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)"}},
605 {
"nFactorTFs", VariantType::Int, 1000, {
"Number of TFs to skip for sending oldest TF"}},
606 {
"dump-cmvs", VariantType::Bool,
false, {
"Dump CMVs to a local ROOT file for debugging"}},
607 {
"use-sparse", VariantType::Bool,
false, {
"Sparse encoding (skip zero time bins). Alone: raw uint16 values. With --use-compression-varint: varint exact values. With --use-compression-huffman: Huffman exact values"}},
608 {
"use-compression-varint", VariantType::Bool,
false, {
"Delta+zigzag+varint compression (all values). Combined with --use-sparse: sparse positions + varint encoded exact CMV values"}},
609 {
"use-compression-huffman", VariantType::Bool,
false, {
"Huffman encoding. Combined with --use-sparse: sparse positions + Huffman-encoded exact CMV values"}},
610 {
"cmv-zero-threshold", VariantType::Float, 0.f, {
"Zero out CMV values whose float magnitude is below this threshold after optional integer rounding and before compression; 0 disables"}},
611 {
"cmv-round-integers-threshold", VariantType::Int, 0, {
"Round values to nearest integer ADC for |v| <= N ADC before compression; 0 disables"}},
612 {
"cmv-dynamic-precision-mean", VariantType::Float, 1.f, {
"Gaussian centre in |CMV| ADC where the strongest fractional bit trimming is applied"}},
613 {
"cmv-dynamic-precision-sigma", VariantType::Float, 0.f, {
"Gaussian width in ADC for smooth CMV fractional bit trimming; 0 disables"}}}};