59 const std::vector<uint32_t>& crus,
60 const unsigned int timeframes,
62 const bool usePreciseTimestamp,
64 std::shared_ptr<o2::base::GRPGeomRequest> req)
67 mTimeFrames{timeframes},
69 mUsePreciseTimestamp{usePreciseTimestamp},
70 mNTFsBuffer{nTFsBuffer},
71 mProcessedCRU(timeframes),
72 mProcessedCRUs(timeframes),
74 mOrbitInfo(timeframes),
75 mOrbitStep(timeframes),
76 mOrbitInfoSeen(timeframes, false),
77 mTFCompleted(timeframes, false),
80 std::sort(mCRUs.begin(), mCRUs.end());
81 for (
auto& crusMap : mProcessedCRUs) {
82 crusMap.reserve(mCRUs.size());
83 for (
const auto cruID : mCRUs) {
84 crusMap.emplace(cruID,
false);
93 mOutputDir = ic.options().get<std::string>(
"output-dir");
94 if (mOutputDir !=
"/dev/null") {
97 mMetaFileDir = ic.options().get<std::string>(
"meta-output-dir");
98 if (mMetaFileDir !=
"/dev/null") {
101 mUseCompressionVarint = ic.options().get<
bool>(
"use-compression-varint");
102 mUseSparse = ic.options().get<
bool>(
"use-sparse");
103 mUseCompressionHuffman = ic.options().get<
bool>(
"use-compression-huffman");
104 mRoundIntegersThreshold =
static_cast<uint16_t
>(ic.options().get<
int>(
"cmv-round-integers-threshold"));
105 mZeroThreshold = ic.options().get<
float>(
"cmv-zero-threshold");
106 mDynamicPrecisionMean = ic.options().get<
float>(
"cmv-dynamic-precision-mean");
107 mDynamicPrecisionSigma = ic.options().get<
float>(
"cmv-dynamic-precision-sigma");
108 mThreads = std::max(1, ic.options().get<
int>(
"nthreads-compression"));
109 LOGP(info,
"CMV aggregation settings: output-dir={}, use-compression-varint={}, use-sparse={}, use-compression-huffman={}, cmv-round-integers-threshold={}, cmv-zero-threshold={}, cmv-dynamic-precision-mean={}, cmv-dynamic-precision-sigma={}, nthreads-compression={}",
110 mOutputDir, mUseCompressionVarint, mUseSparse, mUseCompressionHuffman, mRoundIntegersThreshold, mZeroThreshold, mDynamicPrecisionMean, mDynamicPrecisionSigma, mThreads);
123 if (pc.inputs().isValid(
"grpecs")) {
127 if (mUsePreciseTimestamp && pc.inputs().isValid(
"orbitreset")) {
131 if (nCCDBInputs > 0 && pc.inputs().countValidInputs() == nCCDBInputs) {
135 if (mSetDataTakingCont) {
137 mSetDataTakingCont =
false;
146 if (mTFFirst == -1) {
148 mTFFirst = pc.inputs().get<
long>(
ref);
149 mIntervalFirstTF = mTFFirst;
150 mHasIntervalFirstTF =
true;
156 if (currTF == std::numeric_limits<uint32_t>::max()) {
157 if (mTimestampStart == 0) {
160 collectEOSInputs(pc);
164 if (mTFFirst == -1) {
166 mIntervalFirstTF = mTFFirst;
167 mHasIntervalFirstTF =
true;
168 LOGP(warning,
"firstTF not found. Setting {} as first TF for aggregate lane {}", mTFFirst, mLaneId);
171 const long relTF = (currTF - mTFFirst) / mNTFsBuffer;
173 LOGP(warning,
"relTF={} < 0 for TF {}, skipping", relTF, currTF);
176 if (relTF >=
static_cast<long>(mTimeFrames)) {
180 LOGP(warning,
"relTF={} out of range [0, {}) for TF {}: force-completing stale interval and resetting", relTF, mTimeFrames, currTF);
181 if (mTimestampStart == 0) {
184 materializeBufferedTFs(
true);
185 sendOutput(pc.outputs());
189 long nextFirst = mIntervalFirstTF +
static_cast<long>(mTimeFrames) * mNTFsBuffer;
190 while (
static_cast<long>(currTF) >= nextFirst +
static_cast<long>(mTimeFrames) * mNTFsBuffer) {
191 nextFirst +=
static_cast<long>(mTimeFrames) * mNTFsBuffer;
194 mTFFirst = nextFirst;
195 mIntervalFirstTF = nextFirst;
196 mHasIntervalFirstTF =
true;
201 if (!mOrbitInfoSeen[relTF]) {
204 mOrbitInfo[relTF] = pc.inputs().get<uint64_t>(
ref);
205 const auto batchFirstOrbit =
static_cast<uint32_t
>(mOrbitInfo[relTF] >> 32);
211 mOrbitStep[relTF] = ((batchFirstOrbit > 0) && (mNTFsBuffer > 1) && (batchLastOrbit > batchFirstOrbit)) ? (batchLastOrbit - batchFirstOrbit) /
static_cast<uint32_t
>(mNTFsBuffer - 1) : defaultOrbitStep;
212 mLastOrbitStep = mOrbitStep[relTF];
213 mOrbitInfoSeen[relTF] =
true;
218 if (mTimestampStart == 0) {
219 setTimestampCCDB(relTF, mOrbitStep[relTF], pc);
223 auto const*
hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
224 const unsigned int cru =
hdr->subSpecification;
225 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
226 LOGP(
debug,
"Received CMV data from CRU {} which is not part of this aggregate lane", cru);
229 if (mProcessedCRUs[relTF][cru]) {
234 mRawCMVs[relTF][cru] = std::vector<uint16_t>(cmvVec.begin(), cmvVec.end());
235 mProcessedCRUs[relTF][cru] =
true;
236 ++mProcessedCRU[relTF];
239 if (mProcessedCRU[relTF] == mCRUs.size() && !mTFCompleted[relTF]) {
240 mTFCompleted[relTF] =
true;
242 mLastSeenTF = currTF;
245 if (mProcessedTFs == mTimeFrames) {
246 materializeBufferedTFs(
false);
247 sendOutput(pc.outputs());
254 materializeBufferedTFs(
true);
255 materializeEOSBuffer();
256 sendOutput(ec.outputs());
265 CMVPerTFCompressed compressed{};
268 const int mLaneId{0};
269 std::vector<uint32_t> mCRUs{};
270 const unsigned int mTimeFrames{};
271 const bool mSendCCDB{
false};
272 const bool mUsePreciseTimestamp{
false};
273 const int mNTFsBuffer{1};
274 std::string mOutputDir{};
275 std::string mMetaFileDir{};
277 bool mSetDataTakingCont{
true};
278 bool mUseCompressionVarint{
false};
279 bool mUseSparse{
false};
280 bool mUseCompressionHuffman{
false};
281 uint16_t mRoundIntegersThreshold{0};
282 float mZeroThreshold{0.f};
283 float mDynamicPrecisionMean{1.f};
284 float mDynamicPrecisionSigma{0.f};
287 long mTimestampStart{0};
288 long mIntervalFirstTF{0};
289 bool mHasIntervalFirstTF{
false};
290 unsigned int mProcessedTFs{0};
291 std::vector<unsigned int> mProcessedCRU{};
292 std::vector<std::unordered_map<unsigned int, bool>> mProcessedCRUs{};
293 std::vector<std::unordered_map<uint32_t, std::vector<uint16_t>>> mRawCMVs{};
294 std::vector<uint64_t> mOrbitInfo{};
295 std::vector<uint32_t> mOrbitStep{};
296 std::vector<bool> mOrbitInfoSeen{};
297 std::vector<bool> mTFCompleted{};
298 std::unordered_map<uint32_t, std::vector<uint16_t>> mEOSRawCMVs{};
299 uint32_t mEOSFirstOrbit{0};
300 uint16_t mEOSFirstBC{0};
301 uint32_t mLastOrbitStep{0};
302 uint32_t mLastSeenTF{0};
303 unsigned int mIntervalTFCount{0};
305 uint32_t mIntervalFirstOrbit{0};
306 uint32_t mIntervalLastOrbit{0};
307 uint32_t mFirstOrbitDPL{0};
308 bool mIntervalOrbitSet{
false};
309 dataformats::Pair<long, int> mTFInfo{};
310 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
311 std::unique_ptr<TTree> mIntervalTree{};
312 CMVPerTF mCurrentTF{};
313 CMVPerTFCompressed mCurrentCompressedTF{};
314 const std::vector<o2::framework::InputSpec> mFilter{
317 o2::framework::Lifetime::Sporadic}};
318 const std::vector<o2::framework::InputSpec> mOrbitFilter{
321 o2::framework::Lifetime::Sporadic}};
322 const std::vector<o2::framework::InputSpec> mFirstTFFilter{
325 o2::framework::Lifetime::Sporadic}};
327 uint8_t buildCompressionFlags()
const
333 if (mUseCompressionHuffman) {
335 }
else if (mUseCompressionVarint) {
343 void initIntervalTree()
345 mIntervalTree = std::make_unique<TTree>(
"ccdb_object",
"ccdb_object");
346 mIntervalTree->SetAutoSave(0);
347 mIntervalTree->SetDirectory(
nullptr);
349 mIntervalTree->Branch(
"CMVPerTFCompressed", &mCurrentCompressedTF);
351 mIntervalTree->Branch(
"CMVPerTF", &mCurrentTF);
359 if (mEOSFirstOrbit == 0) {
360 for (
auto&
ref :
o2::
framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
362 mEOSFirstOrbit =
static_cast<uint32_t
>(
orbitBC >> 32);
363 mEOSFirstBC =
static_cast<uint16_t
>(
orbitBC & 0xFFFFu);
368 for (
auto&
ref :
o2::
framework::InputRecordWalker(pc.inputs(), mFilter)) {
369 auto const*
hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
370 const unsigned int cru =
hdr->subSpecification;
371 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
375 auto&
buffer = mEOSRawCMVs[cru];
376 buffer.insert(
buffer.end(), cmvVec.begin(), cmvVec.end());
386 if (mUsePreciseTimestamp && !mTFInfo.
second) {
390 mTimestampStart = tinfo.creation;
391 LOGP(warning,
"Orbit reset info not yet received; using DPL creation time {} ms as fallback timestamp for interval starting at TF {}", mTimestampStart, mTFFirst);
396 const auto nOrbitsOffset = (relTF * mNTFsBuffer + (mNTFsBuffer - 1)) * nHBFPerTF;
397 mFirstOrbitDPL = tinfo.firstTForbit - nOrbitsOffset;
399 LOGP(info,
"Setting timestamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}, NHBFPerTF: {}, relTF: {}, nOrbitsOffset: {}",
400 mTFInfo.
first, tinfo.tfCounter, tinfo.firstTForbit, nHBFPerTF, relTF, nOrbitsOffset);
406 void materializeBufferedTFs(
const bool includeIncomplete)
408 for (
unsigned int relTF = 0; relTF < mTimeFrames; ++relTF) {
409 if (mProcessedCRU[relTF] == 0) {
413 if ((mProcessedCRU[relTF] != mCRUs.size()) && !includeIncomplete) {
417 if ((mProcessedCRU[relTF] != mCRUs.size()) && includeIncomplete) {
418 LOGP(warning,
"Aggregate lane {} flushing incomplete CMV batch relTF {} at EOS: received {} CRUs out of {}", mLaneId, relTF, mProcessedCRU[relTF], mCRUs.size());
421 if (!mHasIntervalFirstTF) {
422 mIntervalFirstTF = mTFFirst == -1 ? 0 : mTFFirst;
423 mHasIntervalFirstTF =
true;
427 const auto maxBufferSize = getMaxBufferSize(mRawCMVs[relTF]);
428 const int nTFsInBatch = maxBufferSize ? std::max(1,
static_cast<int>(maxBufferSize / cmv::NTimeBinsPerTF)) : mNTFsBuffer;
431 appendBatchToTree(mRawCMVs[relTF], mOrbitInfo[relTF], orbitStep, nTFsInBatch);
438 void materializeEOSBuffer()
440 if (mEOSRawCMVs.empty()) {
444 const auto maxBufferSize = getMaxBufferSize(mEOSRawCMVs);
445 const int nTFsInBatch =
static_cast<int>(maxBufferSize / cmv::NTimeBinsPerTF);
446 if (nTFsInBatch <= 0) {
450 if (!mHasIntervalFirstTF) {
451 mIntervalFirstTF = mLastSeenTF + 1;
452 mHasIntervalFirstTF =
true;
455 const uint64_t orbitInfo = (
static_cast<uint64_t
>(mEOSFirstOrbit) << 32) |
static_cast<uint64_t
>(mEOSFirstBC);
458 appendBatchToTree(mEOSRawCMVs, orbitInfo, orbitStep, nTFsInBatch);
459 mLastSeenTF +=
static_cast<uint32_t
>(nTFsInBatch);
462 static size_t getMaxBufferSize(
const std::unordered_map<uint32_t, std::vector<uint16_t>>& rawCMVs)
464 size_t maxBufferSize = 0;
465 for (
const auto& [cru,
values] : rawCMVs) {
466 maxBufferSize = std::max(maxBufferSize,
values.size());
468 return maxBufferSize;
474 void appendBatchToTree(
const std::unordered_map<uint32_t, std::vector<uint16_t>>& rawCMVs,
const uint64_t orbitInfo,
const uint32_t orbitStep,
const int nTFsInBatch)
476 if (nTFsInBatch <= 0) {
480 const auto firstOrbit =
static_cast<uint32_t
>(orbitInfo >> 32);
481 const auto firstBC =
static_cast<uint16_t
>(orbitInfo & 0xFFFFu);
483 const auto batchFirstOrbitDPL = (firstOrbit > 0) ? firstOrbit : mFirstOrbitDPL;
484 if (!mIntervalOrbitSet) {
485 mIntervalFirstOrbit = batchFirstOrbitDPL;
486 mIntervalOrbitSet =
true;
488 mIntervalLastOrbit = batchFirstOrbitDPL +
static_cast<uint32_t
>(nTFsInBatch - 1) * orbitStep;
490 std::vector<PreparedTF> prepared(nTFsInBatch);
491 const int nThreads = std::max(1, std::min(mThreads, nTFsInBatch));
492 const int chunkSize = (nTFsInBatch + nThreads - 1) / nThreads;
494 auto worker = [&](
const int iThread) {
495 const int beginTF = iThread * chunkSize;
496 const int endTF = std::min(nTFsInBatch, beginTF + chunkSize);
497 for (
int tfIndex = beginTF; tfIndex < endTF; ++tfIndex) {
499 auto& preparedTF = prepared[tfIndex];
500 preparedTF.tf.firstOrbit = firstOrbit +
static_cast<uint32_t
>(tfIndex) * orbitStep;
501 preparedTF.tf.firstOrbitDPL = batchFirstOrbitDPL +
static_cast<uint32_t
>(tfIndex) * orbitStep;
503 for (
const auto& [cru,
values] : rawCMVs) {
504 const uint32_t
offset =
static_cast<uint32_t
>(tfIndex) * cmv::NTimeBinsPerTF;
508 const uint32_t nBins = std::min(
static_cast<uint32_t
>(
values.size()) -
offset, cmv::NTimeBinsPerTF);
509 for (uint32_t tb = 0; tb < nBins; ++tb) {
510 preparedTF.tf.mDataPerTF[cru * cmv::NTimeBinsPerTF + tb] =
values[
offset + tb];
514 preparedTF.tf.roundToIntegers(mRoundIntegersThreshold);
515 if (mZeroThreshold > 0.f) {
516 preparedTF.tf.zeroSmallValues(mZeroThreshold);
518 if (mDynamicPrecisionSigma > 0.f) {
519 preparedTF.tf.trimGaussianPrecision(mDynamicPrecisionMean, mDynamicPrecisionSigma);
522 preparedTF.compressed = preparedTF.tf.compress(
flags);
527 std::vector<std::thread> workers;
528 workers.reserve(nThreads - 1);
529 for (
int iThread = 1; iThread < nThreads; ++iThread) {
530 workers.emplace_back(worker, iThread);
533 for (
auto& thread : workers) {
537 for (
int tfIndex = 0; tfIndex < nTFsInBatch; ++tfIndex) {
539 mCurrentCompressedTF = std::move(prepared[tfIndex].compressed);
541 mCurrentTF = std::move(prepared[tfIndex].
tf);
543 mIntervalTree->Fill();
550 using timer = std::chrono::high_resolution_clock;
552 if (mIntervalTFCount == 0) {
553 LOGP(warning,
"CMV interval is empty at sendOutput for lane {}, skipping", mLaneId);
557 const auto lastTF = mIntervalFirstTF +
static_cast<long>(mIntervalTFCount) - 1;
558 mIntervalTree->GetUserInfo()->Clear();
559 mIntervalTree->GetUserInfo()->Add(
new TParameter<long>(
"firstTF", mIntervalFirstTF));
560 mIntervalTree->GetUserInfo()->Add(
new TParameter<long>(
"lastTF", lastTF));
562 LOGP(info,
"CMVPerTF TTree lane {}: {} entries, firstTF={}, lastTF={}", mLaneId, mIntervalTFCount, mIntervalFirstTF, lastTF);
563 auto start = timer::now();
568 if (mOutputDir !=
"/dev/null") {
569 const std::string calibFName = fmt::format(
"CMV_run_{}_orbit_{}_{}_timestamp_{}_{}.root",
570 mRun, mIntervalFirstOrbit, mIntervalLastOrbit, mTimestampStart, timeStampEnd);
573 LOGP(info,
"CMV file written to {}", mOutputDir + calibFName);
574 }
catch (
const std::exception& e) {
575 LOGP(error,
"Failed to write CMV file {}: {}", mOutputDir + calibFName, e.what());
578 if (mMetaFileDir !=
"/dev/null") {
582 calMetaData.
type =
"calib";
584 auto metaFileNameTmp = fmt::format(
"{}{}.tmp", mMetaFileDir, calibFName);
585 auto metaFileName = fmt::format(
"{}{}.done", mMetaFileDir, calibFName);
587 std::ofstream metaFileOut(metaFileNameTmp);
588 metaFileOut << calMetaData;
590 std::filesystem::rename(metaFileNameTmp, metaFileName);
591 }
catch (std::exception
const& e) {
592 LOG(error) <<
"Failed to store CMV meta data file " << metaFileName <<
", reason: " << e.what();
597 if ((!mSendCCDB) && (mOutputDir ==
"/dev/null")) {
598 LOGP(warning,
"Neither CCDB output nor output-dir is enabled for aggregate lane {}, skipping CMV export", mLaneId);
604 if (timeStampEnd <= mTimestampStart) {
605 LOGP(warning,
"Invalid CCDB timestamp range start:{} end:{}, skipping upload", mTimestampStart, timeStampEnd);
613 TMemFile mf(
"trim",
image->data(),
static_cast<Long64_t
>(
image->size()),
"READ");
614 image->resize(
static_cast<size_t>(mf.GetEND()));
618 LOGP(info,
"Sending object {} / {} of size {} bytes, valid for {} : {}", ccdbInfoCMV.getPath(), ccdbInfoCMV.getFileName(),
image->size(), ccdbInfoCMV.getStartValidityTimestamp(), ccdbInfoCMV.getEndValidityTimestamp());
622 auto stop = timer::now();
623 std::chrono::duration<float> elapsed =
stop -
start;
624 LOGP(info,
"CMV CCDB serialisation time: {:.3f} s", elapsed.count());
632 mIntervalFirstTF = 0;
633 mHasIntervalFirstTF =
false;
635 std::fill(mProcessedCRU.begin(), mProcessedCRU.end(), 0);
636 std::fill(mOrbitInfo.begin(), mOrbitInfo.end(), 0);
637 std::fill(mOrbitStep.begin(), mOrbitStep.end(), 0);
638 std::fill(mOrbitInfoSeen.begin(), mOrbitInfoSeen.end(),
false);
639 std::fill(mTFCompleted.begin(), mTFCompleted.end(),
false);
640 for (
auto& processedMap : mProcessedCRUs) {
641 for (
auto& [cru, seen] : processedMap) {
645 for (
auto& rawPerTF : mRawCMVs) {
653 mIntervalTFCount = 0;
654 mIntervalFirstOrbit = 0;
655 mIntervalLastOrbit = 0;
657 mIntervalOrbitSet =
false;
658 mCurrentTF = CMVPerTF{};
659 mCurrentCompressedTF = CMVPerTFCompressed{};