40using ios = std::ios_base;
47 RawTFDump(
const std::string& trigger);
56 size_t getTFSizeInFile()
const;
57 size_t getCurrentFileSize();
61 std::string reportRates()
const;
63 SubTimeFrameFileDataIndex mTFDataIndex;
64 std::vector<std::pair<const void*, const void*>> mTFData;
65 std::map<EquipmentIdentifier, std::tuple<size_t, size_t, size_t>> mDataMap;
66 std::vector<InputSpec> mFilter{};
67 std::vector<InputSpec> mTriggerFilter{};
68 std::vector<InputSpec> mExclTriggerFilter{};
71 size_t mMinFileSize = 0;
72 size_t mMaxFileSize = 0;
76 int mNTFsAccepted = 0;
80 int mMaxTFPerFile = 0;
81 int mNWarnThrottle = 0;
82 int mMaxWarnThrottle = 0;
83 int mWarnThrottleTF = 0;
84 int mWaitDiskFull = 0;
85 int mWaitDiskFullMax = -1;
86 float mCheckDiskFull = 0.;
87 float mMaxAccRate = 0.f;
88 float mConfLim = 0.05f;
89 float mRateEstAccLow = 0.f;
90 float mRateEstAccUpp = 0.f;
91 float mRateEstTrgLow = 0.f;
92 float mRateEstTrgUpp = 0.f;
94 bool mFillMD5 =
false;
96 bool mStoreMetaFile =
false;
97 bool mCreateRunEnvDir =
true;
98 bool mAcceptCurrentTF =
false;
99 bool mRejectDEADBEEF =
false;
100 bool mRejectDistSTF =
true;
102 std::vector<uint32_t> mTFOrbits{};
106 std::string mTrigger{};
107 std::string mExclTriggerSpecs{};
108 std::string mHostName{};
109 std::string mTFDir{};
110 std::string mTFMetaFileDir =
"/dev/null";
111 std::string mCurrentTFFileName{};
112 std::string mCurrentTFFileNameFull{};
113 std::string mCurrentTFFileNameFullTmp{};
114 std::string mMetaDataType{};
116 static constexpr size_t MiB = 1ul << 20;
117 static constexpr std::streamsize sBuffSize = MiB;
118 static constexpr std::streamsize sChunkSize = 512;
119 static const std::string TMPFileEnding;
120 std::unique_ptr<char[]> mFileBuf;
122 std::uniform_real_distribution<double> mUniformDist{0.0, 100.0};
123 std::default_random_engine mRGen;
128 typename std::enable_if<
129 std::is_pointer<pointer>::value &&
130 (std::is_void<std::remove_pointer_t<pointer>>
::value ||
131 std::is_standard_layout<std::remove_pointer_t<pointer>>
::value)>
::type* =
nullptr>
132 void buffered_write(
const pointer p, std::streamsize pCount)
135 assert((pCount %
sizeof(std::conditional_t<std::is_void<std::remove_pointer_t<pointer>>
::value,
136 char, std::remove_pointer_t<pointer>>) ==
138 "Performing short write?");
140 const char* lPtr =
reinterpret_cast<const char*
>(
p);
142 if (pCount >= sBuffSize) {
143 mFile.write(lPtr, pCount);
147 const auto lToWrite = std::min(pCount, sChunkSize);
148 assert(lToWrite > 0 && lToWrite <= sChunkSize && lToWrite <= pCount);
150 mFile.write(lPtr, lToWrite);
158const std::string RawTFDump::TMPFileEnding{
".part"};
163 mTriggerFilter =
select(trigger.c_str());
164 mFileBuf = std::make_unique<char[]>(sBuffSize);
165 mFile.rdbuf()->pubsetbuf(mFileBuf.get(), sBuffSize);
167 mFile.exceptions(std::fstream::failbit | std::fstream::badbit);
173 mRGen = std::default_random_engine(getpid());
174 mTFMetaFileDir = ic.
options().
get<std::string>(
"meta-output-dir");
175 if (mTFMetaFileDir !=
"/dev/null") {
177 mStoreMetaFile =
true;
178 mFillMD5 = ic.
options().
get<
bool>(
"md5-for-meta");
181 mTFDir = ic.
options().
get<std::string>(
"output-dir");
182 if (mTFDir !=
"/dev/null") {
187 mStoreMetaFile =
false;
189 mRejectDistSTF = !ic.
options().
get<
bool>(
"include-dist-stf");
190 mRejectDEADBEEF = !ic.
options().
get<
bool>(
"include-deadbeef");
191 mCreateRunEnvDir = !ic.
options().
get<
bool>(
"ignore-partition-run-dir");
192 mMinFileSize = ic.
options().
get<int64_t>(
"min-file-size");
193 mMaxFileSize = ic.
options().
get<int64_t>(
"max-file-size");
194 mMaxTFPerFile = ic.
options().
get<
int>(
"max-tf-per-file");
195 mMaxAccRate = ic.
options().
get<
float>(
"max-dump-rate");
196 float cl = ic.
options().
get<
float>(
"rate-est-conf-limit");
197 if (mConfLim < 0.001 || mConfLim > 0.32) {
198 LOGP(warn,
"Bad confidence limit {} for rate estimate, setting to default {}", cl, mConfLim);
202 mMaxWarnThrottle = ic.
options().
get<
int>(
"max-warn");
203 mWarnThrottleTF = ic.
options().
get<
int>(
"mute-warn-period");
205 mVerbose = ic.
options().
get<
int>(
"verbosity-level");
206 mExclTriggerSpecs = ic.
options().
get<std::string>(
"exclude-trigger-specs");
207 if (!mExclTriggerSpecs.empty()) {
208 mExclTriggerFilter =
select(mExclTriggerSpecs.c_str());
210 if (mTrigger.empty()) {
211 if (mMaxAccRate >= 0.f) {
212 LOGP(info,
"Will accept randomly {}% of TFs", mMaxAccRate);
214 LOGP(info,
"Will accept every {}-th TF",
int(std::ceil(-100.f / mMaxAccRate)));
217 mMaxAccRate = std::abs(mMaxAccRate);
218 LOGP(info,
"Will limit TFs triggered with {} by {}% at most", mTrigger, mMaxAccRate);
219 if (!mExclTriggerFilter.empty()) {
220 LOGP(info,
"Inputs excluded from the trigger: {}", mExclTriggerSpecs);
225 if (mMinFileSize > 0) {
226 LOGP(info,
"Multiple TFs will be accumulated in the file until its size exceeds {}{}",
227 mMinFileSize, mMaxFileSize > mMinFileSize ? fmt::format(
" but does not exceed {} B", mMaxFileSize) : std::string{});
231 mCheckDiskFull = ic.
options().
get<
float>(
"require-free-disk");
232 mWaitDiskFull = 1000 * ic.
options().
get<
float>(
"wait-for-free-disk");
233 mWaitDiskFullMax = 1000 * ic.
options().
get<
float>(
"max-wait-for-free-disk");
235 char hostname[_POSIX_HOST_NAME_MAX];
236 gethostname(hostname, _POSIX_HOST_NAME_MAX);
237 mHostName = hostname;
238 mHostName = mHostName.substr(0, mHostName.find(
'.'));
245 updateTimeDependentParams(pc);
246 mAcceptCurrentTF = triggerTF(pc);
247 if (mAcceptCurrentTF) {
248 prepareTFForWriting(pc);
254 if (mWriteTF && checkFreeSpace(pc)) {
256 size_t lTFSizeInFile = getTFSizeInFile();
260 mFile << lTFFileMeta;
261 mFile << mTFDataIndex;
263 for (
const auto& eqEntry : mDataMap) {
264 auto& [lSize, lCnt, lEntry] = eqEntry.second;
265 for (
size_t part = 0; part < lCnt; part++) {
266 const auto& dataPtr = mTFData[lEntry + part];
273 buffered_write(
reinterpret_cast<const char*
>(&hdToWrite),
sizeof(
DataHeader));
274 buffered_write(dataPtr.second, hdToWrite.
payloadSize);
280 }
catch (
const std::ios_base::failure& eFailExc) {
281 LOGP(error,
"Writing of TF {} to file {} failed. error={}", mTimingInfo.
tfCounter, mCurrentTFFileNameFullTmp, eFailExc.what());
287 mTFDataIndex.clear();
295 LOGP(info,
"Dumped {} TFs to {} files", mNTFsAccepted, mNTFFiles);
296 if (!mTriggerFilter.empty()) {
297 LOGP(info,
"External trigger summary: {}", reportRates());
302size_t RawTFDump::getTFSizeInFile()
const
308size_t RawTFDump::getCurrentFileSize()
310 return mFile.is_open() ? size_t(mFile.tellp()) : 0;
314void RawTFDump::prepareTFFile()
320 if (!mFile.is_open()) {
323 auto currSize = getCurrentFileSize();
324 if ((mNTFsInFile >= mMaxTFPerFile) ||
325 (currSize >= mMinFileSize) ||
326 (currSize && mMaxFileSize > mMinFileSize && ((currSize + mTFSize) > mMaxFileSize))) {
329 LOGP(info,
"Will add new TF of size {} to existing file of size {} with {} TFs", mTFSize, currSize, mNTFsInFile);
337 TFDir += fmt::format(
"{}_{}tf/", mDataTakingContext.
envId, mDataTakingContext.
runNumber);
338 if (!TFDir.empty()) {
340 LOGP(info,
"Created {} directory for TFs output", TFDir);
344 mCurrentTFFileNameFull = fmt::format(
"{}{}", TFDir, mCurrentTFFileName);
345 mCurrentTFFileNameFullTmp = TMPFileEnding.empty() ? mCurrentTFFileNameFull :
o2::utils::Str::concat_string(mCurrentTFFileNameFull, TMPFileEnding);
346 mFile.open(mCurrentTFFileNameFullTmp.c_str(), ios::binary | ios::trunc | ios::out | ios::ate);
347 LOGP(info,
"Opened new raw-tf dump file {}[{}]", mCurrentTFFileNameFull, TMPFileEnding);
360 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.
runType, mDataTakingContext.
forcedRaw);
365void RawTFDump::closeTFFile()
367 if (!mFile.is_open()) {
371 LOGP(info,
"Closing output file {}[{}]", mCurrentTFFileNameFull, TMPFileEnding);
374 if (mStoreMetaFile) {
376 if (!TFMetaData.
fillFileData(mCurrentTFFileNameFullTmp, mFillMD5, TMPFileEnding)) {
377 throw std::runtime_error(
"metadata file was requested but not created");
380 TFMetaData.
type = mMetaDataType;
382 TFMetaData.
tfOrbits.swap(mTFOrbits);
383 auto metaFileNameTmp = fmt::format(
"{}{}.tmp", mTFMetaFileDir, mCurrentTFFileName);
384 auto metaFileName = fmt::format(
"{}{}.done", mTFMetaFileDir, mCurrentTFFileName);
386 std::ofstream metaFileOut(metaFileNameTmp);
387 metaFileOut << TFMetaData;
389 if (!TMPFileEnding.empty()) {
390 std::filesystem::rename(mCurrentTFFileNameFullTmp, mCurrentTFFileNameFull);
392 std::filesystem::rename(metaFileNameTmp, metaFileName);
393 LOGP(info,
"wrote meta file {}", metaFileName);
394 }
catch (std::exception
const& e) {
395 LOGP(error,
"Failed to store TF meta data file {}, reason {}", metaFileName, e.what());
397 }
else if (!TMPFileEnding.empty()) {
398 std::filesystem::rename(mCurrentTFFileNameFullTmp, mCurrentTFFileNameFull);
} catch (std::exception const& e) {
  // the format string needs a placeholder for the reason; fmt ignores surplus arguments otherwise
  LOGP(error, "Failed to finalize TF file {}, reason: {}", mCurrentTFFileNameFull, e.what());
410 int totalWait = 0, nwaitCycles = 0;
411 while (mCheckDiskFull) {
412 constexpr int showFirstN = 10, prsecaleWarnings = 50;
414 const auto si = std::filesystem::space(mCurrentTFFileNameFullTmp);
416 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
418 wmsg = fmt::format(
"Disk has {} MiB available while at least {} MiB is requested, wait for {} ms (on top of {} ms)", si.available / MiB,
size_t(mCheckDiskFull) / MiB, mWaitDiskFull, totalWait);
419 }
else if (mCheckDiskFull < 0.f &&
float(si.available) / si.capacity < -mCheckDiskFull) {
421 wmsg = fmt::format(
"Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ?
float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
426 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
428 LOGP(fatal,
"Disk has {} MiB available out of {} MiB after waiting for {} ms", si.available / MiB, si.capacity / MiB, mWaitDiskFullMax);
430 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
431 LOGP(alarm,
"{}", wmsg);
434 totalWait += mWaitDiskFull;
} catch (std::exception const& e) {
  // report the path that was actually passed to std::filesystem::space(), i.e. the temporary file name
  LOGP(fatal, "unable to query disk space info for path {}, reason {}", mCurrentTFFileNameFullTmp, e.what());
449 if (mTrigger.empty()) {
450 if (mMaxAccRate > 0.f) {
451 trig = (mUniformDist(mRGen) <= mMaxAccRate);
452 }
else if (mMaxAccRate < 0.f) {
453 trig = (mTimingInfo.
tfCounter %
int(std::ceil(-100.f / mMaxAccRate))) == 0;
457 auto const* dh = DataRefUtils::getHeader<DataHeader*>(
ref);
459 LOGP(error,
"Failed to extract header for trigger input");
462 auto extTrig = DataRefUtils::as<bool>(
ref);
464 LOGP(info,
"trigger input {}, part: {} of {}, payload {}, 1stTFOrbit: {} TF: {} | span size: {} span[0]={}",
466 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit, dh->tfCounter, extTrig.size(), extTrig.size() > 0 ? extTrig[0] : false);
468 if (extTrig.size() && extTrig[0]) {
471 for (
const auto& excl : mExclTriggerFilter) {
489 mRateEstTrgLow = TMath::ChisquareQuantile(mConfLim, 2 * (mNTFsExtTrig)) / (2 * mNTFsSeen);
490 mRateEstTrgUpp = TMath::ChisquareQuantile(1. - mConfLim, 2 * (mNTFsExtTrig + 1)) / (2 * mNTFsSeen);
491 mRateEstAccLow = TMath::ChisquareQuantile(mConfLim, 2 * (mNTFsAccepted)) / (2 * mNTFsSeen);
492 mRateEstAccUpp = TMath::ChisquareQuantile(1. - mConfLim, 2 * (mNTFsAccepted + 1)) / (2 * mNTFsSeen);
493 if (mRateEstAccLow > 0.01 * mMaxAccRate) {
496 if ((mNTFsSeen - mLastWarned) > mWarnThrottleTF && ((mNWarnThrottle < mMaxWarnThrottle) || mMaxWarnThrottle < 0)) {
497 mLastWarned = mNTFsSeen;
498 std::string swarn = reportRates();
499 if (++mNWarnThrottle == mMaxWarnThrottle) {
500 swarn +=
" Will not warn anymore.";
502 swarn += fmt::format(
" Will suppress this warnings for {} TFs", mWarnThrottleTF);
504 LOGP(alarm,
"Ignoring TF triggered for dumping: {}", swarn);
513 LOGP(info,
"TF#{} (slice#{}) will{} be written, {}", mTimingInfo.
tfCounter, mTimingInfo.
timeslice, trig ?
"" :
" not", reportRates());
522 auto const* dh = DataRefUtils::getHeader<DataHeader*>(
ref);
524 LOGP(error,
"Failed to extract header");
527 if ((dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) ||
534 const auto lHdrDataSize =
sizeof(
DataHeader) + dh->payloadSize;
535 mTFSize += lHdrDataSize;
537 auto& [lSize, lCnt, lEntry] = mDataMap[EquipmentIdentifier(*dh)];
539 lEntry = mTFData.size();
541 lSize += lHdrDataSize;
543 mTFData.push_back({
ref.header,
ref.payload});
545 const auto* dph = DataRefUtils::getHeader<DataProcessingHeader*>(
ref);
546 LOGP(info,
"{}, part: {} of {}, payload {}, 1stTFOrbit: {} TF: {}, creation: {} | counter:{} size:{} entry:{}",
548 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit, dh->tfCounter, dph ? dph->creation : -1UL, lCnt, lSize, lEntry);
554 LOGP(info,
"Creating dump image for TF {} of run {}, starting orbit {}, size = {}", mTimingInfo.
tfCounter, mTimingInfo.
runNumber, mTimingInfo.
firstTForbit, mTFSize);
555 std::uint64_t lCurrOff = 0;
556 for (
const auto& eqEntry : mDataMap) {
557 const auto& eq = eqEntry.first;
558 auto& [lSize, lCnt, lEntry] = eqEntry.second;
561 OutputSpec spec{eq.mDataOrigin, eq.mDataDescription, eq.mSubSpecification};
563 LOGP(info,
"{} : {} parts of size {} entry {}| offset: {}",
DataSpecUtils::describe(spec), lCnt, lSize, lEntry, lCurrOff);
565 mTFDataIndex.AddStfElement(eq, lCnt, lCurrOff, lSize);
572std::string RawTFDump::reportRates()
const
574 std::string rep = fmt::format(
"{} TFs seen, {} accepted", mNTFsSeen, mNTFsAccepted);
575 if (!mTrigger.empty()) {
576 rep += fmt::format(
", {} ext.triggered, est.rate: [{:.2e}:{:.2e}]/[{:.2e}:{:.2e}].", mNTFsExtTrig, mRateEstAccLow, mRateEstAccUpp, mRateEstTrgLow, mRateEstTrgUpp);
584 std::vector<InputSpec> inputs =
select(inpconfig.c_str());
591 {
"include-deadbeef", VariantType::Bool,
false, {
"Include DPL-generated 0xdeadbeef subspecs for missing data"}},
592 {
"include-dist-stf", VariantType::Bool,
false, {
"Include FLP/DISTSUBTIMEFRAME input"}},
593 {
"exclude-trigger-specs", VariantType::String,
"", {
"Ignore trigger seen in these inputs of triggerspec"}},
594 {
"max-dump-rate", VariantType::Float, 0.f, {
"%-age of TFs to dump. W/o external trigger: random(>0) or periodic(<0) rejection, with: max limit"}},
595 {
"rate-est-conf-limit", VariantType::Float, 0.05f, {
"quantile for the lowest rate estimate confidence limit"}},
596 {
"max-warn", VariantType::Int, 5, {
"max allowed warnings on throttling"}},
597 {
"mute-warn-period", VariantType::Int, 100, {
"mute warnings on throttling for this number of TFs"}},
598 {
"output-dir", VariantType::String,
"none", {
"TF output directory, must exist"}},
599 {
"meta-output-dir", VariantType::String,
"/dev/null", {
"TF metadata output directory, must exist (if not /dev/null)"}},
600 {
"md5-for-meta", VariantType::Bool,
false, {
"fill CTF file MD5 sum in the metadata file"}},
601 {
"min-file-size", VariantType::Int64, 0l, {
"accumulate TFs until given file size reached"}},
602 {
"max-file-size", VariantType::Int64, 0l, {
"if > 0, try to avoid exceeding given file size, also used for space check"}},
603 {
"max-tf-per-file", VariantType::Int, 0, {
"if > 0, avoid storing more than requested CTFs per file"}},
604 {
"require-free-disk", VariantType::Float, 0.f, {
"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
605 {
"wait-for-free-disk", VariantType::Float, 10.f, {
"if paused due to the low disk space, recheck after this time (in s)"}},
606 {
"max-wait-for-free-disk", VariantType::Float, 60.f, {
"produce fatal if paused due to the low disk space for more than this amount in s."}},
607 {
"verbosity-level", VariantType::Int, 0, {
"Verbose mode: 1: decision on every TF, 2: details of saved TF, 3: more details"}},
608 {
"ignore-partition-run-dir", VariantType::Bool,
false, {
"Do not creare partition-run directory in output-dir"}}}};
Header of the AggregatedRunInfo struct.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
static std::string getRawTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_rawtf_dump")
Static class with identifiers, bitmasks and names for ALICE detectors.
T get(const char *key) const
ConfigParamRegistry const & options()
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr o2h::DataDescription DESCCRaw
RawTFDump(const std::string &trigger)
void endOfStream(EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void init(InitContext &ic) final
static constexpr o2h::DataDescription DESCRaw
void run(ProcessingContext &pc) final
GLsizei const GLfloat * value
GLint GLint GLsizei GLint GLenum GLenum type
constexpr o2::header::DataOrigin gDataOriginFLP
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Defining ITS Vertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
DataProcessorSpec getRawTFDumpSpec(const std::string &inpconfig, const std::string &trigger)
void createDirectoriesIfAbsent(std::string const &path)
static bool match(DataRef const &ref, const char *binding)
static std::string describe(InputSpec const &spec)
bool forcedRaw
ECS declared run data storage type as raw.
std::string envId
The environment ID for the deployment.
static constexpr const char * UNKNOWN
std::string runType
The run type of the current run.
std::string runNumber
The current run number.
bool globalRunNumberChanged
uint32_t tfCounter
the orbit the TF begins
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)