62 const std::vector<uint32_t>& crus,
63 const unsigned int timeframes,
65 const bool usePreciseTimestamp,
67 std::shared_ptr<o2::base::GRPGeomRequest> req)
70 mTimeFrames{timeframes},
72 mUsePreciseTimestamp{usePreciseTimestamp},
73 mNTFsBuffer{nTFsBuffer},
74 mProcessedCRU(timeframes),
75 mProcessedCRUs(timeframes),
77 mOrbitInfo(timeframes),
78 mOrbitStep(timeframes),
79 mOrbitInfoSeen(timeframes, false),
80 mTFCompleted(timeframes, false),
83 std::sort(mCRUs.begin(), mCRUs.end());
84 for (
auto& crusMap : mProcessedCRUs) {
85 crusMap.reserve(mCRUs.size());
86 for (
const auto cruID : mCRUs) {
87 crusMap.emplace(cruID,
false);
96 mOutputDir = ic.options().get<std::string>(
"output-dir");
97 if (mOutputDir !=
"/dev/null") {
100 mMetaFileDir = ic.options().get<std::string>(
"meta-output-dir");
101 if (mMetaFileDir !=
"/dev/null") {
104 mUseCompressionVarint = ic.options().get<
bool>(
"use-compression-varint");
105 mUseSparse = ic.options().get<
bool>(
"use-sparse");
106 mUseCompressionHuffman = ic.options().get<
bool>(
"use-compression-huffman");
107 mRoundIntegersThreshold =
static_cast<uint16_t
>(ic.options().get<
int>(
"cmv-round-integers-threshold"));
108 mZeroThreshold = ic.options().get<
float>(
"cmv-zero-threshold");
109 mDynamicPrecisionMean = ic.options().get<
float>(
"cmv-dynamic-precision-mean");
110 mDynamicPrecisionSigma = ic.options().get<
float>(
"cmv-dynamic-precision-sigma");
111 mThreads = std::max(1, ic.options().get<
int>(
"nthreads-compression"));
112 LOGP(info,
"CMV aggregation settings: output-dir={}, use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}, nthreads-compression={}",
113 mOutputDir, mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma, mThreads);
126 if (pc.inputs().isValid(
"grpecs")) {
130 if (mUsePreciseTimestamp && pc.inputs().isValid(
"orbitreset")) {
134 if (nCCDBInputs > 0 && pc.inputs().countValidInputs() == nCCDBInputs) {
138 if (mSetDataTakingCont) {
140 mSetDataTakingCont =
false;
149 if (mTFFirst == -1) {
151 mTFFirst = pc.inputs().get<
long>(
ref);
152 mIntervalFirstTF = mTFFirst;
153 mHasIntervalFirstTF =
true;
159 if (currTF == std::numeric_limits<uint32_t>::max()) {
160 if (mTimestampStart == 0) {
163 collectEOSInputs(pc);
167 if (mTFFirst == -1) {
169 mIntervalFirstTF = mTFFirst;
170 mHasIntervalFirstTF =
true;
171 LOGP(warning,
"firstTF not found. Setting {} as first TF for aggregate lane {}", mTFFirst, mLaneId);
174 const long relTF = (currTF - mTFFirst) / mNTFsBuffer;
176 LOGP(warning,
"relTF={} < 0 for TF {}, skipping", relTF, currTF);
179 if (relTF >=
static_cast<long>(mTimeFrames)) {
183 LOGP(warning,
"relTF={} out of range [0, {}) for TF {}: force-completing stale interval and resetting", relTF, mTimeFrames, currTF);
184 if (mTimestampStart == 0) {
187 materializeBufferedTFs(
true);
188 sendOutput(pc.outputs());
192 long nextFirst = mIntervalFirstTF +
static_cast<long>(mTimeFrames) * mNTFsBuffer;
193 while (
static_cast<long>(currTF) >= nextFirst +
static_cast<long>(mTimeFrames) * mNTFsBuffer) {
194 nextFirst +=
static_cast<long>(mTimeFrames) * mNTFsBuffer;
197 mTFFirst = nextFirst;
198 mIntervalFirstTF = nextFirst;
199 mHasIntervalFirstTF =
true;
204 if (!mOrbitInfoSeen[relTF]) {
207 mOrbitInfo[relTF] = pc.inputs().get<uint64_t>(
ref);
208 const auto batchFirstOrbit =
static_cast<uint32_t
>(mOrbitInfo[relTF] >> 32);
214 mOrbitStep[relTF] = ((batchFirstOrbit > 0) && (mNTFsBuffer > 1) && (batchLastOrbit > batchFirstOrbit)) ? (batchLastOrbit - batchFirstOrbit) /
static_cast<uint32_t
>(mNTFsBuffer - 1) : defaultOrbitStep;
215 mLastOrbitStep = mOrbitStep[relTF];
216 mOrbitInfoSeen[relTF] =
true;
221 if (mTimestampStart == 0) {
222 setTimestampCCDB(relTF, mOrbitStep[relTF], pc);
226 auto const*
hdr = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
227 const unsigned int cru =
hdr->subSpecification;
228 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
229 LOGP(
debug,
"Received CMV data from CRU {} which is not part of this aggregate lane", cru);
232 if (mProcessedCRUs[relTF][cru]) {
237 mRawCMVs[relTF][cru] = std::vector<uint16_t>(cmvVec.begin(), cmvVec.end());
238 mProcessedCRUs[relTF][cru] =
true;
239 ++mProcessedCRU[relTF];
242 if (mProcessedCRU[relTF] == mCRUs.size() && !mTFCompleted[relTF]) {
243 mTFCompleted[relTF] =
true;
245 mLastSeenTF = currTF;
248 if (mProcessedTFs == mTimeFrames) {
249 materializeBufferedTFs(
false);
250 sendOutput(pc.outputs());
257 materializeBufferedTFs(
true);
258 materializeEOSBuffer();
259 sendOutput(ec.outputs());
// ---- configuration (fixed at construction or read from options) ----
// Identifier of this aggregate lane; only used in log messages in the visible code.
271 const int mLaneId{0};
// CRU ids handled by this lane. Sorted in the constructor so std::binary_search can be used on it.
272 std::vector<uint32_t> mCRUs{};
// Number of relTF slots per interval: the per-slot vectors are sized with it and relTF is valid in [0, mTimeFrames).
273 const unsigned int mTimeFrames{};
// When false and mOutputDir is "/dev/null", the export is skipped with a warning.
274 const bool mSendCCDB{
false};
// Gates the use of the "orbitreset" input when handling CCDB inputs.
275 const bool mUsePreciseTimestamp{
false};
// Divisor that maps (currTF - mTFFirst) to a relTF slot; also the fallback number of TFs per batch.
276 const int mNTFsBuffer{1};
// Value of the "output-dir" option; "/dev/null" disables file output.
277 std::string mOutputDir{};
// Value of the "meta-output-dir" option; "/dev/null" disables meta-file output.
278 std::string mMetaFileDir{};
// One-shot flag: checked once and then cleared in the CCDB-input handling path.
280 bool mSetDataTakingCont{
true};
// Value of the "use-compression-varint" option.
281 bool mUseCompressionVarint{
false};
// Value of the "use-sparse" option.
282 bool mUseSparse{
false};
// Value of the "use-compression-huffman" option; tested before the varint option in buildCompressionFlags().
283 bool mUseCompressionHuffman{
false};
// Value of "cmv-round-integers-threshold" (read as int, narrowed to uint16_t); passed to roundToIntegers().
284 uint16_t mRoundIntegersThreshold{0};
// Value of "cmv-zero-threshold"; zeroSmallValues() is only called when this is > 0.
285 float mZeroThreshold{0.f};
// Value of "cmv-dynamic-precision-mean"; forwarded to trimGaussianPrecision().
286 float mDynamicPrecisionMean{1.f};
// Value of "cmv-dynamic-precision-sigma"; trimGaussianPrecision() is only called when this is > 0.
287 float mDynamicPrecisionSigma{0.f};
// ---- per-interval state ----
// Start timestamp of the current interval; 0 is treated as "not yet set".
290 long mTimestampStart{0};
// First TF of the current interval; stored as the "firstTF" user-info parameter of the tree.
291 long mIntervalFirstTF{0};
// True once mIntervalFirstTF holds a meaningful value for the current interval.
292 bool mHasIntervalFirstTF{
false};
// Compared against mTimeFrames to decide when the interval is flushed and sent.
293 unsigned int mProcessedTFs{0};
// Per relTF slot: number of CRUs whose data has been stored.
294 std::vector<unsigned int> mProcessedCRU{};
// Per relTF slot: CRU id -> "already received" flag (pre-filled with false for every CRU of this lane).
295 std::vector<std::unordered_map<unsigned int, bool>> mProcessedCRUs{};
// Per relTF slot: CRU id -> raw 16-bit CMV values received for that slot.
296 std::vector<std::unordered_map<uint32_t, std::vector<uint16_t>>> mRawCMVs{};
// Per relTF slot: packed orbit info; the upper 32 bits are read as the first orbit, the low 16 bits as the BC.
297 std::vector<uint64_t> mOrbitInfo{};
// Per relTF slot: orbit distance between consecutive TFs of the batch.
298 std::vector<uint32_t> mOrbitStep{};
// Per relTF slot: true once the orbit info for the slot has been read.
299 std::vector<bool> mOrbitInfoSeen{};
// Per relTF slot: true once all CRUs of this lane have been received for the slot.
300 std::vector<bool> mTFCompleted{};
// ---- end-of-stream buffer ----
// CRU id -> values appended across EOS inputs (data is concatenated, not overwritten).
301 std::unordered_map<uint32_t, std::vector<uint16_t>> mEOSRawCMVs{};
// First orbit of the EOS data (upper 32 bits of the packed orbit/BC word); 0 is treated as "not yet set".
302 uint32_t mEOSFirstOrbit{0};
// BC of the EOS data (low 16 bits of the packed orbit/BC word).
303 uint16_t mEOSFirstBC{0};
// Most recently computed orbit step, copied from mOrbitStep[relTF].
304 uint32_t mLastOrbitStep{0};
// Last TF seen in regular processing; advanced by the number of TFs flushed from the EOS buffer.
305 uint32_t mLastSeenTF{0};
// Number of TFs in the current interval; 0 means the interval is empty and nothing is sent.
306 unsigned int mIntervalTFCount{0};
// First orbit of the current interval, taken from the first batch appended to the tree.
308 uint32_t mIntervalFirstOrbit{0};
// First orbit of the last TF of the most recently appended batch.
309 uint32_t mIntervalLastOrbit{0};
// Orbit derived from the DPL timing info; used as fallback when a batch reports first orbit 0.
310 uint32_t mFirstOrbitDPL{0};
// True once mIntervalFirstOrbit has been set for the current interval.
311 bool mIntervalOrbitSet{
false};
// GRP/geometry request handed over in the constructor.
313 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
// Tree collecting one entry per TF of the current interval.
314 std::unique_ptr<TTree> mIntervalTree{};
317 const std::vector<InputSpec> mFilter{
320 Lifetime::Sporadic}};
321 const std::vector<InputSpec> mOrbitFilter{
324 Lifetime::Sporadic}};
325 const std::vector<InputSpec> mFirstTFFilter{
328 Lifetime::Sporadic}};
/// Builds the 8-bit flag word describing the configured compression.
/// The Huffman option is tested first; the varint option is only considered when Huffman is off
/// (if / else-if), so the two are mutually exclusive in the result.
/// NOTE(review): the flag values set in each branch are not visible in this excerpt.
330 uint8_t buildCompressionFlags()
const
336 if (mUseCompressionHuffman) {
338 }
else if (mUseCompressionVarint) {
/// (Re)creates the tree that collects the TFs of one interval.
/// The tree is named "ccdb_object", has auto-save disabled and is detached from any ROOT directory.
346 void initIntervalTree()
348 mIntervalTree = std::make_unique<TTree>(
"ccdb_object",
"ccdb_object");
// 0 switches off periodic auto-saving of the tree.
349 mIntervalTree->SetAutoSave(0);
// nullptr detaches the tree from the current directory (no file owns it).
350 mIntervalTree->SetDirectory(
nullptr);
// One of the two branches below is created: compressed or plain per-TF data.
// NOTE(review): the selecting condition is not visible here - presumably the compression flags.
352 mIntervalTree->Branch(
"CMVPerTFCompressed", &mCurrentCompressedTF);
354 mIntervalTree->Branch(
"CMVPerTF", &mCurrentTF);
362 if (mEOSFirstOrbit == 0) {
365 mEOSFirstOrbit =
static_cast<uint32_t
>(
orbitBC >> 32);
366 mEOSFirstBC =
static_cast<uint16_t
>(
orbitBC & 0xFFFFu);
372 auto const*
hdr = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
373 const unsigned int cru =
hdr->subSpecification;
374 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
378 auto&
buffer = mEOSRawCMVs[cru];
379 buffer.insert(
buffer.end(), cmvVec.begin(), cmvVec.end());
389 if (mUsePreciseTimestamp && !mTFInfo.
second) {
393 mTimestampStart = tinfo.creation;
394 LOGP(warning,
"Orbit reset info not yet received; using DPL creation time {} ms as fallback timestamp for interval starting at TF {}", mTimestampStart, mTFFirst);
399 const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * nHBFPerTF;
400 mFirstOrbitDPL = tinfo.firstTForbit - nOrbitsOffset;
402 LOGP(info,
"Setting timestamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}",
403 mTFInfo.
first, tinfo.tfCounter, tinfo.firstTForbit, nHBFPerTF, relTF, nOrbitsOffset);
/// Walks over all relTF slots of the current interval and appends the buffered batches to the
/// interval tree via appendBatchToTree().
/// @param includeIncomplete when true, slots for which not all CRUs of this lane were received are
///        flushed as well (with a warning); when false such slots take the branch at 416
///        (body not visible here - presumably they are skipped).
409 void materializeBufferedTFs(
const bool includeIncomplete)
411 for (
unsigned int relTF = 0; relTF < mTimeFrames; ++relTF) {
// Slot for which no CRU delivered data.
412 if (mProcessedCRU[relTF] == 0) {
// Incomplete slot while incomplete slots are not requested.
416 if ((mProcessedCRU[relTF] != mCRUs.size()) && !includeIncomplete) {
// Incomplete slot that is flushed anyway.
// NOTE(review): the message says "at EOS", but includeIncomplete=true is also passed from the
// out-of-range relTF path that force-completes a stale interval.
420 if ((mProcessedCRU[relTF] != mCRUs.size()) && includeIncomplete) {
421 LOGP(warning,
"Aggregate lane {} flushing incomplete CMV batch relTF {} at EOS: received {} CRUs out of {}", mLaneId, relTF, mProcessedCRU[relTF], mCRUs.size());
// Fix the first TF of the interval if nobody did so far; -1 in mTFFirst means "unknown" and maps to 0.
424 if (!mHasIntervalFirstTF) {
425 mIntervalFirstTF = mTFFirst == -1 ? 0 : mTFFirst;
426 mHasIntervalFirstTF =
true;
// Number of TFs in this batch: longest CRU buffer divided by the time bins per TF, but at least 1.
// If all buffers are empty, the configured batch size mNTFsBuffer is used instead.
430 const auto maxBufferSize = getMaxBufferSize(mRawCMVs[relTF]);
431 const int nTFsInBatch = maxBufferSize ? std::max(1,
static_cast<int>(maxBufferSize / cmv::NTimeBinsPerTF)) : mNTFsBuffer;
// orbitStep is defined on a line not visible in this excerpt.
434 appendBatchToTree(mRawCMVs[relTF], mOrbitInfo[relTF], orbitStep, nTFsInBatch);
/// Appends the data collected in the end-of-stream buffer (mEOSRawCMVs) to the interval tree.
/// Only whole TFs are considered: the longest CRU buffer is divided by cmv::NTimeBinsPerTF using
/// integer division, so a buffer shorter than one TF yields 0 and takes the branch at 449.
441 void materializeEOSBuffer()
// Nothing was buffered at EOS.
443 if (mEOSRawCMVs.empty()) {
447 const auto maxBufferSize = getMaxBufferSize(mEOSRawCMVs);
448 const int nTFsInBatch =
static_cast<int>(maxBufferSize / cmv::NTimeBinsPerTF);
// Less than one complete TF in the buffer.
449 if (nTFsInBatch <= 0) {
// If the interval has no first TF yet, continue directly after the last TF seen in regular processing.
453 if (!mHasIntervalFirstTF) {
454 mIntervalFirstTF = mLastSeenTF + 1;
455 mHasIntervalFirstTF =
true;
// Re-pack orbit (upper 32 bits) and BC (low bits) into the layout appendBatchToTree() unpacks.
458 const uint64_t orbitInfo = (
static_cast<uint64_t
>(mEOSFirstOrbit) << 32) |
static_cast<uint64_t
>(mEOSFirstBC);
// orbitStep is defined on a line not visible in this excerpt.
461 appendBatchToTree(mEOSRawCMVs, orbitInfo, orbitStep, nTFsInBatch);
// Account for the TFs that were just appended.
462 mLastSeenTF +=
static_cast<uint32_t
>(nTFsInBatch);
/// Returns the length of the longest per-CRU value buffer.
/// @param rawCMVs map from CRU id to the vector of 16-bit values buffered for that CRU
/// @return number of elements of the longest vector; 0 if the map is empty
465 static size_t getMaxBufferSize(
const std::unordered_map<uint32_t, std::vector<uint16_t>>& rawCMVs)
467 size_t maxBufferSize = 0;
// The CRU id itself is not needed, only the buffer sizes are compared.
468 for (
const auto& [cru,
values] : rawCMVs) {
469 maxBufferSize = std::max(maxBufferSize,
values.size());
471 return maxBufferSize;
/// Splits one buffered batch into individual TFs, post-processes/compresses them (optionally on
/// several threads) and fills them into the interval tree in TF order.
/// @param rawCMVs     CRU id -> concatenated 16-bit values of all TFs of the batch
/// @param orbitInfo   packed word: first orbit in the upper 32 bits, BC in the low 16 bits
/// @param orbitStep   orbit distance between two consecutive TFs of the batch
/// @param nTFsInBatch number of TFs to extract; values <= 0 take the guard branch at 479
477 void appendBatchToTree(
const std::unordered_map<uint32_t, std::vector<uint16_t>>& rawCMVs,
const uint64_t orbitInfo,
const uint32_t orbitStep,
const int nTFsInBatch)
479 if (nTFsInBatch <= 0) {
// Unpack orbit and BC from the 64-bit word.
483 const auto firstOrbit =
static_cast<uint32_t
>(orbitInfo >> 32);
484 const auto firstBC =
static_cast<uint16_t
>(orbitInfo & 0xFFFFu);
// A first orbit of 0 is treated as "unknown"; fall back to the orbit derived from the DPL timing info.
486 const auto batchFirstOrbitDPL = (firstOrbit > 0) ? firstOrbit : mFirstOrbitDPL;
// Only the first batch of an interval defines the interval's first orbit.
487 if (!mIntervalOrbitSet) {
488 mIntervalFirstOrbit = batchFirstOrbitDPL;
489 mIntervalOrbitSet =
true;
// First orbit of the last TF in this batch; overwritten by every batch appended.
491 mIntervalLastOrbit = batchFirstOrbitDPL +
static_cast<uint32_t
>(nTFsInBatch - 1) * orbitStep;
// One output slot per TF so that workers write to disjoint elements.
493 std::vector<PreparedTF> prepared(nTFsInBatch);
// Never use more threads than TFs, and always at least one.
494 const int nThreads = std::max(1, std::min(mThreads, nTFsInBatch));
// Ceiling division: each thread handles a contiguous chunk of TFs.
495 const int chunkSize = (nTFsInBatch + nThreads - 1) / nThreads;
// Processes the TFs [beginTF, endTF) assigned to thread iThread.
497 auto worker = [&](
const int iThread) {
498 const int beginTF = iThread * chunkSize;
499 const int endTF = std::min(nTFsInBatch, beginTF + chunkSize);
500 for (
int tfIndex = beginTF; tfIndex < endTF; ++tfIndex) {
502 auto& preparedTF = prepared[tfIndex];
// NOTE(review): firstOrbit uses the raw (possibly 0) batch orbit, firstOrbitDPL the fallback-corrected one.
503 preparedTF.tf.firstOrbit = firstOrbit +
static_cast<uint32_t
>(tfIndex) * orbitStep;
504 preparedTF.tf.firstOrbitDPL = batchFirstOrbitDPL +
static_cast<uint32_t
>(tfIndex) * orbitStep;
// Copy this TF's slice out of every CRU buffer.
506 for (
const auto& [cru,
values] : rawCMVs) {
// Start of this TF inside the concatenated CRU buffer.
507 const uint32_t
offset =
static_cast<uint32_t
>(tfIndex) * cmv::NTimeBinsPerTF;
// Number of values available for this TF, capped at one TF worth of time bins.
// NOTE(review): the subtraction is unsigned and wraps if offset > values.size(); a guard is
// presumably on the lines not visible between 507 and 511 - confirm.
511 const uint32_t nBins = std::min(
static_cast<uint32_t
>(
values.size()) -
offset, cmv::NTimeBinsPerTF);
// Destination is a flat array indexed by cru * NTimeBinsPerTF + time bin.
512 for (uint32_t tb = 0; tb < nBins; ++tb) {
513 preparedTF.tf.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] =
values[
offset + tb];
// Rounding is always invoked with the configured threshold.
517 preparedTF.tf.roundToIntegers(mRoundIntegersThreshold);
// Zeroing and precision trimming are optional and only run for positive settings.
518 if (mZeroThreshold > 0.f) {
519 preparedTF.tf.zeroSmallValues(mZeroThreshold);
521 if (mDynamicPrecisionSigma > 0.f) {
522 preparedTF.tf.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
// flags is defined on a line not visible here - presumably the result of buildCompressionFlags().
525 preparedTF.compressed = preparedTF.tf.compress(
flags);
// Spawn workers for chunks 1..nThreads-1.
// NOTE(review): chunk 0 is presumably processed on the calling thread on a line not visible here.
530 std::vector<std::thread> workers;
531 workers.reserve(nThreads - 1);
532 for (
int iThread = 1; iThread < nThreads; ++iThread) {
533 workers.emplace_back(worker, iThread);
// Loop over the spawned threads (body not visible - presumably join()).
536 for (
auto& thread : workers) {
// Fill the tree sequentially in TF order; the prepared object is moved into the branch buffer first.
// The condition choosing between the compressed and the plain object is not visible in this excerpt.
540 for (
int tfIndex = 0; tfIndex < nTFsInBatch; ++tfIndex) {
542 mCurrentCompressedTF = std::move(prepared[tfIndex].compressed);
544 mCurrentTF = std::move(prepared[tfIndex].
tf);
546 mIntervalTree->Fill();
553 using timer = std::chrono::high_resolution_clock;
555 if (mIntervalTFCount == 0) {
556 LOGP(warning,
"CMV interval is empty at sendOutput for lane {}, skipping", mLaneId);
560 const auto lastTF = mIntervalFirstTF +
static_cast<long>(mIntervalTFCount) - 1;
561 mIntervalTree->GetUserInfo()->Clear();
562 mIntervalTree->GetUserInfo()->Add(
new TParameter<long>(
"firstTF", mIntervalFirstTF));
563 mIntervalTree->GetUserInfo()->Add(
new TParameter<long>(
"lastTF", lastTF));
565 LOGP(info,
"CMVPerTF TTree lane {}: {} entries, firstTF={}, lastTF={}", mLaneId, mIntervalTFCount, mIntervalFirstTF, lastTF);
566 auto start = timer::now();
571 if (mOutputDir !=
"/dev/null") {
572 const std::string calibFName = fmt::format(
"CMV_run_{}_orbit_{}_{}_timestamp_{}_{}.root",
573 mRun, mIntervalFirstOrbit, mIntervalLastOrbit, mTimestampStart, timeStampEnd);
576 LOGP(info,
"CMV file written to {}", mOutputDir + calibFName);
577 }
catch (
const std::exception& e) {
578 LOGP(error,
"Failed to write CMV file {}: {}", mOutputDir + calibFName, e.what());
581 if (mMetaFileDir !=
"/dev/null") {
585 calMetaData.
type =
"calib";
587 auto metaFileNameTmp = fmt::format(
"{}{}.tmp", mMetaFileDir, calibFName);
588 auto metaFileName = fmt::format(
"{}{}.done", mMetaFileDir, calibFName);
590 std::ofstream metaFileOut(metaFileNameTmp);
591 metaFileOut << calMetaData;
593 std::filesystem::rename(metaFileNameTmp, metaFileName);
594 }
catch (std::exception
const& e) {
595 LOG(error) <<
"Failed to store CMV meta data file " << metaFileName <<
", reason: " << e.what();
600 if ((!mSendCCDB) && (mOutputDir ==
"/dev/null")) {
601 LOGP(warning,
"Neither CCDB output nor output-dir is enabled for aggregate lane {}, skipping CMV export", mLaneId);
607 if (timeStampEnd <= mTimestampStart) {
608 LOGP(warning,
"Invalid CCDB timestamp range start:{} end:{}, skipping upload", mTimestampStart, timeStampEnd);
616 TMemFile mf(
"trim",
image->data(),
static_cast<Long64_t
>(
image->size()),
"READ");
617 image->resize(
static_cast<size_t>(mf.GetEND()));
621 LOGP(info,
"Sending object {} / {} of size {} bytes, valid for {} : {}", ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(),
image->size(), ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp());
625 auto stop = timer::now();
626 std::chrono::duration<float> elapsed =
stop -
start;
627 LOGP(info,
"CMV CCDB serialisation time: {:.3f} s", elapsed.count());
635 mIntervalFirstTF = 0;
636 mHasIntervalFirstTF =
false;
638 std::fill(mProcessedCRU.begin(), mProcessedCRU.end(), 0);
639 std::fill(mOrbitInfo.begin(), mOrbitInfo.end(), 0);
640 std::fill(mOrbitStep.begin(), mOrbitStep.end(), 0);
641 std::fill(mOrbitInfoSeen.begin(), mOrbitInfoSeen.end(),
false);
642 std::fill(mTFCompleted.begin(), mTFCompleted.end(),
false);
643 for (
auto& processedMap : mProcessedCRUs) {
644 for (
auto& [cru, seen] : processedMap) {
648 for (
auto& rawPerTF : mRawCMVs) {
656 mIntervalTFCount = 0;
657 mIntervalFirstOrbit = 0;
658 mIntervalLastOrbit = 0;
660 mIntervalOrbitSet =
false;