Project
Loading...
Searching...
No Matches
RawTFDumpSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
17#include "Framework/Task.h"
22#include "RawTFDumpSpec.h"
28#include <unistd.h>
29#include <TMath.h>
30#include <filesystem>
31#include <random>
32#include <set>
33
34namespace o2::rawdd
35{
36namespace o2h = o2::header;
37using namespace o2::framework;
40using ios = std::ios_base;
41
42class RawTFDump : public Task
43{
44 public:
45 static constexpr o2h::DataDescription DESCRaw{"RAWDATA"}, DESCCRaw{"CRAWDATA"};
46
47 RawTFDump(const std::string& trigger);
48 void init(InitContext& ic) final;
49 void run(ProcessingContext& pc) final;
50 void endOfStream(EndOfStreamContext& ec) final;
51
52 private:
53 bool triggerTF(ProcessingContext& pc);
54 void updateTimeDependentParams(ProcessingContext& pc);
55 void prepareTFForWriting(ProcessingContext& pc);
56 size_t getTFSizeInFile() const;
57 size_t getCurrentFileSize();
58 void prepareTFFile();
59 void closeTFFile();
60 bool checkFreeSpace(ProcessingContext& pc);
61 std::string reportRates() const;
62
63 SubTimeFrameFileDataIndex mTFDataIndex;
64 std::vector<std::pair<const void*, const void*>> mTFData;
65 std::map<EquipmentIdentifier, std::tuple<size_t, size_t, size_t>> mDataMap;
66 std::vector<InputSpec> mFilter{};
67 std::vector<InputSpec> mTriggerFilter{};
68 std::vector<InputSpec> mExclTriggerFilter{};
69
70 size_t mTFSize = 0;
71 size_t mMinFileSize = 0; // if > 0, accumulate TFs in the same file until the total size exceeds this minimum
72 size_t mMaxFileSize = 0; // if > MinSize, and accumulated size will exceed this value, stop accumulation (even if mMinFileSize is not reached)
73
74 int mNTFsSeen = 0; // total number of TFs seen
75 int mNTFsExtTrig = 0; // total nunber of TFs externally triggered
76 int mNTFsAccepted = 0; // total number of TFs written
77 int mNTFsInFile = 0; // total number of TFs accumulated in the current file
78 int mNTFFiles = 0; // total number of TF files written
79 int mLastWarned = 0; // TF when last warned about throttling
80 int mMaxTFPerFile = 0; // max TFs per files to store
81 int mNWarnThrottle = 0; // number of times we warned about the throttling
82 int mMaxWarnThrottle = 0; // max allowed warnings about the throttling
83 int mWarnThrottleTF = 0; // min period (in TFs) between the warnings about the throttling
84 int mWaitDiskFull = 0; // if mCheckDiskFull triggers, pause for this amount of ms before new attempt
85 int mWaitDiskFullMax = -1; // produce fatal mCheckDiskFull block the workflow for more than this time (in ms)
86 float mCheckDiskFull = 0.; // wait for if available abs. disk space is < mCheckDiskFull (if >0) or if its fraction is < -mCheckDiskFull (if <0)
87 float mMaxAccRate = 0.f; // max acceptance rate
88 float mConfLim = 0.05f; // confidence limit for rate esimate (lower quantile)
89 float mRateEstAccLow = 0.f; // lower limit on accepted TFs rate
90 float mRateEstAccUpp = 0.f; // upper limit on accepted TFs rate
91 float mRateEstTrgLow = 0.f; // lower limit on triggered TFs rate
92 float mRateEstTrgUpp = 0.f; // upper limit on triggered TFs rate
93
94 bool mFillMD5 = false;
95 bool mWriteTF = true; // for dry run
96 bool mStoreMetaFile = false;
97 bool mCreateRunEnvDir = true;
98 bool mAcceptCurrentTF = false;
99 bool mRejectDEADBEEF = false;
100 bool mRejectDistSTF = true;
101 int mVerbose = 0;
102 std::vector<uint32_t> mTFOrbits{}; // 1st orbits of TF accumulated in current file
103 o2::framework::DataTakingContext mDataTakingContext{};
104 o2::framework::TimingInfo mTimingInfo{};
105
106 std::string mTrigger{}; // external trigger input
107 std::string mExclTriggerSpecs{}; // trigger specs to ignore
108 std::string mHostName{};
109 std::string mTFDir{};
110 std::string mTFMetaFileDir = "/dev/null";
111 std::string mCurrentTFFileName{};
112 std::string mCurrentTFFileNameFull{};
113 std::string mCurrentTFFileNameFullTmp{};
114 std::string mMetaDataType{};
115
116 static constexpr size_t MiB = 1ul << 20;
117 static constexpr std::streamsize sBuffSize = MiB; // 1 MiB
118 static constexpr std::streamsize sChunkSize = 512;
119 static const std::string TMPFileEnding;
120 std::unique_ptr<char[]> mFileBuf;
121 std::ofstream mFile;
122 std::uniform_real_distribution<double> mUniformDist{0.0, 100.0};
123 std::default_random_engine mRGen;
124
125 // helper to make sure the written blocks are buffered
126 template <
127 typename pointer,
128 typename std::enable_if<
129 std::is_pointer<pointer>::value && // pointers only
130 (std::is_void<std::remove_pointer_t<pointer>>::value || // void* or standard layout!
131 std::is_standard_layout<std::remove_pointer_t<pointer>>::value)>::type* = nullptr>
132 void buffered_write(const pointer p, std::streamsize pCount)
133 {
134 // make sure we're not doing a short write
135 assert((pCount % sizeof(std::conditional_t<std::is_void<std::remove_pointer_t<pointer>>::value,
136 char, std::remove_pointer_t<pointer>>) ==
137 0) &&
138 "Performing short write?");
139
140 const char* lPtr = reinterpret_cast<const char*>(p);
141 // avoid the optimization if the write is large enough
142 if (pCount >= sBuffSize) {
143 mFile.write(lPtr, pCount);
144 } else {
145 // split the write to smaller chunks
146 while (pCount > 0) {
147 const auto lToWrite = std::min(pCount, sChunkSize);
148 assert(lToWrite > 0 && lToWrite <= sChunkSize && lToWrite <= pCount);
149
150 mFile.write(lPtr, lToWrite);
151 lPtr += lToWrite;
152 pCount -= lToWrite;
153 }
154 }
155 }
156};
157
158const std::string RawTFDump::TMPFileEnding{".part"};
159
160//________________________________________
161RawTFDump::RawTFDump(const std::string& trigger) : mTrigger{trigger}
162{
163 mTriggerFilter = select(trigger.c_str());
164 mFileBuf = std::make_unique<char[]>(sBuffSize);
165 mFile.rdbuf()->pubsetbuf(mFileBuf.get(), sBuffSize);
166 mFile.clear();
167 mFile.exceptions(std::fstream::failbit | std::fstream::badbit);
168}
169
170//________________________________________
172{
173 mRGen = std::default_random_engine(getpid());
174 mTFMetaFileDir = ic.options().get<std::string>("meta-output-dir");
175 if (mTFMetaFileDir != "/dev/null") {
176 mTFMetaFileDir = o2::utils::Str::rectifyDirectory(mTFMetaFileDir);
177 mStoreMetaFile = true;
178 mFillMD5 = ic.options().get<bool>("md5-for-meta");
179 }
180
181 mTFDir = ic.options().get<std::string>("output-dir");
182 if (mTFDir != "/dev/null") {
183 mTFDir = o2::utils::Str::rectifyDirectory(mTFDir);
184 mWriteTF = true;
185 } else {
186 mWriteTF = false;
187 mStoreMetaFile = false;
188 }
189 mRejectDistSTF = !ic.options().get<bool>("include-dist-stf");
190 mRejectDEADBEEF = !ic.options().get<bool>("include-deadbeef");
191 mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
192 mMinFileSize = ic.options().get<int64_t>("min-file-size");
193 mMaxFileSize = ic.options().get<int64_t>("max-file-size");
194 mMaxTFPerFile = ic.options().get<int>("max-tf-per-file");
195 mMaxAccRate = ic.options().get<float>("max-dump-rate");
196 float cl = ic.options().get<float>("rate-est-conf-limit");
197 if (mConfLim < 0.001 || mConfLim > 0.32) {
198 LOGP(warn, "Bad confidence limit {} for rate estimate, setting to default {}", cl, mConfLim);
199 } else {
200 mConfLim = cl;
201 }
202 mMaxWarnThrottle = ic.options().get<int>("max-warn");
203 mWarnThrottleTF = ic.options().get<int>("mute-warn-period");
204
205 mVerbose = ic.options().get<int>("verbosity-level");
206 mExclTriggerSpecs = ic.options().get<std::string>("exclude-trigger-specs");
207 if (!mExclTriggerSpecs.empty()) {
208 mExclTriggerFilter = select(mExclTriggerSpecs.c_str());
209 }
210 if (mTrigger.empty()) {
211 if (mMaxAccRate >= 0.f) {
212 LOGP(info, "Will accept randomly {}% of TFs", mMaxAccRate);
213 } else {
214 LOGP(info, "Will accept every {}-th TF", int(std::ceil(-100.f / mMaxAccRate)));
215 }
216 } else {
217 mMaxAccRate = std::abs(mMaxAccRate);
218 LOGP(info, "Will limit TFs triggered with {} by {}% at most", mTrigger, mMaxAccRate);
219 if (!mExclTriggerFilter.empty()) {
220 LOGP(info, "Inputs excluded from the trigger: {}", mExclTriggerSpecs);
221 }
222 }
223
224 if (mWriteTF) {
225 if (mMinFileSize > 0) {
226 LOGP(info, "Multiple TFs will be accumulated in the file until its size exceeds {}{}",
227 mMinFileSize, mMaxFileSize > mMinFileSize ? fmt::format(" but does not exceed {} B", mMaxFileSize) : std::string{});
228 }
229 }
230
231 mCheckDiskFull = ic.options().get<float>("require-free-disk");
232 mWaitDiskFull = 1000 * ic.options().get<float>("wait-for-free-disk");
233 mWaitDiskFullMax = 1000 * ic.options().get<float>("max-wait-for-free-disk");
234
235 char hostname[_POSIX_HOST_NAME_MAX];
236 gethostname(hostname, _POSIX_HOST_NAME_MAX);
237 mHostName = hostname;
238 mHostName = mHostName.substr(0, mHostName.find('.'));
239}
240
241//________________________________________
243{
244 mNTFsSeen++;
245 updateTimeDependentParams(pc);
246 mAcceptCurrentTF = triggerTF(pc);
247 if (mAcceptCurrentTF) {
248 prepareTFForWriting(pc);
249 } else {
250 return;
251 }
252
253 prepareTFFile();
254 if (mWriteTF && checkFreeSpace(pc)) { // write data
255 try {
256 size_t lTFSizeInFile = getTFSizeInFile();
257 SubTimeFrameFileMeta lTFFileMeta(lTFSizeInFile);
258 lTFFileMeta.mWriteTimeMs = mTimingInfo.creation;
259
260 mFile << lTFFileMeta; // Write DataHeader + SubTimeFrameFileMeta
261 mFile << mTFDataIndex; // Write DataHeader + SubTimeFrameFileDataIndex
262
263 for (const auto& eqEntry : mDataMap) {
264 auto& [lSize, lCnt, lEntry] = eqEntry.second;
265 for (size_t part = 0; part < lCnt; part++) {
266 const auto& dataPtr = mTFData[lEntry + part];
267 DataHeader hdToWrite = *reinterpret_cast<const DataHeader*>(dataPtr.first); // make a local DataHeader copy to clear flagsNextHeader bit
268 hdToWrite.flagsNextHeader = 0;
269 hdToWrite.splitPayloadIndex = part;
270 if (mVerbose > 2) {
271 LOGP(info, "Writing part:{}/{} of {} | TFCounter:{} part{}/{}", part, lCnt, DataSpecUtils::describe(OutputSpec{hdToWrite.dataOrigin, hdToWrite.dataDescription, hdToWrite.subSpecification}), hdToWrite.firstTForbit, hdToWrite.splitPayloadIndex, hdToWrite.splitPayloadParts);
272 }
273 buffered_write(reinterpret_cast<const char*>(&hdToWrite), sizeof(DataHeader));
274 buffered_write(dataPtr.second, hdToWrite.payloadSize);
275 }
276 }
277 mFile.flush(); // flush the buffer and check the state
278 mTFOrbits.push_back(mTimingInfo.firstTForbit);
279 mNTFsInFile++;
280 } catch (const std::ios_base::failure& eFailExc) {
281 LOGP(error, "Writing of TF {} to file {} failed. error={}", mTimingInfo.tfCounter, mCurrentTFFileNameFullTmp, eFailExc.what());
282 }
283 }
284 // cleanup
285 mTFData.clear();
286 mDataMap.clear();
287 mTFDataIndex.clear();
288 mTFSize = 0;
289}
290
291//____________________________________________________________
293{
294 closeTFFile();
295 LOGP(info, "Dumped {} TFs to {} files", mNTFsAccepted, mNTFFiles);
296 if (!mTriggerFilter.empty()) {
297 LOGP(info, "External trigger summary: {}", reportRates());
298 }
299}
300
301//________________________________________
302size_t RawTFDump::getTFSizeInFile() const
303{
304 return SubTimeFrameFileMeta::getSizeInFile() + mTFDataIndex.getSizeInFile() + mTFSize;
305}
306
307//________________________________________
308size_t RawTFDump::getCurrentFileSize()
309{
310 return mFile.is_open() ? size_t(mFile.tellp()) : 0;
311}
312
313//___________________________________________________________________
314void RawTFDump::prepareTFFile()
315{
316 if (!mWriteTF) {
317 return;
318 }
319 bool needToOpen;
320 if (!mFile.is_open()) {
321 needToOpen = true;
322 } else {
323 auto currSize = getCurrentFileSize();
324 if ((mNTFsInFile >= mMaxTFPerFile) ||
325 (currSize >= mMinFileSize) || // min size exceeded, may close the file.
326 (currSize && mMaxFileSize > mMinFileSize && ((currSize + mTFSize) > mMaxFileSize))) { // this is not the 1st TF in the file and the new size will exceed allowed max
327 needToOpen = true;
328 } else {
329 LOGP(info, "Will add new TF of size {} to existing file of size {} with {} TFs", mTFSize, currSize, mNTFsInFile);
330 needToOpen = false;
331 }
332 }
333 if (needToOpen) {
334 closeTFFile();
335 auto TFDir = mTFDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mTFDir;
336 if (mCreateRunEnvDir && !mDataTakingContext.envId.empty() && (mDataTakingContext.envId != o2::framework::DataTakingContext::UNKNOWN)) {
337 TFDir += fmt::format("{}_{}tf/", mDataTakingContext.envId, mDataTakingContext.runNumber);
338 if (!TFDir.empty()) {
340 LOGP(info, "Created {} directory for TFs output", TFDir);
341 }
342 }
343 mCurrentTFFileName = o2::base::NameConf::getRawTFFileName(mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTimingInfo.tfCounter, mHostName);
344 mCurrentTFFileNameFull = fmt::format("{}{}", TFDir, mCurrentTFFileName);
345 mCurrentTFFileNameFullTmp = TMPFileEnding.empty() ? mCurrentTFFileNameFull : o2::utils::Str::concat_string(mCurrentTFFileNameFull, TMPFileEnding);
346 mFile.open(mCurrentTFFileNameFullTmp.c_str(), ios::binary | ios::trunc | ios::out | ios::ate);
347 LOGP(info, "Opened new raw-tf dump file {}[{}]", mCurrentTFFileNameFull, TMPFileEnding);
348 mNTFFiles++;
349 }
350}
351
352//___________________________________________________________________
353void RawTFDump::updateTimeDependentParams(ProcessingContext& pc)
354{
355 namespace GRPECS = o2::parameters::GRPECS;
356 mTimingInfo = pc.services().get<o2::framework::TimingInfo>();
357 if (mTimingInfo.globalRunNumberChanged) {
358 mDataTakingContext = pc.services().get<DataTakingContext>();
359 // determine the output type for the TF metadata
360 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.runType, mDataTakingContext.forcedRaw);
361 }
362}
363
364//___________________________________________________________________
365void RawTFDump::closeTFFile()
366{
367 if (!mFile.is_open()) {
368 return;
369 }
370 try {
371 LOGP(info, "Closing output file {}[{}]", mCurrentTFFileNameFull, TMPFileEnding);
372 mFile.close();
373 // write TF file metaFile data
374 if (mStoreMetaFile) {
376 if (!TFMetaData.fillFileData(mCurrentTFFileNameFullTmp, mFillMD5, TMPFileEnding)) {
377 throw std::runtime_error("metadata file was requested but not created");
378 }
379 TFMetaData.setDataTakingContext(mDataTakingContext);
380 TFMetaData.type = mMetaDataType;
381 TFMetaData.priority = "high";
382 TFMetaData.tfOrbits.swap(mTFOrbits);
383 auto metaFileNameTmp = fmt::format("{}{}.tmp", mTFMetaFileDir, mCurrentTFFileName);
384 auto metaFileName = fmt::format("{}{}.done", mTFMetaFileDir, mCurrentTFFileName);
385 try {
386 std::ofstream metaFileOut(metaFileNameTmp);
387 metaFileOut << TFMetaData;
388 metaFileOut.close();
389 if (!TMPFileEnding.empty()) {
390 std::filesystem::rename(mCurrentTFFileNameFullTmp, mCurrentTFFileNameFull);
391 }
392 std::filesystem::rename(metaFileNameTmp, metaFileName);
393 LOGP(info, "wrote meta file {}", metaFileName);
394 } catch (std::exception const& e) {
395 LOGP(error, "Failed to store TF meta data file {}, reason {}", metaFileName, e.what());
396 }
397 } else if (!TMPFileEnding.empty()) {
398 std::filesystem::rename(mCurrentTFFileNameFullTmp, mCurrentTFFileNameFull);
399 }
400 } catch (std::exception const& e) {
401 LOGP(error, "Failed to finalize TF file {}, reason: ", mCurrentTFFileNameFull, e.what());
402 }
403 mTFOrbits.clear();
404 mNTFsInFile = 0;
405}
406
407//________________________________________
408bool RawTFDump::checkFreeSpace(ProcessingContext& pc)
409{
410 int totalWait = 0, nwaitCycles = 0;
411 while (mCheckDiskFull) {
412 constexpr int showFirstN = 10, prsecaleWarnings = 50;
413 try {
414 const auto si = std::filesystem::space(mCurrentTFFileNameFullTmp);
415 std::string wmsg{};
416 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
417 nwaitCycles++;
418 wmsg = fmt::format("Disk has {} MiB available while at least {} MiB is requested, wait for {} ms (on top of {} ms)", si.available / MiB, size_t(mCheckDiskFull) / MiB, mWaitDiskFull, totalWait);
419 } else if (mCheckDiskFull < 0.f && float(si.available) / si.capacity < -mCheckDiskFull) { // relative margin requested
420 nwaitCycles++;
421 wmsg = fmt::format("Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ? float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
422 } else {
423 nwaitCycles = 0;
424 }
425 if (nwaitCycles) {
426 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
427 closeTFFile(); // try to save whatever we have
428 LOGP(fatal, "Disk has {} MiB available out of {} MiB after waiting for {} ms", si.available / MiB, si.capacity / MiB, mWaitDiskFullMax);
429 }
430 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
431 LOGP(alarm, "{}", wmsg);
432 }
433 pc.services().get<RawDeviceService>().waitFor((unsigned int)(mWaitDiskFull));
434 totalWait += mWaitDiskFull;
435 continue;
436 }
437 } catch (std::exception const& e) {
438 LOGP(fatal, "unable to query disk space info for path {}, reason {}", mCurrentTFFileNameFull, e.what()); // do we want this?
439 }
440 break;
441 }
442 return true;
443}
444
445//________________________________________
446bool RawTFDump::triggerTF(ProcessingContext& pc)
447{
448 bool trig = false;
449 if (mTrigger.empty()) { // random
450 if (mMaxAccRate > 0.f) {
451 trig = (mUniformDist(mRGen) <= mMaxAccRate);
452 } else if (mMaxAccRate < 0.f) {
453 trig = (mTimingInfo.tfCounter % int(std::ceil(-100.f / mMaxAccRate))) == 0;
454 }
455 } else {
456 for (auto const& ref : InputRecordWalker(pc.inputs(), mTriggerFilter)) {
457 auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
458 if (!dh) {
459 LOGP(error, "Failed to extract header for trigger input");
460 continue;
461 }
462 auto extTrig = DataRefUtils::as<bool>(ref);
463 if (mVerbose > 0) {
464 LOGP(info, "trigger input {}, part: {} of {}, payload {}, 1stTFOrbit: {} TF: {} | span size: {} span[0]={}",
465 DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}),
466 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit, dh->tfCounter, extTrig.size(), extTrig.size() > 0 ? extTrig[0] : false);
467 }
468 if (extTrig.size() && extTrig[0]) {
469 // is the input with this trigger vetoed?
470 bool veto = false;
471 for (const auto& excl : mExclTriggerFilter) {
472 if (DataRefUtils::match(ref, excl)) {
473 if (mVerbose > 0) {
474 LOGP(info, "ignoring trigger from black-listed {}", DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}));
475 }
476 veto = true;
477 break;
478 }
479 }
480 if (veto) {
481 continue;
482 }
483 trig = true;
484 break;
485 }
486 }
487 if (trig) { // do we need to throttle?
488 mNTFsExtTrig++;
489 mRateEstTrgLow = TMath::ChisquareQuantile(mConfLim, 2 * (mNTFsExtTrig)) / (2 * mNTFsSeen);
490 mRateEstTrgUpp = TMath::ChisquareQuantile(1. - mConfLim, 2 * (mNTFsExtTrig + 1)) / (2 * mNTFsSeen);
491 mRateEstAccLow = TMath::ChisquareQuantile(mConfLim, 2 * (mNTFsAccepted)) / (2 * mNTFsSeen);
492 mRateEstAccUpp = TMath::ChisquareQuantile(1. - mConfLim, 2 * (mNTFsAccepted + 1)) / (2 * mNTFsSeen);
493 if (mRateEstAccLow > 0.01 * mMaxAccRate) { // current lowest estimate on the acceptance rate exceeds desired limit -> ignore trigger
494 trig = false;
495 // do we need to warn?
496 if ((mNTFsSeen - mLastWarned) > mWarnThrottleTF && ((mNWarnThrottle < mMaxWarnThrottle) || mMaxWarnThrottle < 0)) {
497 mLastWarned = mNTFsSeen;
498 std::string swarn = reportRates();
499 if (++mNWarnThrottle == mMaxWarnThrottle) {
500 swarn += " Will not warn anymore.";
501 } else {
502 swarn += fmt::format(" Will suppress this warnings for {} TFs", mWarnThrottleTF);
503 }
504 LOGP(alarm, "Ignoring TF triggered for dumping: {}", swarn);
505 }
506 }
507 }
508 }
509 if (trig) {
510 mNTFsAccepted++;
511 }
512 if (mVerbose > 0) {
513 LOGP(info, "TF#{} (slice#{}) will{} be written, {}", mTimingInfo.tfCounter, mTimingInfo.timeslice, trig ? "" : " not", reportRates());
514 }
515 return trig;
516}
517
518//________________________________________
519void RawTFDump::prepareTFForWriting(ProcessingContext& pc)
520{
521 for (auto const& ref : InputRecordWalker(pc.inputs(), mFilter)) {
522 auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
523 if (!dh) {
524 LOGP(error, "Failed to extract header");
525 continue;
526 }
527 if ((dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) ||
528 (dh->dataOrigin == o2::header::gDataOriginFLP && dh->dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF)) {
529 if (mVerbose > 2) {
530 LOGP(info, "Rejecting {}", DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}));
531 }
532 continue;
533 }
534 const auto lHdrDataSize = sizeof(DataHeader) + dh->payloadSize;
535 mTFSize += lHdrDataSize;
536
537 auto& [lSize, lCnt, lEntry] = mDataMap[EquipmentIdentifier(*dh)];
538 if (!lCnt) {
539 lEntry = mTFData.size(); // flag where the data of this spec starts
540 }
541 lSize += lHdrDataSize;
542 lCnt++;
543 mTFData.push_back({ref.header, ref.payload});
544 if (mVerbose > 2) {
545 const auto* dph = DataRefUtils::getHeader<DataProcessingHeader*>(ref);
546 LOGP(info, "{}, part: {} of {}, payload {}, 1stTFOrbit: {} TF: {}, creation: {} | counter:{} size:{} entry:{}",
547 DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}),
548 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit, dh->tfCounter, dph ? dph->creation : -1UL, lCnt, lSize, lEntry);
549 }
550 }
551
552 // build the index
553 {
554 LOGP(info, "Creating dump image for TF {} of run {}, starting orbit {}, size = {}", mTimingInfo.tfCounter, mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTFSize);
555 std::uint64_t lCurrOff = 0;
556 for (const auto& eqEntry : mDataMap) {
557 const auto& eq = eqEntry.first;
558 auto& [lSize, lCnt, lEntry] = eqEntry.second;
559 assert(lSize > sizeof(DataHeader));
560
561 OutputSpec spec{eq.mDataOrigin, eq.mDataDescription, eq.mSubSpecification};
562 if (mVerbose > 1) {
563 LOGP(info, "{} : {} parts of size {} entry {}| offset: {}", DataSpecUtils::describe(spec), lCnt, lSize, lEntry, lCurrOff);
564 }
565 mTFDataIndex.AddStfElement(eq, lCnt, lCurrOff, lSize);
566 lCurrOff += lSize;
567 }
568 }
569}
570
571//____________________________________________________________
572std::string RawTFDump::reportRates() const
573{
574 std::string rep = fmt::format("{} TFs seen, {} accepted", mNTFsSeen, mNTFsAccepted);
575 if (!mTrigger.empty()) {
576 rep += fmt::format(", {} ext.triggered, est.rate: [{:.2e}:{:.2e}]/[{:.2e}:{:.2e}].", mNTFsExtTrig, mRateEstAccLow, mRateEstAccUpp, mRateEstTrgLow, mRateEstTrgUpp);
577 }
578 return rep;
579}
580
581//__________________________________________________________
582DataProcessorSpec getRawTFDumpSpec(const std::string& inpconfig, const std::string& trigger)
583{
584 std::vector<InputSpec> inputs = select(inpconfig.c_str());
585 return DataProcessorSpec{
586 "raw-tf-dump",
587 inputs,
588 {},
589 AlgorithmSpec{adaptFromTask<RawTFDump>(trigger)},
590 Options{
591 {"include-deadbeef", VariantType::Bool, false, {"Include DPL-generated 0xdeadbeef subspecs for missing data"}},
592 {"include-dist-stf", VariantType::Bool, false, {"Include FLP/DISTSUBTIMEFRAME input"}},
593 {"exclude-trigger-specs", VariantType::String, "", {"Ignore trigger seen in these inputs of triggerspec"}},
594 {"max-dump-rate", VariantType::Float, 0.f, {"%-age of TFs to dump. W/o external trigger: random(>0) or periodic(<0) rejection, with: max limit"}},
595 {"rate-est-conf-limit", VariantType::Float, 0.05f, {"quantile for the lowest rate estimate confidence limit"}},
596 {"max-warn", VariantType::Int, 5, {"max allowed warnings on throttling"}},
597 {"mute-warn-period", VariantType::Int, 100, {"mute warnings on throttling for this number of TFs"}},
598 {"output-dir", VariantType::String, "none", {"TF output directory, must exist"}},
599 {"meta-output-dir", VariantType::String, "/dev/null", {"TF metadata output directory, must exist (if not /dev/null)"}},
600 {"md5-for-meta", VariantType::Bool, false, {"fill CTF file MD5 sum in the metadata file"}},
601 {"min-file-size", VariantType::Int64, 0l, {"accumulate TFs until given file size reached"}},
602 {"max-file-size", VariantType::Int64, 0l, {"if > 0, try to avoid exceeding given file size, also used for space check"}},
603 {"max-tf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested CTFs per file"}},
604 {"require-free-disk", VariantType::Float, 0.f, {"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
605 {"wait-for-free-disk", VariantType::Float, 10.f, {"if paused due to the low disk space, recheck after this time (in s)"}},
606 {"max-wait-for-free-disk", VariantType::Float, 60.f, {"produce fatal if paused due to the low disk space for more than this amount in s."}},
607 {"verbosity-level", VariantType::Int, 0, {"Verbose mode: 1: decision on every TF, 2: details of saved TF, 3: more details"}},
608 {"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}}}};
609}
610
611} // namespace o2::rawdd
Header of the AggregatedRunInfo struct.
A helper class to iteratate over all parts of all input routes.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
static std::string getRawTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_rawtf_dump")
Definition NameConf.cxx:98
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
ConfigParamRegistry const & options()
Definition InitContext.h:33
A helper class to iteratate over all parts of all input routes.
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr o2h::DataDescription DESCCRaw
RawTFDump(const std::string &trigger)
void endOfStream(EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void init(InitContext &ic) final
static constexpr o2h::DataDescription DESCRaw
void run(ProcessingContext &pc) final
GLenum void ** pointer
Definition glcorearb.h:805
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:605
o2::header::DataHeader DataHeader
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
O2 data header classes and API, v0.1.
Definition DetID.h:49
DataProcessorSpec getRawTFDumpSpec(const std::string &inpconfig, const std::string &trigger)
std::ios_base ios
void createDirectoriesIfAbsent(std::string const &path)
bool fillFileData(const std::string &fname, bool fillmd5=false, const std::string &tmpEnding="")
std::vector< uint32_t > tfOrbits
void setDataTakingContext(const o2::framework::DataTakingContext &dtc)
static bool match(DataRef const &ref, const char *binding)
static std::string describe(InputSpec const &spec)
bool forcedRaw
ECS declared run data storage type as raw.
std::string envId
The environment ID for the deployment.
static constexpr const char * UNKNOWN
std::string runType
The run type of the current run.
std::string runNumber
The current run number.
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
the main header struct
Definition DataHeader.h:620
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:648
TForbitType firstTForbit
Definition DataHeader.h:676
DataDescription dataDescription
Definition DataHeader.h:638
SubSpecificationType subSpecification
Definition DataHeader.h:658
PayloadSizeType payloadSize
Definition DataHeader.h:668
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:663
static constexpr std::uint64_t getSizeInFile()
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)