22#include <fairmq/Device.h>
54#include <TStopwatch.h>
78 auto* br =
tree.GetBranch(brname.c_str());
81 br->SetAddress(&pptr);
83 br =
tree.Branch(brname.c_str(), &pptr);
87 throw std::runtime_error(fmt::format(
"Failed to fill CTF branch {}", brname));
106 void stop() final { finalize(); }
111 template <
typename C>
113 template <
typename C>
115 void storeDictionaries();
116 void closeTFTreeAndFile();
117 void prepareTFTreeAndFile();
119 size_t getAvailableDiskSpace(
const std::string&
path,
int level);
120 void createLockFile(
int level);
121 void removeLockFile();
125 bool mFinalized =
false;
126 bool mWriteCTF =
true;
127 bool mCreateDict =
false;
128 bool mCreateRunEnvDir =
true;
129 bool mStoreMetaFile =
false;
130 bool mRejectCurrentTF =
false;
131 bool mFallBackDirUsed =
false;
132 bool mFallBackDirProvided =
false;
133 int mReportInterval = -1;
135 int mSaveDictAfter = 0;
136 uint32_t mPrevDictTimeStamp = 0;
137 uint32_t mDictTimeStamp = 0;
141 size_t mAccCTFSize = 0;
142 size_t mCurrCTFSize = 0;
144 size_t mNCTFPrevDict = 0;
146 int mWaitDiskFull = 0;
147 int mWaitDiskFullMax = -1;
148 float mCheckDiskFull = 0.;
149 long mCTFAutoSave = 0;
150 size_t mNCTFFiles = 0;
151 int mMaxCTFPerFile = 0;
153 int mCTFFileCompression = 0;
154 bool mFillMD5 =
false;
155 std::vector<uint32_t> mTFOrbits{};
158 std::string mOutputType{};
159 std::string mDictDir{};
160 std::string mCTFDir{};
161 std::string mHostName{};
162 std::string mCTFDirFallBack =
"/dev/null";
163 std::string mCTFMetaFileDir =
"/dev/null";
164 std::string mCurrentCTFFileName{};
165 std::string mCurrentCTFFileNameFull{};
166 std::string mSizeReport{};
167 std::string mMetaDataType{};
168 const std::string LOCKFileDir =
"/tmp/ctf-writer-locks";
169 std::string mLockFileName{};
171 std::unique_ptr<TFile> mCTFFileOut;
172 std::unique_ptr<TTree> mCTFTreeOut;
174 std::unique_ptr<TFile> mDictFileOut;
175 std::unique_ptr<TTree> mDictTreeOut;
187 static const std::string TMPFileEnding;
190const std::string CTFWriterSpec::TMPFileEnding{
".part"};
194 : mDets(dm), mOutputType(outType), mReportInterval(reportInterval), mVerbosity(
verbosity)
196 std::for_each(mIsSaturatedFrequencyTable.begin(), mIsSaturatedFrequencyTable.end(), [](
auto& bitset) { bitset.reset(); });
205 auto outmode = mOutputType;
206 if (outmode ==
"ctf") {
209 }
else if (outmode ==
"dict") {
212 }
else if (outmode ==
"both") {
215 }
else if (outmode ==
"none") {
219 throw std::invalid_argument(
"Invalid output-type");
222 mSaveDictAfter = ic.
options().
get<
int>(
"save-dict-after");
223 mCTFAutoSave = ic.
options().
get<
long>(
"save-ctf-after");
224 mCTFFileCompression = ic.
options().
get<
int>(
"ctf-file-compression");
225 mCTFMetaFileDir = ic.
options().
get<std::string>(
"meta-output-dir");
226 if (mCTFMetaFileDir !=
"/dev/null") {
228 mStoreMetaFile =
true;
229 mFillMD5 = ic.
options().
get<
bool>(
"md5-for-meta");
232 mCTFDir = ic.
options().
get<std::string>(
"output-dir");
233 if (mCTFDir !=
"/dev/null") {
237 mStoreMetaFile =
false;
239 mCTFDirFallBack = ic.
options().
get<std::string>(
"output-dir-alt");
240 if (mCTFDirFallBack !=
"/dev/null") {
242 mFallBackDirProvided =
true;
244 mCreateRunEnvDir = !ic.
options().
get<
bool>(
"ignore-partition-run-dir");
245 mMinSize = ic.
options().
get<int64_t>(
"min-file-size");
246 mMaxSize = ic.
options().
get<int64_t>(
"max-file-size");
247 mMaxCTFPerFile = ic.
options().
get<
int>(
"max-ctf-per-file");
248 mRejRate = ic.
options().
get<
int>(
"ctf-rejection");
250 LOGP(info,
"Will reject{} {}% of TFs", mRejRate < 100 ?
" randomly" :
"", mRejRate < 100 ? mRejRate : 100);
251 }
else if (mRejRate < -1) {
252 LOGP(info,
"Will reject all but each {}-th TF slice", -mRejRate);
257 LOG(info) <<
"Multiple CTFs will be accumulated in the tree/file until its size exceeds " << mMinSize <<
" bytes";
258 if (mMaxSize > mMinSize) {
259 LOG(info) <<
"but does not exceed " << mMaxSize <<
" bytes";
264 mCheckDiskFull = ic.
options().
get<
float>(
"require-free-disk");
265 mWaitDiskFull = 1000 * ic.
options().
get<
float>(
"wait-for-free-disk");
266 mWaitDiskFullMax = 1000 * ic.
options().
get<
float>(
"max-wait-for-free-disk");
268 mChkSize = std::max(
size_t(mMinSize * 1.1), mMaxSize);
273 if (std::filesystem::exists(dictFileName)) {
274 throw std::runtime_error(
o2::utils::Str::concat_string(
"CTF dictionary creation is requested but ", dictFileName,
" already exists, remove it!"));
279 char hostname[_POSIX_HOST_NAME_MAX];
280 gethostname(hostname, _POSIX_HOST_NAME_MAX);
281 mHostName = hostname;
282 mHostName = mHostName.substr(0, mHostName.find(
'.'));
293 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.
runType, mDataTakingContext.
forcedRaw);
302 static bool warnedEmpty =
false;
305 mSizeReport += fmt::format(
" {}:N/A", det.
getName());
308 auto ctfBuffer = pc.
inputs().
get<gsl::span<o2::ctf::BufferType>>(det.
getName());
312 throw std::runtime_error(fmt::format(
"Non-empty input was seen at {}-th TF after empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.
getName()));
314 const auto ctfImage = C::getImage(bdata);
316 if (mWriteCTF && !mRejectCurrentTF) {
318 header.detectors.set(det);
320 sz = ctfBuffer.size();
323 if (mFreqsAccumulation[det].
empty()) {
324 mFreqsAccumulation[det].resize(C::getNBlocks());
325 mFreqsMetaData[det].resize(C::getNBlocks());
327 if (!mHeaders[det]) {
328 mHeaders[det] =
ctfImage.cloneHeader();
332 for (
int ib = 0; ib < C::getNBlocks(); ib++) {
333 if (!mIsSaturatedFrequencyTable[det][ib]) {
334 const auto& bl =
ctfImage.getBlock(ib);
336 auto freq = mFreqsAccumulation[det][ib];
337 auto& mdSave = mFreqsMetaData[det][ib];
338 const auto& md =
ctfImage.getMetadata(ib);
341 freq.addFrequencies(bl.getDict(), bl.getDict() + bl.getNDict(), md.min);
342 }
catch (
const std::overflow_error& e) {
343 LOGP(warning,
"unable to add frequency table for {}, block {} due to overflow", det.getName(), ib);
344 mIsSaturatedFrequencyTable[det][ib] =
true;
352 static_cast<int32_t
>(histogramView.getMin()),
353 static_cast<int32_t
>(histogramView.getMax()),
354 static_cast<int32_t
>(histogramView.size()),
356 mFreqsAccumulation[det][ib] = std::move(freq);
365 throw std::runtime_error(fmt::format(
"Empty input was seen at {}-th TF after non-empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.getName()));
367 LOGP(important,
"Empty CTF provided for {}, skipping and will not report anymore", det.getName());
371 mSizeReport += fmt::format(
" {}:{}", det.getName(), fmt::group_digits(sz));
378void CTFWriterSpec::storeDictionary(
DetID det, CTFHeader& header)
384 auto dictBlocks = C::createDictionaryBlocks(mFreqsAccumulation[det], mFreqsMetaData[det]);
385 auto&
h = C::get(dictBlocks.data())->getHeader();
386 h = *
reinterpret_cast<typename std::remove_reference<decltype(
h)
>::type*>(mHeaders[det].get());
391 auto getFileName = [
this, det, &hb](
bool curr) {
393 curr ?
this->mDictTimeStamp :
this->mPrevDictTimeStamp, curr ?
this->mNCTF :
this->mNCTFPrevDict);
397 auto outName = getFileName(
true);
398 TFile flout(outName.c_str(),
"recreate");
400 flout.WriteObject(&hb, fmt::format(
"ctf_dict_header_{}", det.
getName()).c_str());
402 LOGP(info,
"Saved {} with {} TFs to {}", hb.asString(), mNCTF, outName);
403 if (mPrevDictTimeStamp) {
404 auto outNamePrev = getFileName(
false);
405 if (std::filesystem::exists(outNamePrev)) {
406 std::filesystem::remove(outNamePrev);
407 LOGP(info,
"Removed previous dictionary version {}", outNamePrev);
410 C::get(dictBlocks.data())->appendToTree(*mDictTreeOut.get(), det.
getName());
411 header.detectors.set(det);
431 const std::string NAStr =
"NA";
435 auto cput = mTimer.CpuTime();
437 updateTimeDependentParams(pc);
438 mRejectCurrentTF = (mRejRate > 0 &&
int(gRandom->Rndm() * 100) < mRejRate) || (mRejRate < -1 && mTimingInfo.
timeslice % (-mRejRate));
439 mCurrCTFSize = estimateCTFSize(pc);
440 if (mWriteCTF && !mRejectCurrentTF) {
441 prepareTFTreeAndFile();
443 int totalWait = 0, nwaitCycles = 0;
444 while ((mFallBackDirUsed || !mFallBackDirProvided) && mCheckDiskFull) {
445 constexpr size_t MB = 1024 * 1024;
446 constexpr int showFirstN = 10, prsecaleWarnings = 50;
448 const auto si = std::filesystem::space(mCTFFileOut->GetName());
450 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
452 wmsg = fmt::format(
"Disk has {} MB available while at least {} MB is requested, wait for {} ms (on top of {} ms)", si.available / MB,
size_t(mCheckDiskFull) / MB, mWaitDiskFull, totalWait);
453 }
else if (mCheckDiskFull < 0.f &&
float(si.available) / si.capacity < -mCheckDiskFull) {
455 wmsg = fmt::format(
"Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ?
float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
460 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
461 closeTFTreeAndFile();
462 LOGP(fatal,
"Disk has {} MB available out of {} MB after waiting for {} ms", si.available / MB, si.capacity / MB, mWaitDiskFullMax);
464 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
468 totalWait += mWaitDiskFull;
471 }
catch (std::exception
const& e) {
472 LOG(fatal) <<
"unable to query disk space info for path " << mCurrentCTFFileNameFull <<
", reason: " << e.what();
481 std::array<size_t, DetID::CTP + 1> szCTFperDet{0};
482 szCTFperDet[
DetID::ITS] = processDet<o2::itsmft::CTF>(pc,
DetID::ITS, header, mCTFTreeOut.get());
488 szCTFperDet[
DetID::EMC] = processDet<o2::emcal::CTF>(pc,
DetID::EMC, header, mCTFTreeOut.get());
489 szCTFperDet[
DetID::HMP] = processDet<o2::hmpid::CTF>(pc,
DetID::HMP, header, mCTFTreeOut.get());
490 szCTFperDet[
DetID::MFT] = processDet<o2::itsmft::CTF>(pc,
DetID::MFT, header, mCTFTreeOut.get());
498 szCTF = std::accumulate(szCTFperDet.begin(), szCTFperDet.end(), 0);
499 if (mReportInterval > 0 && (mTimingInfo.
tfCounter % mReportInterval) == 0) {
500 LOGP(important,
"CTF {} size report:{} - Total:{}", mTimingInfo.
tfCounter, mSizeReport, fmt::group_digits(szCTF));
505 if (mWriteCTF && !mRejectCurrentTF) {
506 szCTF +=
appendToTree(*mCTFTreeOut.get(),
"CTFHeader", header);
507 size_t prevSizeMB = mAccCTFSize / (1 << 20);
508 mAccCTFSize += szCTF;
509 mCTFTreeOut->SetEntries(++mNAccCTF);
511 LOG(info) <<
"TF#" << mNCTF <<
": wrote CTF{" << header <<
"} of size " << szCTF <<
" to " << mCurrentCTFFileNameFull <<
" in " << mTimer.CpuTime() - cput <<
" s";
513 LOG(info) <<
"Current CTF tree has " << mNAccCTF <<
" entries with total size of " << mAccCTFSize <<
" bytes";
516 lseek(mLockFD, 0, SEEK_SET);
517 auto nwr = write(mLockFD, &mAccCTFSize,
sizeof(
size_t));
518 if (nwr !=
sizeof(
size_t)) {
519 LOG(error) <<
"Failed to write current CTF size " << mAccCTFSize <<
" to lock file, bytes written: " << nwr;
523 if (mAccCTFSize >= mMinSize || (mMaxCTFPerFile > 0 && mNAccCTF >= mMaxCTFPerFile)) {
524 closeTFTreeAndFile();
525 }
else if ((mCTFAutoSave > 0 && mNAccCTF % mCTFAutoSave == 0) || (mCTFAutoSave < 0 &&
int(prevSizeMB / (-mCTFAutoSave)) !=
size_t(mAccCTFSize / (1 << 20)) / (-mCTFAutoSave))) {
526 mCTFTreeOut->AutoSave(
"override");
529 LOG(info) <<
"TF#" << mNCTF <<
" {" << header <<
"} CTF writing is disabled, size was " << szCTF <<
" bytes";
533 if (mCreateDict && mSaveDictAfter > 0 && (mNCTF % mSaveDictAfter) == 0) {
542void CTFWriterSpec::finalize()
551 closeTFTreeAndFile();
553 LOGF(info,
"CTF writing total timing: Cpu: %.3e Real: %.3e s in %d slots",
554 mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
561void CTFWriterSpec::prepareTFTreeAndFile()
566 bool needToOpen =
false;
570 if ((mAccCTFSize >= mMinSize) ||
571 (mAccCTFSize && mMaxSize > mMinSize && ((mAccCTFSize + mCurrCTFSize) > mMaxSize))) {
574 LOGP(info,
"Will add new CTF of estimated size {} to existing file of size {}", mCurrCTFSize, mAccCTFSize);
578 closeTFTreeAndFile();
579 mFallBackDirUsed =
false;
581 if (mChkSize > 0 && mFallBackDirProvided) {
583 auto sz = getAvailableDiskSpace(ctfDir, 0);
586 LOG(warning) <<
"Primary CTF output device has available size " << sz <<
" while " << mChkSize <<
" is requested: will write on secondary one";
587 ctfDir = mCTFDirFallBack;
588 mFallBackDirUsed =
true;
592 ctfDir += fmt::format(
"{}_{}/", mDataTakingContext.
envId, mDataTakingContext.
runNumber);
593 if (!ctfDir.empty()) {
595 LOGP(info,
"Created {} directory for CTFs output", ctfDir);
599 mCurrentCTFFileNameFull = fmt::format(
"{}{}", ctfDir, mCurrentCTFFileName);
600 mCTFFileOut.reset(TFile::Open(fmt::format(
"{}{}", mCurrentCTFFileNameFull, TMPFileEnding).c_str(),
"recreate"));
601 if (mCTFFileCompression >= 0) {
602 mCTFFileOut->SetCompressionLevel(mCTFFileCompression);
611void CTFWriterSpec::closeTFTreeAndFile()
616 mCTFTreeOut->Write();
618 mCTFFileOut->Close();
621 auto actualFileName = TMPFileEnding.empty() ? mCurrentCTFFileNameFull :
o2::utils::Str::concat_string(mCurrentCTFFileNameFull, TMPFileEnding);
622 if (mStoreMetaFile) {
624 if (!ctfMetaData.
fillFileData(actualFileName, mFillMD5, TMPFileEnding)) {
625 throw std::runtime_error(
"metadata file was requested but not created");
628 ctfMetaData.
type = mMetaDataType;
629 ctfMetaData.
priority = mFallBackDirUsed ?
"low" :
"high";
630 ctfMetaData.
tfOrbits.swap(mTFOrbits);
631 auto metaFileNameTmp = fmt::format(
"{}{}.tmp", mCTFMetaFileDir, mCurrentCTFFileName);
632 auto metaFileName = fmt::format(
"{}{}.done", mCTFMetaFileDir, mCurrentCTFFileName);
634 std::ofstream metaFileOut(metaFileNameTmp);
635 metaFileOut << ctfMetaData;
637 if (!TMPFileEnding.empty()) {
638 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
640 std::filesystem::rename(metaFileNameTmp, metaFileName);
641 }
catch (std::exception
const& e) {
642 LOG(error) <<
"Failed to store CTF meta data file " << metaFileName <<
", reason: " << e.what();
644 }
else if (!TMPFileEnding.empty()) {
645 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
647 }
catch (std::exception
const& e) {
648 LOG(error) <<
"Failed to finalize CTF file " << mCurrentCTFFileNameFull <<
", reason: " << e.what();
658void CTFWriterSpec::storeDictionaries()
661 mDictTimeStamp = uint32_t(std::time(
nullptr));
662 auto getFileName = [
this](
bool curr) {
665 auto dictFileName = getFileName(
true);
666 mDictFileOut.reset(TFile::Open(dictFileName.c_str(),
"recreate"));
669 CTFHeader header{mTimingInfo.
runNumber, uint32_t(mNCTF)};
670 storeDictionary<o2::itsmft::CTF>(
DetID::ITS, header);
671 storeDictionary<o2::itsmft::CTF>(
DetID::MFT, header);
672 storeDictionary<o2::tpc::CTF>(
DetID::TPC, header);
673 storeDictionary<o2::trd::CTF>(
DetID::TRD, header);
674 storeDictionary<o2::tof::CTF>(
DetID::TOF, header);
675 storeDictionary<o2::ft0::CTF>(
DetID::FT0, header);
676 storeDictionary<o2::fv0::CTF>(
DetID::FV0, header);
677 storeDictionary<o2::fdd::CTF>(
DetID::FDD, header);
678 storeDictionary<o2::mid::CTF>(
DetID::MID, header);
679 storeDictionary<o2::mch::CTF>(
DetID::MCH, header);
680 storeDictionary<o2::emcal::CTF>(
DetID::EMC, header);
681 storeDictionary<o2::phos::CTF>(
DetID::PHS, header);
682 storeDictionary<o2::cpv::CTF>(
DetID::CPV, header);
683 storeDictionary<o2::zdc::CTF>(
DetID::ZDC, header);
684 storeDictionary<o2::hmpid::CTF>(
DetID::HMP, header);
685 storeDictionary<o2::ctp::CTF>(
DetID::CTP, header);
688 mDictTreeOut->SetEntries(1);
689 mDictTreeOut->Write(mDictTreeOut->GetName(), TObject::kSingleKey);
690 mDictTreeOut.reset();
691 mDictFileOut.reset();
693 if (std::filesystem::exists(dictFileNameLnk)) {
694 std::filesystem::remove(dictFileNameLnk);
696 std::filesystem::create_symlink(dictFileName, dictFileNameLnk);
697 LOGP(info,
"Saved CTF dictionaries tree with {} TFs to {} and linked to {}", mNCTF, dictFileName, dictFileNameLnk);
698 if (mPrevDictTimeStamp) {
699 auto dictFileNamePrev = getFileName(
false);
700 if (std::filesystem::exists(dictFileNamePrev)) {
701 std::filesystem::remove(dictFileNamePrev);
702 LOGP(info,
"Removed previous dictionary version {}", dictFileNamePrev);
705 mNCTFPrevDict = mNCTF;
706 mPrevDictTimeStamp = mDictTimeStamp;
710void CTFWriterSpec::createLockFile(
int level)
715 if (!std::filesystem::exists(mLockFileName)) {
719 mLockFD = open(mLockFileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
721 throw std::runtime_error(fmt::format(
"Error opening lock file {}", mLockFileName));
723 if (lockf(mLockFD, F_LOCK, 0)) {
724 throw std::runtime_error(fmt::format(
"Error locking file {}", mLockFileName));
729void CTFWriterSpec::removeLockFile()
733 if (lockf(mLockFD, F_ULOCK, 0)) {
734 throw std::runtime_error(fmt::format(
"Error unlocking file {}", mLockFileName));
738 std::filesystem::remove(mLockFileName, ec);
743size_t CTFWriterSpec::getAvailableDiskSpace(
const std::string&
path,
int level)
746 std::regex pat{fmt::format(
"({}/ctfs{}-[[:alnum:]_]+\\.lock$)", LOCKFileDir,
level)};
750 for (
const auto&
entry :
std::filesystem::directory_iterator(LOCKFileDir)) {
751 const auto& entryName =
entry.path().native();
752 if (std::regex_search(entryName, pat) && (mLockFD < 0 || entryName != mLockFileName)) {
753 int fdt = open(entryName.c_str(), O_RDONLY);
755 bool locked = lockf(fdt, F_TEST, 0) != 0;
759 auto nrd =
read(fdt, &sz,
sizeof(
size_t));
760 if (nrd ==
sizeof(
size_t)) {
769 if (stat(entryName.c_str(), &statbuf) != -1) {
771 auto ftime = statbuf.st_mtimespec.tv_sec;
773 auto ftime = statbuf.st_mtim.tv_sec;
775 auto ctime = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
776 if (ftime + 60 < ctime) {
777 std::filesystem::remove(entryName, ec);
784 const auto si = std::filesystem::space(
path, ec);
785 int64_t avail = int64_t(si.available) - nLocked * mChkSize + written;
786 LOGP(
debug,
"{} CTF files open (curr.size: {}) -> can use {} of {} bytes", nLocked, written, avail, si.available);
787 return avail > 0 ? avail : 0;
793 std::vector<InputSpec> inputs;
805 {
"CTF",
"SIZES", 0, Lifetime::Timeframe}},
808 {
"save-ctf-after", VariantType::Int64, 0ll, {
"autosave CTF tree with multiple CTFs after every N CTFs if >0 or every -N MBytes if < 0"}},
809 {
"save-dict-after", VariantType::Int, 0, {
"if > 0, in dictionary generation mode save it dictionary after certain number of TFs processed"}},
810 {
"ctf-dict-dir", VariantType::String,
"none", {
"CTF dictionary directory, must exist"}},
811 {
"output-dir", VariantType::String,
"none", {
"CTF output directory, must exist"}},
812 {
"output-dir-alt", VariantType::String,
"/dev/null", {
"Alternative CTF output directory, must exist (if not /dev/null)"}},
813 {
"meta-output-dir", VariantType::String,
"/dev/null", {
"CTF metadata output directory, must exist (if not /dev/null)"}},
814 {
"md5-for-meta", VariantType::Bool,
false, {
"fill CTF file MD5 sum in the metadata file"}},
815 {
"min-file-size", VariantType::Int64, 0l, {
"accumulate CTFs until given file size reached"}},
816 {
"max-file-size", VariantType::Int64, 0l, {
"if > 0, try to avoid exceeding given file size, also used for space check"}},
817 {
"max-ctf-per-file", VariantType::Int, 0, {
"if > 0, avoid storing more than requested CTFs per file"}},
818 {
"ctf-rejection", VariantType::Int, 0, {
">0: percentage to reject randomly, <0: reject if timeslice%|value|!=0"}},
819 {
"ctf-file-compression", VariantType::Int, 0, {
"if >= 0: impose CTF file compression level"}},
820 {
"require-free-disk", VariantType::Float, 0.f, {
"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
821 {
"wait-for-free-disk", VariantType::Float, 10.f, {
"if paused due to the low disk space, recheck after this time (in s)"}},
822 {
"max-wait-for-free-disk", VariantType::Float, 60.f, {
"produce fatal if paused due to the low disk space for more than this amount in s."}},
823 {
"ignore-partition-run-dir", VariantType::Bool,
false, {
"Do not creare partition-run directory in output-dir"}}}};
Header of the AggregatedRunInfo struct.
Definition of the Names Generator class.
Class for time synchronization of RawReader instances.
static std::string getCTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_ctf")
static constexpr std::string_view CCDBOBJECT
static constexpr std::string_view CTFDICT
static constexpr std::string_view CTFTREENAME
void init(o2::framework::InitContext &ic) final
bool isPresent(DetID id) const
void stop() final
This is invoked on stop.
void run(o2::framework::ProcessingContext &pc) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr const char * getName(ID id)
names of defined detectors
static std::string getNames(mask_t mask, char delimiter=',')
static constexpr ID First
static constexpr int nDetectors
number of defined detectors
static constexpr ID Last
if extra detectors added, update this !!!
static constexpr o2h::DataOrigin getDataOrigin(ID id)
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
functionality to maintain compatibility with previous version of this library
GLint GLint GLsizei GLint GLenum GLenum type
GLsizei const GLchar *const * path
public interface for building and renorming histograms from source data.
uint8_t itsSharedClusterMap uint8_t
constexpr Metadata makeMetadataRansDict(size_t symbolTablePrecision, source_T min, source_T max, size_t dictWords, ctf::Metadata::OptStore optStore) noexcept
uint8_t BufferType
This is the type of the vector to be used for the EncodedBlocks buffer allocation.
size_t appendToTree(TTree &tree, const std::string brname, T &ptr)
framework::DataProcessorSpec getCTFWriterSpec(o2::detectors::DetID::mask_t dets, const std::string &outType, int verbosity, int reportInterval)
create a processor spec
DeliveryType read(const std::string &str)
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
std::vector< OutputSpec > Outputs
Polygon< T > close(Polygon< T > polygon)
size_t computeRenormingPrecision(size_t nUsedAlphabetSymbols)
auto makeHistogramView(container_T &container, std::ptrdiff_t offset) noexcept -> HistogramView< decltype(std::begin(container))>
HistogramView< Hist_IT > trim(const HistogramView< Hist_IT > &buffer)
size_t countNUsedAlphabetSymbols(const AdaptiveHistogram< source_T > &histogram)
void createDirectoriesIfAbsent(std::string const &path)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
bool forcedRaw
ECS declared run data storage type as raw.
std::string envId
The environment ID for the deployment.
static constexpr const char * UNKNOWN
std::string runType
The run type of the current run.
std::string runNumber
The current run number.
bool globalRunNumberChanged
uint32_t tfCounter
the orbit the TF begins
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))