41using ios = std::ios_base;
48 RawTFDump(
const std::string& trigger);
57 size_t getTFSizeInFile()
const;
58 size_t getCurrentFileSize();
62 std::string reportRates()
const;
64 SubTimeFrameFileDataIndex mTFDataIndex;
65 std::vector<std::pair<size_t, const void*>> mTFData;
66 std::map<EquipmentIdentifier, std::tuple<size_t, size_t, size_t, const void*>> mDataMap;
67 std::vector<InputSpec> mFilter{};
68 std::vector<InputSpec> mTriggerFilter{};
69 std::vector<InputSpec> mExclTriggerFilter{};
// --- Configuration and run-time state of the dumper (values are filled from the task options in init) ---
// "min-file-size" option: TFs are accumulated in one file until its size reaches this value.
72 size_t mMinFileSize = 0;
// "max-file-size" option: when larger than mMinFileSize, adding a TF should not push the file beyond this size.
73 size_t mMaxFileSize = 0;
// Number of accepted TFs; reported as "accepted" in reportRates() and in the end-of-stream summary.
77 int mNTFsAccepted = 0;
// "max-tf-per-file" option (its help text describes the limit as active "if > 0").
81 int mMaxTFPerFile = 0;
// Number of throttling warnings issued so far (incremented each time such a warning is prepared).
82 int mNWarnThrottle = 0;
// "max-warn" option: maximum number of throttling warnings; a negative value lifts the limit.
83 int mMaxWarnThrottle = 0;
// "mute-warn-period" option: number of TFs that must pass after a throttling warning before the next one.
84 int mWarnThrottleTF = 0;
// 1000 * "wait-for-free-disk" option (given in s), i.e. the re-check pause in ms when disk space is low.
85 int mWaitDiskFull = 0;
// 1000 * "max-wait-for-free-disk" option (given in s): total wait in ms after which a fatal is raised; used only if > 0.
86 int mWaitDiskFullMax = -1;
// "require-free-disk" option: > 0 is a minimum number of available bytes, < 0 a minimum available fraction of capacity.
87 float mCheckDiskFull = 0.;
// "max-dump-rate" option, in percent of TFs. Without a trigger: > 0 random acceptance, < 0 periodic acceptance;
// with a trigger its absolute value is the maximum accepted rate.
88 float mMaxAccRate = 0.f;
// Quantile used for the rate-estimate confidence limits ("rate-est-conf-limit" option).
89 float mConfLim = 0.05f;
// Lower/upper estimates of the accepted-TF rate, computed with TMath::ChisquareQuantile from mNTFsAccepted and mNTFsSeen.
90 float mRateEstAccLow = 0.f;
91 float mRateEstAccUpp = 0.f;
// Lower/upper estimates of the externally triggered TF rate, computed the same way from mNTFsExtTrig and mNTFsSeen.
92 float mRateEstTrgLow = 0.f;
93 float mRateEstTrgUpp = 0.f;
// "md5-for-meta" option: passed to fillFileData() when the metadata file is produced.
95 bool mFillMD5 =
false;
// Enabled in init when "meta-output-dir" is not /dev/null; closeTFFile() writes a metadata file only if set.
97 bool mStoreMetaFile =
false;
// Negation of the "ignore-partition-run-dir" option.
98 bool mCreateRunEnvDir =
true;
// Decision of triggerTF() for the TF currently being processed.
99 bool mAcceptCurrentTF =
false;
// Negation of the "include-deadbeef" option: inputs with subSpecification 0xdeadbeef are rejected when set.
100 bool mRejectDEADBEEF =
false;
// Negation of the "include-dist-stf" option (FLP/DISTSUBTIMEFRAME input).
101 bool mRejectDistSTF =
true;
// Orbits handed over (swapped) to the metadata record's tfOrbits when a file is closed.
103 std::vector<uint32_t> mTFOrbits{};
// Trigger description; when empty, TFs are selected by the max-dump-rate setting alone.
107 std::string mTrigger{};
// "exclude-trigger-specs" option; when not empty it is parsed with select() into mExclTriggerFilter.
108 std::string mExclTriggerSpecs{};
// Host name as returned by gethostname(), cut at the first '.'.
109 std::string mHostName{};
// "output-dir" option.
110 std::string mTFDir{};
// "meta-output-dir" option; the value "/dev/null" means that no metadata files are stored.
111 std::string mTFMetaFileDir =
"/dev/null";
// Name of the current output file without directory (also the stem of the metadata file names).
112 std::string mCurrentTFFileName{};
// Output directory + mCurrentTFFileName: the final name of the current file.
113 std::string mCurrentTFFileNameFull{};
// mCurrentTFFileNameFull + TMPFileEnding: the name under which the file is opened and written before being renamed.
114 std::string mCurrentTFFileNameFullTmp{};
// Result of GRPECS::getRawDataPersistencyMode(runType, forcedRaw); stored as the metadata record type.
115 std::string mMetaDataType{};
// Number of bytes in one MiB.
117 static constexpr size_t MiB = 1ul << 20;
// Size of the stream buffer mFileBuf; buffered_write() passes requests of at least this size to mFile.write() in one call.
118 static constexpr std::streamsize sBuffSize = MiB;
// Upper bound of a single mFile.write() call in the chunked path of buffered_write().
119 static constexpr std::streamsize sChunkSize = 512;
// Suffix appended to the name of a file while it is being written (defined as ".part").
120 static const std::string TMPFileEnding;
// Buffer of sBuffSize bytes installed as the stream buffer of mFile via pubsetbuf().
121 std::unique_ptr<char[]> mFileBuf;
// Uniform distribution over 0..100 whose draws are compared with mMaxAccRate (a percentage) for random acceptance.
123 std::uniform_real_distribution<double> mUniformDist{0.0, 100.0};
// Random engine behind mUniformDist; seeded with getpid() in init.
124 std::default_random_engine mRGen;
129 typename std::enable_if<
130 std::is_pointer<pointer>::value &&
131 (std::is_void<std::remove_pointer_t<pointer>>
::value ||
132 std::is_standard_layout<std::remove_pointer_t<pointer>>
::value)>
::type* =
nullptr>
133 void buffered_write(
const pointer p, std::streamsize pCount)
136 assert((pCount %
sizeof(std::conditional_t<std::is_void<std::remove_pointer_t<pointer>>
::value,
137 char, std::remove_pointer_t<pointer>>) ==
139 "Performing short write?");
141 const char* lPtr =
reinterpret_cast<const char*
>(
p);
143 if (pCount >= sBuffSize) {
144 mFile.write(lPtr, pCount);
148 const auto lToWrite = std::min(pCount, sChunkSize);
149 assert(lToWrite > 0 && lToWrite <= sChunkSize && lToWrite <= pCount);
151 mFile.write(lPtr, lToWrite);
159const std::string RawTFDump::TMPFileEnding{
".part"};
164 mTriggerFilter =
select(trigger.c_str());
165 mFileBuf = std::make_unique<char[]>(sBuffSize);
166 mFile.rdbuf()->pubsetbuf(mFileBuf.get(), sBuffSize);
168 mFile.exceptions(std::fstream::failbit | std::fstream::badbit);
174 mRGen = std::default_random_engine(getpid());
175 mTFMetaFileDir = ic.
options().
get<std::string>(
"meta-output-dir");
176 if (mTFMetaFileDir !=
"/dev/null") {
178 mStoreMetaFile =
true;
179 mFillMD5 = ic.
options().
get<
bool>(
"md5-for-meta");
182 mTFDir = ic.
options().
get<std::string>(
"output-dir");
183 if (mTFDir !=
"/dev/null") {
188 mStoreMetaFile =
false;
190 mRejectDistSTF = !ic.
options().
get<
bool>(
"include-dist-stf");
191 mRejectDEADBEEF = !ic.
options().
get<
bool>(
"include-deadbeef");
192 mCreateRunEnvDir = !ic.
options().
get<
bool>(
"ignore-partition-run-dir");
193 mMinFileSize = ic.
options().
get<int64_t>(
"min-file-size");
194 mMaxFileSize = ic.
options().
get<int64_t>(
"max-file-size");
195 mMaxTFPerFile = ic.
options().
get<
int>(
"max-tf-per-file");
196 mMaxAccRate = ic.
options().
get<
float>(
"max-dump-rate");
197 float cl = ic.
options().
get<
float>(
"rate-est-conf-limit");
198 if (mConfLim < 0.001 || mConfLim > 0.32) {
199 LOGP(warn,
"Bad confidence limit {} for rate estimate, setting to default {}", cl, mConfLim);
203 mMaxWarnThrottle = ic.
options().
get<
int>(
"max-warn");
204 mWarnThrottleTF = ic.
options().
get<
int>(
"mute-warn-period");
206 mVerbose = ic.
options().
get<
int>(
"verbosity-level");
207 mExclTriggerSpecs = ic.
options().
get<std::string>(
"exclude-trigger-specs");
208 if (!mExclTriggerSpecs.empty()) {
209 mExclTriggerFilter =
select(mExclTriggerSpecs.c_str());
211 if (mTrigger.empty()) {
212 if (mMaxAccRate >= 0.f) {
213 LOGP(info,
"Will accept randomly {}% of TFs", mMaxAccRate);
215 LOGP(info,
"Will accept every {}-th TF",
int(std::ceil(-100.f / mMaxAccRate)));
218 mMaxAccRate = std::abs(mMaxAccRate);
219 LOGP(info,
"Will limit TFs triggered with {} by {}% at most", mTrigger, mMaxAccRate);
220 if (!mExclTriggerFilter.empty()) {
221 LOGP(info,
"Inputs excluded from the trigger: {}", mExclTriggerSpecs);
226 if (mMinFileSize > 0) {
227 LOGP(info,
"Multiple TFs will be accumulated in the file until its size exceeds {}{}",
228 mMinFileSize, mMaxFileSize > mMinFileSize ? fmt::format(
" but does not exceed {} B", mMaxFileSize) : std::string{});
232 mCheckDiskFull = ic.
options().
get<
float>(
"require-free-disk");
233 mWaitDiskFull = 1000 * ic.
options().
get<
float>(
"wait-for-free-disk");
234 mWaitDiskFullMax = 1000 * ic.
options().
get<
float>(
"max-wait-for-free-disk");
236 char hostname[_POSIX_HOST_NAME_MAX];
237 gethostname(hostname, _POSIX_HOST_NAME_MAX);
238 mHostName = hostname;
239 mHostName = mHostName.substr(0, mHostName.find(
'.'));
246 updateTimeDependentParams(pc);
247 mAcceptCurrentTF = triggerTF(pc);
248 if (mAcceptCurrentTF) {
249 prepareTFForWriting(pc);
255 if (mWriteTF && checkFreeSpace(pc)) {
257 size_t lTFSizeInFile = getTFSizeInFile();
261 mFile << lTFFileMeta;
262 mFile << mTFDataIndex;
264 for (
const auto& eqEntry : mDataMap) {
265 auto& [lSize, lCnt, lEntry, lHeader] = eqEntry.second;
266 for (
size_t part = 0; part < lCnt; part++) {
267 const auto& dataPtr = mTFData[lEntry + part];
273 LOGP(info,
"Writing part:{}/{} of {} | TFCounter:{} part{}/{}, size:{}", part, lCnt,
DataSpecUtils::describe(
OutputSpec{hdToWrite.
dataOrigin, hdToWrite.
dataDescription, hdToWrite.
subSpecification}), hdToWrite.
firstTForbit, hdToWrite.
splitPayloadIndex, hdToWrite.
splitPayloadParts, hdToWrite.
payloadSize);
275 buffered_write(
reinterpret_cast<const char*
>(&hdToWrite),
sizeof(
DataHeader));
276 buffered_write(dataPtr.second, hdToWrite.
payloadSize);
282 }
catch (
const std::ios_base::failure& eFailExc) {
283 LOGP(error,
"Writing of TF {} to file {} failed. error={}", mTimingInfo.
tfCounter, mCurrentTFFileNameFullTmp, eFailExc.what());
289 mTFDataIndex.clear();
297 LOGP(info,
"Dumped {} TFs to {} files", mNTFsAccepted, mNTFFiles);
298 if (!mTriggerFilter.empty()) {
299 LOGP(info,
"External trigger summary: {}", reportRates());
304size_t RawTFDump::getTFSizeInFile()
const
310size_t RawTFDump::getCurrentFileSize()
312 return mFile.is_open() ? size_t(mFile.tellp()) : 0;
316void RawTFDump::prepareTFFile()
322 if (!mFile.is_open()) {
325 auto currSize = getCurrentFileSize();
326 if ((mNTFsInFile >= mMaxTFPerFile) ||
327 (currSize >= mMinFileSize) ||
328 (currSize && mMaxFileSize > mMinFileSize && ((currSize + mTFSize) > mMaxFileSize))) {
331 LOGP(info,
"Will add new TF of size {} to existing file of size {} with {} TFs", mTFSize, currSize, mNTFsInFile);
339 TFDir += fmt::format(
"{}_{}tf/", mDataTakingContext.
envId, mDataTakingContext.
runNumber);
340 if (!TFDir.empty()) {
342 LOGP(info,
"Created {} directory for TFs output", TFDir);
346 mCurrentTFFileNameFull = fmt::format(
"{}{}", TFDir, mCurrentTFFileName);
347 mCurrentTFFileNameFullTmp = TMPFileEnding.empty() ? mCurrentTFFileNameFull :
o2::utils::Str::concat_string(mCurrentTFFileNameFull, TMPFileEnding);
348 mFile.open(mCurrentTFFileNameFullTmp.c_str(), ios::binary | ios::trunc | ios::out | ios::ate);
349 LOGP(info,
"Opened new raw-tf dump file {}[{}]", mCurrentTFFileNameFull, TMPFileEnding);
362 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.
runType, mDataTakingContext.
forcedRaw);
367void RawTFDump::closeTFFile()
369 if (!mFile.is_open()) {
373 LOGP(info,
"Closing output file {}[{}]", mCurrentTFFileNameFull, TMPFileEnding);
376 if (mStoreMetaFile) {
378 if (!TFMetaData.
fillFileData(mCurrentTFFileNameFullTmp, mFillMD5, TMPFileEnding)) {
379 throw std::runtime_error(
"metadata file was requested but not created");
382 TFMetaData.
type = mMetaDataType;
384 TFMetaData.
tfOrbits.swap(mTFOrbits);
385 auto metaFileNameTmp = fmt::format(
"{}{}.tmp", mTFMetaFileDir, mCurrentTFFileName);
386 auto metaFileName = fmt::format(
"{}{}.done", mTFMetaFileDir, mCurrentTFFileName);
388 std::ofstream metaFileOut(metaFileNameTmp);
389 metaFileOut << TFMetaData;
391 if (!TMPFileEnding.empty()) {
392 std::filesystem::rename(mCurrentTFFileNameFullTmp, mCurrentTFFileNameFull);
394 std::filesystem::rename(metaFileNameTmp, metaFileName);
395 LOGP(info,
"wrote meta file {}", metaFileName);
396 }
catch (std::exception
const& e) {
397 LOGP(error,
"Failed to store TF meta data file {}, reason {}", metaFileName, e.what());
399 }
else if (!TMPFileEnding.empty()) {
400 std::filesystem::rename(mCurrentTFFileNameFullTmp, mCurrentTFFileNameFull);
402 }
catch (std::exception
const& e) {
403 LOGP(error,
"Failed to finalize TF file {}, reason: ", mCurrentTFFileNameFull, e.what());
412 int totalWait = 0, nwaitCycles = 0;
413 while (mCheckDiskFull) {
414 constexpr int showFirstN = 10, prsecaleWarnings = 50;
416 const auto si = std::filesystem::space(mCurrentTFFileNameFullTmp);
418 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
420 wmsg = fmt::format(
"Disk has {} MiB available while at least {} MiB is requested, wait for {} ms (on top of {} ms)", si.available / MiB,
size_t(mCheckDiskFull) / MiB, mWaitDiskFull, totalWait);
421 }
else if (mCheckDiskFull < 0.f &&
float(si.available) / si.capacity < -mCheckDiskFull) {
423 wmsg = fmt::format(
"Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ?
float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
428 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
430 LOGP(fatal,
"Disk has {} MiB available out of {} MiB after waiting for {} ms", si.available / MiB, si.capacity / MiB, mWaitDiskFullMax);
432 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
433 LOGP(alarm,
"{}", wmsg);
436 totalWait += mWaitDiskFull;
439 }
catch (std::exception
const& e) {
440 LOGP(fatal,
"unable to query disk space info for path {}, reason {}", mCurrentTFFileNameFull, e.what());
451 if (mTrigger.empty()) {
452 if (mMaxAccRate > 0.f) {
453 trig = (mUniformDist(mRGen) <= mMaxAccRate);
454 }
else if (mMaxAccRate < 0.f) {
455 trig = (mTimingInfo.
tfCounter %
int(std::ceil(-100.f / mMaxAccRate))) == 0;
459 auto const* dh = DataRefUtils::getHeader<DataHeader*>(
ref);
461 LOGP(error,
"Failed to extract header for trigger input");
464 auto extTrig = DataRefUtils::as<bool>(
ref);
465 if (mVerbose > 1 || (mVerbose > 0 && extTrig.size() > 0 && extTrig[0])) {
466 LOGP(info,
"trigger input {}, part: {} of {}, payload {}, 1stTFOrbit: {} TF: {} | span size: {} span[0]={}",
468 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit, dh->tfCounter, extTrig.size(), extTrig.size() > 0 ? extTrig[0] : false);
470 if (extTrig.size() && extTrig[0]) {
473 for (
const auto& excl : mExclTriggerFilter) {
491 mRateEstTrgLow = TMath::ChisquareQuantile(mConfLim, 2 * (mNTFsExtTrig)) / (2 * mNTFsSeen);
492 mRateEstTrgUpp = TMath::ChisquareQuantile(1. - mConfLim, 2 * (mNTFsExtTrig + 1)) / (2 * mNTFsSeen);
493 mRateEstAccLow = TMath::ChisquareQuantile(mConfLim, 2 * (mNTFsAccepted)) / (2 * mNTFsSeen);
494 mRateEstAccUpp = TMath::ChisquareQuantile(1. - mConfLim, 2 * (mNTFsAccepted + 1)) / (2 * mNTFsSeen);
495 if (mRateEstAccLow > 0.01 * mMaxAccRate) {
498 if ((mNTFsSeen - mLastWarned) > mWarnThrottleTF && ((mNWarnThrottle < mMaxWarnThrottle) || mMaxWarnThrottle < 0)) {
499 mLastWarned = mNTFsSeen;
500 std::string swarn = reportRates();
501 if (++mNWarnThrottle == mMaxWarnThrottle) {
502 swarn +=
" Will not warn anymore.";
504 swarn += fmt::format(
" Will suppress this warnings for {} TFs", mWarnThrottleTF);
506 LOGP(alarm,
"Ignoring TF triggered for dumping: {}", swarn);
515 LOGP(info,
"TF#{} (slice#{}) will{} be written, {}", mTimingInfo.
tfCounter, mTimingInfo.
timeslice, trig ?
"" :
" not", reportRates());
524 auto const* dh = DataRefUtils::getHeader<DataHeader*>(
ref);
526 LOGP(error,
"Failed to extract header");
529 if ((dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) ||
537 const auto lHdrDataSize =
sizeof(
DataHeader) + payloadSize;
538 mTFSize += lHdrDataSize;
540 auto& [lSize, lCnt, lEntry, lHeader] = mDataMap[EquipmentIdentifier(*dh)];
542 lEntry = mTFData.size();
543 lHeader =
ref.header;
545 lSize += lHdrDataSize;
547 mTFData.push_back({payloadSize,
ref.payload});
549 const auto* dph = DataRefUtils::getHeader<DataProcessingHeader*>(
ref);
550 LOGP(info,
"{}, part: {}({}) of {}, payload {}({}), 1stTFOrbit: {} TF: {}, creation: {} | counter:{} size:{} entry:{}",
552 dh->splitPayloadIndex, lCnt - 1, dh->splitPayloadParts, dh->payloadSize, payloadSize, dh->firstTForbit, dh->tfCounter, dph ? dph->creation : -1UL, lCnt, lSize, lEntry);
561 LOGP(info,
"Creating dump image for TF {} of run {}, starting orbit {}, size = {}", mTimingInfo.
tfCounter, mTimingInfo.
runNumber, mTimingInfo.
firstTForbit, mTFSize);
562 std::uint64_t lCurrOff = 0;
563 for (
const auto& eqEntry : mDataMap) {
564 const auto& eq = eqEntry.first;
565 auto& [lSize, lCnt, lEntry, lHeader] = eqEntry.second;
568 OutputSpec spec{eq.mDataOrigin, eq.mDataDescription, eq.mSubSpecification};
570 LOGP(info,
"{} : {} parts of size {} entry {}| offset: {}",
DataSpecUtils::describe(spec), lCnt, lSize, lEntry, lCurrOff);
572 mTFDataIndex.AddStfElement(eq, lCnt, lCurrOff, lSize);
579std::string RawTFDump::reportRates()
const
581 std::string rep = fmt::format(
"{} TFs seen, {} accepted", mNTFsSeen, mNTFsAccepted);
582 if (!mTrigger.empty()) {
583 rep += fmt::format(
", {} ext.triggered, est.rate: [{:.2e}:{:.2e}]/[{:.2e}:{:.2e}].", mNTFsExtTrig, mRateEstAccLow, mRateEstAccUpp, mRateEstTrgLow, mRateEstTrgUpp);
591 std::vector<InputSpec> inputs =
select(inpconfig.c_str());
598 {
"include-deadbeef", VariantType::Bool,
false, {
"Include DPL-generated 0xdeadbeef subspecs for missing data"}},
599 {
"include-dist-stf", VariantType::Bool,
false, {
"Include FLP/DISTSUBTIMEFRAME input"}},
600 {
"exclude-trigger-specs", VariantType::String,
"", {
"Ignore trigger seen in these inputs of triggerspec"}},
601 {
"max-dump-rate", VariantType::Float, 0.f, {
"%-age of TFs to dump. W/o external trigger: random(>0) or periodic(<0) rejection, with: max limit"}},
602 {
"rate-est-conf-limit", VariantType::Float, 0.05f, {
"quantile for the lowest rate estimate confidence limit"}},
603 {
"max-warn", VariantType::Int, 5, {
"max allowed warnings on throttling"}},
604 {
"mute-warn-period", VariantType::Int, 100, {
"mute warnings on throttling for this number of TFs"}},
605 {
"output-dir", VariantType::String,
"none", {
"TF output directory, must exist"}},
606 {
"meta-output-dir", VariantType::String,
"/dev/null", {
"TF metadata output directory, must exist (if not /dev/null)"}},
607 {
"md5-for-meta", VariantType::Bool,
false, {
"fill CTF file MD5 sum in the metadata file"}},
608 {
"min-file-size", VariantType::Int64, 0l, {
"accumulate TFs until given file size reached"}},
609 {
"max-file-size", VariantType::Int64, 0l, {
"if > 0, try to avoid exceeding given file size, also used for space check"}},
610 {
"max-tf-per-file", VariantType::Int, 0, {
"if > 0, avoid storing more than requested CTFs per file"}},
611 {
"require-free-disk", VariantType::Float, 0.f, {
"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
612 {
"wait-for-free-disk", VariantType::Float, 10.f, {
"if paused due to the low disk space, recheck after this time (in s)"}},
613 {
"max-wait-for-free-disk", VariantType::Float, 60.f, {
"produce fatal if paused due to the low disk space for more than this amount in s."}},
614 {
"verbosity-level", VariantType::Int, 0, {
"Verbose mode: 1: decision on every TF, 2: details of saved TF, 3: more details"}},
615 {
"ignore-partition-run-dir", VariantType::Bool,
false, {
"Do not creare partition-run directory in output-dir"}}}};
Header of the AggregatedRunInfo struct.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
static std::string getRawTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_rawtf_dump")
Static class with identifiers, bitmasks and names for ALICE detectors.
T get(const char *key) const
ConfigParamRegistry const & options()
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr o2h::DataDescription DESCCRaw
RawTFDump(const std::string &trigger)
void endOfStream(EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void init(InitContext &ic) final
static constexpr o2h::DataDescription DESCRaw
void run(ProcessingContext &pc) final
GLsizei const GLfloat * value
GLint GLint GLsizei GLint GLenum GLenum type
constexpr o2::header::DataOrigin gDataOriginFLP
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Defining ITS Vertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
DataProcessorSpec getRawTFDumpSpec(const std::string &inpconfig, const std::string &trigger)
void createDirectoriesIfAbsent(std::string const &path)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool match(DataRef const &ref, const char *binding)
static std::string describe(InputSpec const &spec)
bool forcedRaw
ECS declared run data storage type as raw.
std::string envId
The environment ID for the deployment.
static constexpr const char * UNKNOWN
std::string runType
The run type of the current run.
std::string runNumber
The current run number.
bool globalRunNumberChanged
uint32_t tfCounter
the orbit the TF begins
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)