21#include <fairmq/Device.h>
54#include <TStopwatch.h>
78 auto* br =
tree.GetBranch(brname.c_str());
81 br->SetAddress(&pptr);
83 br =
tree.Branch(brname.c_str(), &pptr);
87 throw std::runtime_error(fmt::format(
"Failed to fill CTF branch {}", brname));
106 void stop() final { finalize(); }
109 static std::string
getBinding(
const std::string&
name,
int spec) {
return fmt::format(
"{}_{}",
name, spec); }
113 template <
typename C>
115 template <
typename C>
117 void storeDictionaries();
118 void closeTFTreeAndFile();
119 void prepareTFTreeAndFile();
121 size_t getAvailableDiskSpace(
const std::string&
path,
int level);
122 void createLockFile(
int level);
123 void removeLockFile();
127 bool mFinalized =
false;
128 bool mWriteCTF =
true;
129 bool mCreateDict =
false;
130 bool mCreateRunEnvDir =
true;
131 bool mStoreMetaFile =
false;
132 bool mRejectCurrentTF =
false;
133 bool mFallBackDirUsed =
false;
134 bool mFallBackDirProvided =
false;
135 int mSaveDictAfter = 0;
136 uint32_t mPrevDictTimeStamp = 0;
137 uint32_t mDictTimeStamp = 0;
141 size_t mAccCTFSize = 0;
142 size_t mCurrCTFSize = 0;
144 size_t mNCTFPrevDict = 0;
146 int mWaitDiskFull = 0;
147 int mWaitDiskFullMax = -1;
148 float mCheckDiskFull = 0.;
149 long mCTFAutoSave = 0;
150 size_t mNCTFFiles = 0;
151 int mMaxCTFPerFile = 0;
153 int mCTFFileCompression = 0;
154 bool mFillMD5 =
false;
155 std::vector<uint32_t> mTFOrbits{};
158 std::string mDictDir{};
159 std::string mCTFDir{};
160 std::string mHostName{};
161 std::string mCTFDirFallBack =
"/dev/null";
162 std::string mCTFMetaFileDir =
"/dev/null";
163 std::string mCurrentCTFFileName{};
164 std::string mCurrentCTFFileNameFull{};
165 std::string mSizeReport{};
166 std::string mMetaDataType{};
167 const std::string LOCKFileDir =
"/tmp/ctf-writer-locks";
168 std::string mLockFileName{};
170 std::unique_ptr<TFile> mCTFFileOut;
171 std::unique_ptr<TTree> mCTFTreeOut;
173 std::unique_ptr<TFile> mDictFileOut;
174 std::unique_ptr<TTree> mDictTreeOut;
186 static const std::string TMPFileEnding;
189const std::string CTFWriterSpec::TMPFileEnding{
".part"};
195 std::for_each(mIsSaturatedFrequencyTable.begin(), mIsSaturatedFrequencyTable.end(), [](
auto& bitset) { bitset.reset(); });
205 if (outmode ==
"ctf") {
208 }
else if (outmode ==
"dict") {
211 }
else if (outmode ==
"both") {
214 }
else if (outmode ==
"none") {
218 throw std::invalid_argument(
"Invalid output-type");
221 mSaveDictAfter = ic.
options().
get<
int>(
"save-dict-after");
222 mCTFAutoSave = ic.
options().
get<
long>(
"save-ctf-after");
223 mCTFFileCompression = ic.
options().
get<
int>(
"ctf-file-compression");
224 mCTFMetaFileDir = ic.
options().
get<std::string>(
"meta-output-dir");
225 if (mCTFMetaFileDir !=
"/dev/null") {
227 mStoreMetaFile =
true;
228 mFillMD5 = ic.
options().
get<
bool>(
"md5-for-meta");
231 mCTFDir = ic.
options().
get<std::string>(
"output-dir");
232 if (mCTFDir !=
"/dev/null") {
236 mStoreMetaFile =
false;
238 mCTFDirFallBack = ic.
options().
get<std::string>(
"output-dir-alt");
239 if (mCTFDirFallBack !=
"/dev/null") {
241 mFallBackDirProvided =
true;
243 mCreateRunEnvDir = !ic.
options().
get<
bool>(
"ignore-partition-run-dir");
244 mMinSize = ic.
options().
get<int64_t>(
"min-file-size");
245 mMaxSize = ic.
options().
get<int64_t>(
"max-file-size");
246 mMaxCTFPerFile = ic.
options().
get<
int>(
"max-ctf-per-file");
247 mRejRate = ic.
options().
get<
int>(
"ctf-rejection");
249 LOGP(info,
"Will reject{} {}% of TFs", mRejRate < 100 ?
" randomly" :
"", mRejRate < 100 ? mRejRate : 100);
250 }
else if (mRejRate < -1) {
251 LOGP(info,
"Will reject all but each {}-th TF slice", -mRejRate);
256 LOG(info) <<
"Multiple CTFs will be accumulated in the tree/file until its size exceeds " << mMinSize <<
" bytes";
257 if (mMaxSize > mMinSize) {
258 LOG(info) <<
"but does not exceed " << mMaxSize <<
" bytes";
263 mCheckDiskFull = ic.
options().
get<
float>(
"require-free-disk");
264 mWaitDiskFull = 1000 * ic.
options().
get<
float>(
"wait-for-free-disk");
265 mWaitDiskFullMax = 1000 * ic.
options().
get<
float>(
"max-wait-for-free-disk");
267 mChkSize = std::max(
size_t(mMinSize * 1.1), mMaxSize);
272 if (std::filesystem::exists(dictFileName)) {
273 throw std::runtime_error(
o2::utils::Str::concat_string(
"CTF dictionary creation is requested but ", dictFileName,
" already exists, remove it!"));
278 char hostname[_POSIX_HOST_NAME_MAX];
279 gethostname(hostname, _POSIX_HOST_NAME_MAX);
280 mHostName = hostname;
281 mHostName = mHostName.substr(0, mHostName.find(
'.'));
292 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.
runType, mDataTakingContext.
forcedRaw);
301 static bool warnedEmpty =
false;
305 mSizeReport += fmt::format(
" {}:N/A", det.
getName());
315 for (uint32_t iLayer = 0; iLayer <
nLayers; iLayer++) {
321 throw std::runtime_error(fmt::format(
"Non-empty input was seen at {}-th TF after empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.
getName()));
323 const auto ctfImage = C::getImage(bdata);
325 if (mWriteCTF && !mRejectCurrentTF) {
327 header.detectors.set(det);
329 sz += ctfBuffer.size();
332 if (mFreqsAccumulation[det].
empty()) {
333 mFreqsAccumulation[det].resize(C::getNBlocks());
334 mFreqsMetaData[det].resize(C::getNBlocks());
336 if (!mHeaders[det]) {
337 mHeaders[det] =
ctfImage.cloneHeader();
341 for (
int ib = 0; ib < C::getNBlocks(); ib++) {
342 if (!mIsSaturatedFrequencyTable[det][ib]) {
343 const auto& bl =
ctfImage.getBlock(ib);
345 auto freq = mFreqsAccumulation[det][ib];
346 auto& mdSave = mFreqsMetaData[det][ib];
347 const auto& md =
ctfImage.getMetadata(ib);
350 freq.addFrequencies(bl.getDict(), bl.getDict() + bl.getNDict(), md.min);
351 }
catch (
const std::overflow_error& e) {
352 LOGP(warning,
"unable to add frequency table for {}, block {} due to overflow", det.getName(), ib);
353 mIsSaturatedFrequencyTable[det][ib] =
true;
361 static_cast<int32_t
>(histogramView.getMin()),
362 static_cast<int32_t
>(histogramView.getMax()),
363 static_cast<int32_t
>(histogramView.size()),
365 mFreqsAccumulation[det][ib] = std::move(freq);
374 throw std::runtime_error(fmt::format(
"Empty input was seen at {}-th TF after non-empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.getName()));
376 LOGP(important,
"Empty CTF provided for {}, skipping and will not report anymore", det.getName());
381 mSizeReport += fmt::format(
" {}:{}", det.getName(), fmt::group_digits(sz));
388void CTFWriterSpec::storeDictionary(
DetID det, CTFHeader& header)
394 auto dictBlocks = C::createDictionaryBlocks(mFreqsAccumulation[det], mFreqsMetaData[det]);
395 auto&
h = C::get(dictBlocks.data())->getHeader();
396 h = *
reinterpret_cast<typename std::remove_reference<decltype(
h)
>::type*>(mHeaders[det].get());
401 auto getFileName = [
this, det, &hb](
bool curr) {
403 curr ?
this->mDictTimeStamp :
this->mPrevDictTimeStamp, curr ?
this->mNCTF :
this->mNCTFPrevDict);
407 auto outName = getFileName(
true);
408 TFile flout(outName.c_str(),
"recreate");
410 flout.WriteObject(&hb, fmt::format(
"ctf_dict_header_{}", det.
getName()).c_str());
412 LOGP(info,
"Saved {} with {} TFs to {}", hb.asString(), mNCTF, outName);
413 if (mPrevDictTimeStamp) {
414 auto outNamePrev = getFileName(
false);
415 if (std::filesystem::exists(outNamePrev)) {
416 std::filesystem::remove(outNamePrev);
417 LOGP(info,
"Removed previous dictionary version {}", outNamePrev);
420 C::get(dictBlocks.data())->appendToTree(*mDictTreeOut.get(), det.
getName());
421 header.detectors.set(det);
436 for (uint32_t iLayer = 0; iLayer <
nLayers; iLayer++) {
450 const std::string NAStr =
"NA";
454 auto cput = mTimer.CpuTime();
456 updateTimeDependentParams(pc);
457 mRejectCurrentTF = (mRejRate > 0 &&
int(gRandom->Rndm() * 100) < mRejRate) || (mRejRate < -1 && mTimingInfo.
timeslice % (-mRejRate));
458 mCurrCTFSize = estimateCTFSize(pc);
459 if (mWriteCTF && !mRejectCurrentTF) {
460 prepareTFTreeAndFile();
462 int totalWait = 0, nwaitCycles = 0;
463 while ((mFallBackDirUsed || !mFallBackDirProvided) && mCheckDiskFull) {
464 constexpr size_t MB = 1024 * 1024;
465 constexpr int showFirstN = 10, prsecaleWarnings = 50;
467 const auto si = std::filesystem::space(mCTFFileOut->GetName());
469 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
471 wmsg = fmt::format(
"Disk has {} MB available while at least {} MB is requested, wait for {} ms (on top of {} ms)", si.available / MB,
size_t(mCheckDiskFull) / MB, mWaitDiskFull, totalWait);
472 }
else if (mCheckDiskFull < 0.f &&
float(si.available) / si.capacity < -mCheckDiskFull) {
474 wmsg = fmt::format(
"Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ?
float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
479 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
480 closeTFTreeAndFile();
481 LOGP(fatal,
"Disk has {} MB available out of {} MB after waiting for {} ms", si.available / MB, si.capacity / MB, mWaitDiskFullMax);
483 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
487 totalWait += mWaitDiskFull;
490 }
catch (std::exception
const& e) {
491 LOG(fatal) <<
"unable to query disk space info for path " << mCurrentCTFFileNameFull <<
", reason: " << e.what();
500 std::array<size_t, DetID::CTP + 1> szCTFperDet{0};
501 szCTFperDet[
DetID::ITS] = processDet<o2::itsmft::CTF>(pc,
DetID::ITS, header, mCTFTreeOut.get());
507 szCTFperDet[
DetID::EMC] = processDet<o2::emcal::CTF>(pc,
DetID::EMC, header, mCTFTreeOut.get());
508 szCTFperDet[
DetID::HMP] = processDet<o2::hmpid::CTF>(pc,
DetID::HMP, header, mCTFTreeOut.get());
509 szCTFperDet[
DetID::MFT] = processDet<o2::itsmft::CTF>(pc,
DetID::MFT, header, mCTFTreeOut.get());
517 szCTF = std::accumulate(szCTFperDet.begin(), szCTFperDet.end(), 0);
519 LOGP(important,
"CTF {} size report:{} - Total:{}", mTimingInfo.
tfCounter, mSizeReport, fmt::group_digits(szCTF));
524 if (mWriteCTF && !mRejectCurrentTF) {
525 szCTF +=
appendToTree(*mCTFTreeOut.get(),
"CTFHeader", header);
526 size_t prevSizeMB = mAccCTFSize / (1 << 20);
527 mAccCTFSize += szCTF;
528 mCTFTreeOut->SetEntries(++mNAccCTF);
530 LOG(info) <<
"TF#" << mNCTF <<
": wrote CTF{" << header <<
"} of size " << szCTF <<
" to " << mCurrentCTFFileNameFull <<
" in " << mTimer.CpuTime() - cput <<
" s";
532 LOG(info) <<
"Current CTF tree has " << mNAccCTF <<
" entries with total size of " << mAccCTFSize <<
" bytes";
535 lseek(mLockFD, 0, SEEK_SET);
536 auto nwr = write(mLockFD, &mAccCTFSize,
sizeof(
size_t));
537 if (nwr !=
sizeof(
size_t)) {
538 LOG(error) <<
"Failed to write current CTF size " << mAccCTFSize <<
" to lock file, bytes written: " << nwr;
542 if (mAccCTFSize >= mMinSize || (mMaxCTFPerFile > 0 && mNAccCTF >= mMaxCTFPerFile)) {
543 closeTFTreeAndFile();
544 }
else if ((mCTFAutoSave > 0 && mNAccCTF % mCTFAutoSave == 0) || (mCTFAutoSave < 0 &&
int(prevSizeMB / (-mCTFAutoSave)) !=
size_t(mAccCTFSize / (1 << 20)) / (-mCTFAutoSave))) {
545 mCTFTreeOut->AutoSave(
"override");
548 LOG(info) <<
"TF#" << mNCTF <<
" {" << header <<
"} CTF writing is disabled, size was " << szCTF <<
" bytes";
552 if (mCreateDict && mSaveDictAfter > 0 && (mNCTF % mSaveDictAfter) == 0) {
561void CTFWriterSpec::finalize()
570 closeTFTreeAndFile();
572 LOGF(info,
"CTF writing total timing: Cpu: %.3e Real: %.3e s in %d slots",
573 mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
580void CTFWriterSpec::prepareTFTreeAndFile()
585 bool needToOpen =
false;
589 if ((mAccCTFSize >= mMinSize) ||
590 (mAccCTFSize && mMaxSize > mMinSize && ((mAccCTFSize + mCurrCTFSize) > mMaxSize))) {
593 LOGP(info,
"Will add new CTF of estimated size {} to existing file of size {}", mCurrCTFSize, mAccCTFSize);
597 closeTFTreeAndFile();
598 mFallBackDirUsed =
false;
600 if (mChkSize > 0 && mFallBackDirProvided) {
602 auto sz = getAvailableDiskSpace(ctfDir, 0);
605 LOG(warning) <<
"Primary CTF output device has available size " << sz <<
" while " << mChkSize <<
" is requested: will write on secondary one";
606 ctfDir = mCTFDirFallBack;
607 mFallBackDirUsed =
true;
611 ctfDir += fmt::format(
"{}_{}/", mDataTakingContext.
envId, mDataTakingContext.
runNumber);
612 if (!ctfDir.empty()) {
614 LOGP(info,
"Created {} directory for CTFs output", ctfDir);
618 mCurrentCTFFileNameFull = fmt::format(
"{}{}", ctfDir, mCurrentCTFFileName);
619 mCTFFileOut.reset(TFile::Open(fmt::format(
"{}{}", mCurrentCTFFileNameFull, TMPFileEnding).c_str(),
"recreate"));
620 if (mCTFFileCompression >= 0) {
621 mCTFFileOut->SetCompressionLevel(mCTFFileCompression);
630void CTFWriterSpec::closeTFTreeAndFile()
635 mCTFTreeOut->Write();
637 mCTFFileOut->Close();
640 auto actualFileName = TMPFileEnding.empty() ? mCurrentCTFFileNameFull :
o2::utils::Str::concat_string(mCurrentCTFFileNameFull, TMPFileEnding);
641 if (mStoreMetaFile) {
643 if (!ctfMetaData.
fillFileData(actualFileName, mFillMD5, TMPFileEnding)) {
644 throw std::runtime_error(
"metadata file was requested but not created");
647 ctfMetaData.
type = mMetaDataType;
648 ctfMetaData.
priority = mFallBackDirUsed ?
"low" :
"high";
649 ctfMetaData.
tfOrbits.swap(mTFOrbits);
650 auto metaFileNameTmp = fmt::format(
"{}{}.tmp", mCTFMetaFileDir, mCurrentCTFFileName);
651 auto metaFileName = fmt::format(
"{}{}.done", mCTFMetaFileDir, mCurrentCTFFileName);
653 std::ofstream metaFileOut(metaFileNameTmp);
654 metaFileOut << ctfMetaData;
656 if (!TMPFileEnding.empty()) {
657 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
659 std::filesystem::rename(metaFileNameTmp, metaFileName);
660 }
catch (std::exception
const& e) {
661 LOG(error) <<
"Failed to store CTF meta data file " << metaFileName <<
", reason: " << e.what();
663 }
else if (!TMPFileEnding.empty()) {
664 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
666 }
catch (std::exception
const& e) {
667 LOG(error) <<
"Failed to finalize CTF file " << mCurrentCTFFileNameFull <<
", reason: " << e.what();
677void CTFWriterSpec::storeDictionaries()
680 mDictTimeStamp = uint32_t(std::time(
nullptr));
681 auto getFileName = [
this](
bool curr) {
684 auto dictFileName = getFileName(
true);
685 mDictFileOut.reset(TFile::Open(dictFileName.c_str(),
"recreate"));
688 CTFHeader header{mTimingInfo.
runNumber, uint32_t(mNCTF)};
689 storeDictionary<o2::itsmft::CTF>(
DetID::ITS, header);
690 storeDictionary<o2::itsmft::CTF>(
DetID::MFT, header);
691 storeDictionary<o2::tpc::CTF>(
DetID::TPC, header);
692 storeDictionary<o2::trd::CTF>(
DetID::TRD, header);
693 storeDictionary<o2::tof::CTF>(
DetID::TOF, header);
694 storeDictionary<o2::ft0::CTF>(
DetID::FT0, header);
695 storeDictionary<o2::fv0::CTF>(
DetID::FV0, header);
696 storeDictionary<o2::fdd::CTF>(
DetID::FDD, header);
697 storeDictionary<o2::mid::CTF>(
DetID::MID, header);
698 storeDictionary<o2::mch::CTF>(
DetID::MCH, header);
699 storeDictionary<o2::emcal::CTF>(
DetID::EMC, header);
700 storeDictionary<o2::phos::CTF>(
DetID::PHS, header);
701 storeDictionary<o2::cpv::CTF>(
DetID::CPV, header);
702 storeDictionary<o2::zdc::CTF>(
DetID::ZDC, header);
703 storeDictionary<o2::hmpid::CTF>(
DetID::HMP, header);
704 storeDictionary<o2::ctp::CTF>(
DetID::CTP, header);
707 mDictTreeOut->SetEntries(1);
708 mDictTreeOut->Write(mDictTreeOut->GetName(), TObject::kSingleKey);
709 mDictTreeOut.reset();
710 mDictFileOut.reset();
712 if (std::filesystem::exists(dictFileNameLnk)) {
713 std::filesystem::remove(dictFileNameLnk);
715 std::filesystem::create_symlink(dictFileName, dictFileNameLnk);
716 LOGP(info,
"Saved CTF dictionaries tree with {} TFs to {} and linked to {}", mNCTF, dictFileName, dictFileNameLnk);
717 if (mPrevDictTimeStamp) {
718 auto dictFileNamePrev = getFileName(
false);
719 if (std::filesystem::exists(dictFileNamePrev)) {
720 std::filesystem::remove(dictFileNamePrev);
721 LOGP(info,
"Removed previous dictionary version {}", dictFileNamePrev);
724 mNCTFPrevDict = mNCTF;
725 mPrevDictTimeStamp = mDictTimeStamp;
729void CTFWriterSpec::createLockFile(
int level)
734 if (!std::filesystem::exists(mLockFileName)) {
738 mLockFD = open(mLockFileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
740 throw std::runtime_error(fmt::format(
"Error opening lock file {}", mLockFileName));
742 if (lockf(mLockFD, F_LOCK, 0)) {
743 throw std::runtime_error(fmt::format(
"Error locking file {}", mLockFileName));
748void CTFWriterSpec::removeLockFile()
752 if (lockf(mLockFD, F_ULOCK, 0)) {
753 throw std::runtime_error(fmt::format(
"Error unlocking file {}", mLockFileName));
757 std::filesystem::remove(mLockFileName, ec);
762size_t CTFWriterSpec::getAvailableDiskSpace(
const std::string&
path,
int level)
765 std::regex pat{fmt::format(
"({}/ctfs{}-[[:alnum:]_]+\\.lock$)", LOCKFileDir,
level)};
769 for (
const auto&
entry :
std::filesystem::directory_iterator(LOCKFileDir)) {
770 const auto& entryName =
entry.path().native();
771 if (std::regex_search(entryName, pat) && (mLockFD < 0 || entryName != mLockFileName)) {
772 int fdt = open(entryName.c_str(), O_RDONLY);
774 bool locked = lockf(fdt, F_TEST, 0) != 0;
778 auto nrd =
read(fdt, &sz,
sizeof(
size_t));
779 if (nrd ==
sizeof(
size_t)) {
788 if (stat(entryName.c_str(), &statbuf) != -1) {
790 auto ftime = statbuf.st_mtimespec.tv_sec;
792 auto ftime = statbuf.st_mtim.tv_sec;
794 auto ctime = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
795 if (ftime + 60 < ctime) {
796 std::filesystem::remove(entryName, ec);
803 const auto si = std::filesystem::space(
path, ec);
804 int64_t avail =
int64_t(si.available) - nLocked * mChkSize + written;
805 LOGP(
debug,
"{} CTF files open (curr.size: {}) -> can use {} of {} bytes", nLocked, written, avail, si.available);
806 return avail > 0 ? avail : 0;
812 std::vector<InputSpec> inputs;
816 uint32_t nLayers = 1;
823 for (uint32_t iLayer = 0; iLayer < nLayers; iLayer++) {
833 {
"CTF",
"SIZES", 0, Lifetime::Timeframe}},
837 {
"save-ctf-after", VariantType::Int64, 0ll, {
"autosave CTF tree with multiple CTFs after every N CTFs if >0 or every -N MBytes if < 0"}},
838 {
"save-dict-after", VariantType::Int, 0, {
"if > 0, in dictionary generation mode save it dictionary after certain number of TFs processed"}},
839 {
"ctf-dict-dir", VariantType::String,
"none", {
"CTF dictionary directory, must exist"}},
840 {
"output-dir", VariantType::String,
"none", {
"CTF output directory, must exist"}},
841 {
"output-dir-alt", VariantType::String,
"/dev/null", {
"Alternative CTF output directory, must exist (if not /dev/null)"}},
842 {
"meta-output-dir", VariantType::String,
"/dev/null", {
"CTF metadata output directory, must exist (if not /dev/null)"}},
843 {
"md5-for-meta", VariantType::Bool,
false, {
"fill CTF file MD5 sum in the metadata file"}},
844 {
"min-file-size", VariantType::Int64, 0l, {
"accumulate CTFs until given file size reached"}},
845 {
"max-file-size", VariantType::Int64, 0l, {
"if > 0, try to avoid exceeding given file size, also used for space check"}},
846 {
"max-ctf-per-file", VariantType::Int, 0, {
"if > 0, avoid storing more than requested CTFs per file"}},
847 {
"ctf-rejection", VariantType::Int, 0, {
">0: percentage to reject randomly, <0: reject if timeslice%|value|!=0"}},
848 {
"ctf-file-compression", VariantType::Int, 0, {
"if >= 0: impose CTF file compression level"}},
849 {
"require-free-disk", VariantType::Float, 0.f, {
"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
850 {
"wait-for-free-disk", VariantType::Float, 10.f, {
"if paused due to the low disk space, recheck after this time (in s)"}},
851 {
"max-wait-for-free-disk", VariantType::Float, 60.f, {
"produce fatal if paused due to the low disk space for more than this amount in s."}},
852 {
"ignore-partition-run-dir", VariantType::Bool,
false, {
"Do not creare partition-run directory in output-dir"}}}};
std::string getName(const TDataMember *dm, int index, int size)
Header of the AggregatedRunInfo struct.
Definition of the Names Generator class.
Class for time synchronization of RawReader instances.
static std::string getCTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_ctf")
static constexpr std::string_view CCDBOBJECT
static constexpr std::string_view CTFDICT
static constexpr std::string_view CTFTREENAME
void init(o2::framework::InitContext &ic) final
bool isPresent(DetID id) const
static std::string getBinding(const std::string &name, int spec)
void stop() final
This is invoked on stop.
void run(o2::framework::ProcessingContext &pc) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr const char * getName(ID id)
names of defined detectors
static std::string getNames(mask_t mask, char delimiter=',')
static constexpr ID First
static constexpr int nDetectors
number of defined detectors
static constexpr ID Last
if extra detectors added, update this !!!
static constexpr o2h::DataOrigin getDataOrigin(ID id)
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
functionality to maintain compatibility with previous version of this library
GLuint const GLchar * name
GLint GLint GLsizei GLint GLenum GLenum type
GLsizei const GLchar *const * path
public interface for building and renorming histograms from source data.
uint8_t itsSharedClusterMap uint8_t
constexpr Metadata makeMetadataRansDict(size_t symbolTablePrecision, source_T min, source_T max, size_t dictWords, ctf::Metadata::OptStore optStore) noexcept
framework::DataProcessorSpec getCTFWriterSpec(const o2::ctf::CTFWriterInp &inp)
create a processor spec
uint8_t BufferType
This is the type of the vector to be used for the EncodedBlocks buffer allocation.
size_t appendToTree(TTree &tree, const std::string brname, T &ptr)
DeliveryType read(const std::string &str)
Defining ITS Vertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
std::vector< OutputSpec > Outputs
Polygon< T > close(Polygon< T > polygon)
size_t computeRenormingPrecision(size_t nUsedAlphabetSymbols)
auto makeHistogramView(container_T &container, std::ptrdiff_t offset) noexcept -> HistogramView< decltype(std::begin(container))>
HistogramView< Hist_IT > trim(const HistogramView< Hist_IT > &buffer)
size_t countNUsedAlphabetSymbols(const AdaptiveHistogram< source_T > &histogram)
void createDirectoriesIfAbsent(std::string const &path)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
o2::detectors::DetID::mask_t detMask
bool forcedRaw
ECS declared run data storage type as raw.
std::string envId
The environment ID for the deployment.
static constexpr const char * UNKNOWN
std::string runType
The run type of the current run.
std::string runNumber
The current run number.
bool globalRunNumberChanged
uint32_t tfCounter
the orbit the TF begins
static constexpr int getNLayers()
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))