Project
Loading...
Searching...
No Matches
RawTFDumpSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
17#include "Framework/Task.h"
23#include "RawTFDumpSpec.h"
29#include <unistd.h>
30#include <TMath.h>
31#include <filesystem>
32#include <random>
33#include <set>
34
35namespace o2::rawdd
36{
37namespace o2h = o2::header;
38using namespace o2::framework;
41using ios = std::ios_base;
42
43class RawTFDump : public Task
44{
45 public:
46 static constexpr o2h::DataDescription DESCRaw{"RAWDATA"}, DESCCRaw{"CRAWDATA"};
47
48 RawTFDump(const std::string& trigger);
49 void init(InitContext& ic) final;
50 void run(ProcessingContext& pc) final;
51 void endOfStream(EndOfStreamContext& ec) final;
52
53 private:
54 bool triggerTF(ProcessingContext& pc);
55 void updateTimeDependentParams(ProcessingContext& pc);
56 void prepareTFForWriting(ProcessingContext& pc);
57 size_t getTFSizeInFile() const;
58 size_t getCurrentFileSize();
59 void prepareTFFile();
60 void closeTFFile();
61 bool checkFreeSpace(ProcessingContext& pc);
62 std::string reportRates() const;
63
64 SubTimeFrameFileDataIndex mTFDataIndex;
65 std::vector<std::pair<size_t, const void*>> mTFData;
66 std::map<EquipmentIdentifier, std::tuple<size_t, size_t, size_t, const void*>> mDataMap;
67 std::vector<InputSpec> mFilter{};
68 std::vector<InputSpec> mTriggerFilter{};
69 std::vector<InputSpec> mExclTriggerFilter{};
70
71 size_t mTFSize = 0;
72 size_t mMinFileSize = 0; // if > 0, accumulate TFs in the same file until the total size exceeds this minimum
73 size_t mMaxFileSize = 0; // if > MinSize, and accumulated size will exceed this value, stop accumulation (even if mMinFileSize is not reached)
74
75 int mNTFsSeen = 0; // total number of TFs seen
76 int mNTFsExtTrig = 0; // total nunber of TFs externally triggered
77 int mNTFsAccepted = 0; // total number of TFs written
78 int mNTFsInFile = 0; // total number of TFs accumulated in the current file
79 int mNTFFiles = 0; // total number of TF files written
80 int mLastWarned = 0; // TF when last warned about throttling
81 int mMaxTFPerFile = 0; // max TFs per files to store
82 int mNWarnThrottle = 0; // number of times we warned about the throttling
83 int mMaxWarnThrottle = 0; // max allowed warnings about the throttling
84 int mWarnThrottleTF = 0; // min period (in TFs) between the warnings about the throttling
85 int mWaitDiskFull = 0; // if mCheckDiskFull triggers, pause for this amount of ms before new attempt
86 int mWaitDiskFullMax = -1; // produce fatal mCheckDiskFull block the workflow for more than this time (in ms)
87 float mCheckDiskFull = 0.; // wait for if available abs. disk space is < mCheckDiskFull (if >0) or if its fraction is < -mCheckDiskFull (if <0)
88 float mMaxAccRate = 0.f; // max acceptance rate
89 float mConfLim = 0.05f; // confidence limit for rate esimate (lower quantile)
90 float mRateEstAccLow = 0.f; // lower limit on accepted TFs rate
91 float mRateEstAccUpp = 0.f; // upper limit on accepted TFs rate
92 float mRateEstTrgLow = 0.f; // lower limit on triggered TFs rate
93 float mRateEstTrgUpp = 0.f; // upper limit on triggered TFs rate
94
95 bool mFillMD5 = false;
96 bool mWriteTF = true; // for dry run
97 bool mStoreMetaFile = false;
98 bool mCreateRunEnvDir = true;
99 bool mAcceptCurrentTF = false;
100 bool mRejectDEADBEEF = false;
101 bool mRejectDistSTF = true;
102 int mVerbose = 0;
103 std::vector<uint32_t> mTFOrbits{}; // 1st orbits of TF accumulated in current file
104 o2::framework::DataTakingContext mDataTakingContext{};
105 o2::framework::TimingInfo mTimingInfo{};
106
107 std::string mTrigger{}; // external trigger input
108 std::string mExclTriggerSpecs{}; // trigger specs to ignore
109 std::string mHostName{};
110 std::string mTFDir{};
111 std::string mTFMetaFileDir = "/dev/null";
112 std::string mCurrentTFFileName{};
113 std::string mCurrentTFFileNameFull{};
114 std::string mCurrentTFFileNameFullTmp{};
115 std::string mMetaDataType{};
116
117 static constexpr size_t MiB = 1ul << 20;
118 static constexpr std::streamsize sBuffSize = MiB; // 1 MiB
119 static constexpr std::streamsize sChunkSize = 512;
120 static const std::string TMPFileEnding;
121 std::unique_ptr<char[]> mFileBuf;
122 std::ofstream mFile;
123 std::uniform_real_distribution<double> mUniformDist{0.0, 100.0};
124 std::default_random_engine mRGen;
125
126 // helper to make sure the written blocks are buffered
127 template <
128 typename pointer,
129 typename std::enable_if<
130 std::is_pointer<pointer>::value && // pointers only
131 (std::is_void<std::remove_pointer_t<pointer>>::value || // void* or standard layout!
132 std::is_standard_layout<std::remove_pointer_t<pointer>>::value)>::type* = nullptr>
133 void buffered_write(const pointer p, std::streamsize pCount)
134 {
135 // make sure we're not doing a short write
136 assert((pCount % sizeof(std::conditional_t<std::is_void<std::remove_pointer_t<pointer>>::value,
137 char, std::remove_pointer_t<pointer>>) ==
138 0) &&
139 "Performing short write?");
140
141 const char* lPtr = reinterpret_cast<const char*>(p);
142 // avoid the optimization if the write is large enough
143 if (pCount >= sBuffSize) {
144 mFile.write(lPtr, pCount);
145 } else {
146 // split the write to smaller chunks
147 while (pCount > 0) {
148 const auto lToWrite = std::min(pCount, sChunkSize);
149 assert(lToWrite > 0 && lToWrite <= sChunkSize && lToWrite <= pCount);
150
151 mFile.write(lPtr, lToWrite);
152 lPtr += lToWrite;
153 pCount -= lToWrite;
154 }
155 }
156 }
157};
158
159const std::string RawTFDump::TMPFileEnding{".part"};
160
161//________________________________________
162RawTFDump::RawTFDump(const std::string& trigger) : mTrigger{trigger}
163{
164 mTriggerFilter = select(trigger.c_str());
165 mFileBuf = std::make_unique<char[]>(sBuffSize);
166 mFile.rdbuf()->pubsetbuf(mFileBuf.get(), sBuffSize);
167 mFile.clear();
168 mFile.exceptions(std::fstream::failbit | std::fstream::badbit);
169}
170
171//________________________________________
173{
174 mRGen = std::default_random_engine(getpid());
175 mTFMetaFileDir = ic.options().get<std::string>("meta-output-dir");
176 if (mTFMetaFileDir != "/dev/null") {
177 mTFMetaFileDir = o2::utils::Str::rectifyDirectory(mTFMetaFileDir);
178 mStoreMetaFile = true;
179 mFillMD5 = ic.options().get<bool>("md5-for-meta");
180 }
181
182 mTFDir = ic.options().get<std::string>("output-dir");
183 if (mTFDir != "/dev/null") {
184 mTFDir = o2::utils::Str::rectifyDirectory(mTFDir);
185 mWriteTF = true;
186 } else {
187 mWriteTF = false;
188 mStoreMetaFile = false;
189 }
190 mRejectDistSTF = !ic.options().get<bool>("include-dist-stf");
191 mRejectDEADBEEF = !ic.options().get<bool>("include-deadbeef");
192 mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
193 mMinFileSize = ic.options().get<int64_t>("min-file-size");
194 mMaxFileSize = ic.options().get<int64_t>("max-file-size");
195 mMaxTFPerFile = ic.options().get<int>("max-tf-per-file");
196 mMaxAccRate = ic.options().get<float>("max-dump-rate");
197 float cl = ic.options().get<float>("rate-est-conf-limit");
198 if (mConfLim < 0.001 || mConfLim > 0.32) {
199 LOGP(warn, "Bad confidence limit {} for rate estimate, setting to default {}", cl, mConfLim);
200 } else {
201 mConfLim = cl;
202 }
203 mMaxWarnThrottle = ic.options().get<int>("max-warn");
204 mWarnThrottleTF = ic.options().get<int>("mute-warn-period");
205
206 mVerbose = ic.options().get<int>("verbosity-level");
207 mExclTriggerSpecs = ic.options().get<std::string>("exclude-trigger-specs");
208 if (!mExclTriggerSpecs.empty()) {
209 mExclTriggerFilter = select(mExclTriggerSpecs.c_str());
210 }
211 if (mTrigger.empty()) {
212 if (mMaxAccRate >= 0.f) {
213 LOGP(info, "Will accept randomly {}% of TFs", mMaxAccRate);
214 } else {
215 LOGP(info, "Will accept every {}-th TF", int(std::ceil(-100.f / mMaxAccRate)));
216 }
217 } else {
218 mMaxAccRate = std::abs(mMaxAccRate);
219 LOGP(info, "Will limit TFs triggered with {} by {}% at most", mTrigger, mMaxAccRate);
220 if (!mExclTriggerFilter.empty()) {
221 LOGP(info, "Inputs excluded from the trigger: {}", mExclTriggerSpecs);
222 }
223 }
224
225 if (mWriteTF) {
226 if (mMinFileSize > 0) {
227 LOGP(info, "Multiple TFs will be accumulated in the file until its size exceeds {}{}",
228 mMinFileSize, mMaxFileSize > mMinFileSize ? fmt::format(" but does not exceed {} B", mMaxFileSize) : std::string{});
229 }
230 }
231
232 mCheckDiskFull = ic.options().get<float>("require-free-disk");
233 mWaitDiskFull = 1000 * ic.options().get<float>("wait-for-free-disk");
234 mWaitDiskFullMax = 1000 * ic.options().get<float>("max-wait-for-free-disk");
235
236 char hostname[_POSIX_HOST_NAME_MAX];
237 gethostname(hostname, _POSIX_HOST_NAME_MAX);
238 mHostName = hostname;
239 mHostName = mHostName.substr(0, mHostName.find('.'));
240}
241
242//________________________________________
244{
245 mNTFsSeen++;
246 updateTimeDependentParams(pc);
247 mAcceptCurrentTF = triggerTF(pc);
248 if (mAcceptCurrentTF) {
249 prepareTFForWriting(pc);
250 } else {
251 return;
252 }
253
254 prepareTFFile();
255 if (mWriteTF && checkFreeSpace(pc)) { // write data
256 try {
257 size_t lTFSizeInFile = getTFSizeInFile();
258 SubTimeFrameFileMeta lTFFileMeta(lTFSizeInFile);
259 lTFFileMeta.mWriteTimeMs = mTimingInfo.creation;
260
261 mFile << lTFFileMeta; // Write DataHeader + SubTimeFrameFileMeta
262 mFile << mTFDataIndex; // Write DataHeader + SubTimeFrameFileDataIndex
263
264 for (const auto& eqEntry : mDataMap) {
265 auto& [lSize, lCnt, lEntry, lHeader] = eqEntry.second;
266 for (size_t part = 0; part < lCnt; part++) {
267 const auto& dataPtr = mTFData[lEntry + part];
268 DataHeader hdToWrite = *reinterpret_cast<const DataHeader*>(lHeader); // make a local DataHeader copy to clear flagsNextHeader bit and set the parts correctly
269 hdToWrite.flagsNextHeader = 0;
270 hdToWrite.splitPayloadIndex = part;
271 hdToWrite.payloadSize = dataPtr.first;
272 if (mVerbose > 2) {
273 LOGP(info, "Writing part:{}/{} of {} | TFCounter:{} part{}/{}, size:{}", part, lCnt, DataSpecUtils::describe(OutputSpec{hdToWrite.dataOrigin, hdToWrite.dataDescription, hdToWrite.subSpecification}), hdToWrite.firstTForbit, hdToWrite.splitPayloadIndex, hdToWrite.splitPayloadParts, hdToWrite.payloadSize);
274 }
275 buffered_write(reinterpret_cast<const char*>(&hdToWrite), sizeof(DataHeader));
276 buffered_write(dataPtr.second, hdToWrite.payloadSize);
277 }
278 }
279 mFile.flush(); // flush the buffer and check the state
280 mTFOrbits.push_back(mTimingInfo.firstTForbit);
281 mNTFsInFile++;
282 } catch (const std::ios_base::failure& eFailExc) {
283 LOGP(error, "Writing of TF {} to file {} failed. error={}", mTimingInfo.tfCounter, mCurrentTFFileNameFullTmp, eFailExc.what());
284 }
285 }
286 // cleanup
287 mTFData.clear();
288 mDataMap.clear();
289 mTFDataIndex.clear();
290 mTFSize = 0;
291}
292
293//____________________________________________________________
295{
296 closeTFFile();
297 LOGP(info, "Dumped {} TFs to {} files", mNTFsAccepted, mNTFFiles);
298 if (!mTriggerFilter.empty()) {
299 LOGP(info, "External trigger summary: {}", reportRates());
300 }
301}
302
303//________________________________________
304size_t RawTFDump::getTFSizeInFile() const
305{
306 return SubTimeFrameFileMeta::getSizeInFile() + mTFDataIndex.getSizeInFile() + mTFSize;
307}
308
309//________________________________________
310size_t RawTFDump::getCurrentFileSize()
311{
312 return mFile.is_open() ? size_t(mFile.tellp()) : 0;
313}
314
315//___________________________________________________________________
316void RawTFDump::prepareTFFile()
317{
318 if (!mWriteTF) {
319 return;
320 }
321 bool needToOpen;
322 if (!mFile.is_open()) {
323 needToOpen = true;
324 } else {
325 auto currSize = getCurrentFileSize();
326 if ((mNTFsInFile >= mMaxTFPerFile) ||
327 (currSize >= mMinFileSize) || // min size exceeded, may close the file.
328 (currSize && mMaxFileSize > mMinFileSize && ((currSize + mTFSize) > mMaxFileSize))) { // this is not the 1st TF in the file and the new size will exceed allowed max
329 needToOpen = true;
330 } else {
331 LOGP(info, "Will add new TF of size {} to existing file of size {} with {} TFs", mTFSize, currSize, mNTFsInFile);
332 needToOpen = false;
333 }
334 }
335 if (needToOpen) {
336 closeTFFile();
337 auto TFDir = mTFDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mTFDir;
338 if (mCreateRunEnvDir && !mDataTakingContext.envId.empty() && (mDataTakingContext.envId != o2::framework::DataTakingContext::UNKNOWN)) {
339 TFDir += fmt::format("{}_{}tf/", mDataTakingContext.envId, mDataTakingContext.runNumber);
340 if (!TFDir.empty()) {
342 LOGP(info, "Created {} directory for TFs output", TFDir);
343 }
344 }
345 mCurrentTFFileName = o2::base::NameConf::getRawTFFileName(mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTimingInfo.tfCounter, mHostName);
346 mCurrentTFFileNameFull = fmt::format("{}{}", TFDir, mCurrentTFFileName);
347 mCurrentTFFileNameFullTmp = TMPFileEnding.empty() ? mCurrentTFFileNameFull : o2::utils::Str::concat_string(mCurrentTFFileNameFull, TMPFileEnding);
348 mFile.open(mCurrentTFFileNameFullTmp.c_str(), ios::binary | ios::trunc | ios::out | ios::ate);
349 LOGP(info, "Opened new raw-tf dump file {}[{}]", mCurrentTFFileNameFull, TMPFileEnding);
350 mNTFFiles++;
351 }
352}
353
354//___________________________________________________________________
355void RawTFDump::updateTimeDependentParams(ProcessingContext& pc)
356{
357 namespace GRPECS = o2::parameters::GRPECS;
358 mTimingInfo = pc.services().get<o2::framework::TimingInfo>();
359 if (mTimingInfo.globalRunNumberChanged) {
360 mDataTakingContext = pc.services().get<DataTakingContext>();
361 // determine the output type for the TF metadata
362 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.runType, mDataTakingContext.forcedRaw);
363 }
364}
365
366//___________________________________________________________________
367void RawTFDump::closeTFFile()
368{
369 if (!mFile.is_open()) {
370 return;
371 }
372 try {
373 LOGP(info, "Closing output file {}[{}]", mCurrentTFFileNameFull, TMPFileEnding);
374 mFile.close();
375 // write TF file metaFile data
376 if (mStoreMetaFile) {
378 if (!TFMetaData.fillFileData(mCurrentTFFileNameFullTmp, mFillMD5, TMPFileEnding)) {
379 throw std::runtime_error("metadata file was requested but not created");
380 }
381 TFMetaData.setDataTakingContext(mDataTakingContext);
382 TFMetaData.type = mMetaDataType;
383 TFMetaData.priority = "high";
384 TFMetaData.tfOrbits.swap(mTFOrbits);
385 auto metaFileNameTmp = fmt::format("{}{}.tmp", mTFMetaFileDir, mCurrentTFFileName);
386 auto metaFileName = fmt::format("{}{}.done", mTFMetaFileDir, mCurrentTFFileName);
387 try {
388 std::ofstream metaFileOut(metaFileNameTmp);
389 metaFileOut << TFMetaData;
390 metaFileOut.close();
391 if (!TMPFileEnding.empty()) {
392 std::filesystem::rename(mCurrentTFFileNameFullTmp, mCurrentTFFileNameFull);
393 }
394 std::filesystem::rename(metaFileNameTmp, metaFileName);
395 LOGP(info, "wrote meta file {}", metaFileName);
396 } catch (std::exception const& e) {
397 LOGP(error, "Failed to store TF meta data file {}, reason {}", metaFileName, e.what());
398 }
399 } else if (!TMPFileEnding.empty()) {
400 std::filesystem::rename(mCurrentTFFileNameFullTmp, mCurrentTFFileNameFull);
401 }
402 } catch (std::exception const& e) {
403 LOGP(error, "Failed to finalize TF file {}, reason: ", mCurrentTFFileNameFull, e.what());
404 }
405 mTFOrbits.clear();
406 mNTFsInFile = 0;
407}
408
409//________________________________________
410bool RawTFDump::checkFreeSpace(ProcessingContext& pc)
411{
412 int totalWait = 0, nwaitCycles = 0;
413 while (mCheckDiskFull) {
414 constexpr int showFirstN = 10, prsecaleWarnings = 50;
415 try {
416 const auto si = std::filesystem::space(mCurrentTFFileNameFullTmp);
417 std::string wmsg{};
418 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
419 nwaitCycles++;
420 wmsg = fmt::format("Disk has {} MiB available while at least {} MiB is requested, wait for {} ms (on top of {} ms)", si.available / MiB, size_t(mCheckDiskFull) / MiB, mWaitDiskFull, totalWait);
421 } else if (mCheckDiskFull < 0.f && float(si.available) / si.capacity < -mCheckDiskFull) { // relative margin requested
422 nwaitCycles++;
423 wmsg = fmt::format("Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ? float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
424 } else {
425 nwaitCycles = 0;
426 }
427 if (nwaitCycles) {
428 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
429 closeTFFile(); // try to save whatever we have
430 LOGP(fatal, "Disk has {} MiB available out of {} MiB after waiting for {} ms", si.available / MiB, si.capacity / MiB, mWaitDiskFullMax);
431 }
432 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
433 LOGP(alarm, "{}", wmsg);
434 }
435 pc.services().get<RawDeviceService>().waitFor((unsigned int)(mWaitDiskFull));
436 totalWait += mWaitDiskFull;
437 continue;
438 }
439 } catch (std::exception const& e) {
440 LOGP(fatal, "unable to query disk space info for path {}, reason {}", mCurrentTFFileNameFull, e.what()); // do we want this?
441 }
442 break;
443 }
444 return true;
445}
446
447//________________________________________
448bool RawTFDump::triggerTF(ProcessingContext& pc)
449{
450 bool trig = false;
451 if (mTrigger.empty()) { // random
452 if (mMaxAccRate > 0.f) {
453 trig = (mUniformDist(mRGen) <= mMaxAccRate);
454 } else if (mMaxAccRate < 0.f) {
455 trig = (mTimingInfo.tfCounter % int(std::ceil(-100.f / mMaxAccRate))) == 0;
456 }
457 } else {
458 for (auto const& ref : InputRecordWalker(pc.inputs(), mTriggerFilter)) {
459 auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
460 if (!dh) {
461 LOGP(error, "Failed to extract header for trigger input");
462 continue;
463 }
464 auto extTrig = DataRefUtils::as<bool>(ref);
465 if (mVerbose > 1 || (mVerbose > 0 && extTrig.size() > 0 && extTrig[0])) {
466 LOGP(info, "trigger input {}, part: {} of {}, payload {}, 1stTFOrbit: {} TF: {} | span size: {} span[0]={}",
467 DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}),
468 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit, dh->tfCounter, extTrig.size(), extTrig.size() > 0 ? extTrig[0] : false);
469 }
470 if (extTrig.size() && extTrig[0]) {
471 // is the input with this trigger vetoed?
472 bool veto = false;
473 for (const auto& excl : mExclTriggerFilter) {
474 if (DataRefUtils::match(ref, excl)) {
475 if (mVerbose > 0) {
476 LOGP(info, "ignoring trigger from black-listed {}", DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}));
477 }
478 veto = true;
479 break;
480 }
481 }
482 if (veto) {
483 continue;
484 }
485 trig = true;
486 break;
487 }
488 }
489 if (trig) { // do we need to throttle?
490 mNTFsExtTrig++;
491 mRateEstTrgLow = TMath::ChisquareQuantile(mConfLim, 2 * (mNTFsExtTrig)) / (2 * mNTFsSeen);
492 mRateEstTrgUpp = TMath::ChisquareQuantile(1. - mConfLim, 2 * (mNTFsExtTrig + 1)) / (2 * mNTFsSeen);
493 mRateEstAccLow = TMath::ChisquareQuantile(mConfLim, 2 * (mNTFsAccepted)) / (2 * mNTFsSeen);
494 mRateEstAccUpp = TMath::ChisquareQuantile(1. - mConfLim, 2 * (mNTFsAccepted + 1)) / (2 * mNTFsSeen);
495 if (mRateEstAccLow > 0.01 * mMaxAccRate) { // current lowest estimate on the acceptance rate exceeds desired limit -> ignore trigger
496 trig = false;
497 // do we need to warn?
498 if ((mNTFsSeen - mLastWarned) > mWarnThrottleTF && ((mNWarnThrottle < mMaxWarnThrottle) || mMaxWarnThrottle < 0)) {
499 mLastWarned = mNTFsSeen;
500 std::string swarn = reportRates();
501 if (++mNWarnThrottle == mMaxWarnThrottle) {
502 swarn += " Will not warn anymore.";
503 } else {
504 swarn += fmt::format(" Will suppress this warnings for {} TFs", mWarnThrottleTF);
505 }
506 LOGP(alarm, "Ignoring TF triggered for dumping: {}", swarn);
507 }
508 }
509 }
510 }
511 if (trig) {
512 mNTFsAccepted++;
513 }
514 if (mVerbose > 0) {
515 LOGP(info, "TF#{} (slice#{}) will{} be written, {}", mTimingInfo.tfCounter, mTimingInfo.timeslice, trig ? "" : " not", reportRates());
516 }
517 return trig;
518}
519
520//________________________________________
521void RawTFDump::prepareTFForWriting(ProcessingContext& pc)
522{
523 for (auto const& ref : InputRecordWalker(pc.inputs(), mFilter)) {
524 auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
525 if (!dh) {
526 LOGP(error, "Failed to extract header");
527 continue;
528 }
529 if ((dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) ||
530 (dh->dataOrigin == o2::header::gDataOriginFLP && dh->dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF)) {
531 if (mVerbose > 2) {
532 LOGP(info, "Rejecting {}", DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}));
533 }
534 continue;
535 }
536 const auto payloadSize = DataRefUtils::getPayloadSize(ref);
537 const auto lHdrDataSize = sizeof(DataHeader) + payloadSize;
538 mTFSize += lHdrDataSize;
539
540 auto& [lSize, lCnt, lEntry, lHeader] = mDataMap[EquipmentIdentifier(*dh)];
541 if (!lCnt) {
542 lEntry = mTFData.size(); // flag where the data of this spec starts
543 lHeader = ref.header;
544 }
545 lSize += lHdrDataSize;
546 lCnt++;
547 mTFData.push_back({payloadSize, ref.payload});
548 if (mVerbose > 2) {
549 const auto* dph = DataRefUtils::getHeader<DataProcessingHeader*>(ref);
550 LOGP(info, "{}, part: {}({}) of {}, payload {}({}), 1stTFOrbit: {} TF: {}, creation: {} | counter:{} size:{} entry:{}",
551 DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}),
552 dh->splitPayloadIndex, lCnt - 1, dh->splitPayloadParts, dh->payloadSize, payloadSize, dh->firstTForbit, dh->tfCounter, dph ? dph->creation : -1UL, lCnt, lSize, lEntry);
553 // if (o2::raw::RDHUtils::checkRDH(ref.payload)) {
554 // o2::raw::RDHUtils::printRDH(ref.payload);
555 // }
556 }
557 }
558
559 // build the index
560 {
561 LOGP(info, "Creating dump image for TF {} of run {}, starting orbit {}, size = {}", mTimingInfo.tfCounter, mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTFSize);
562 std::uint64_t lCurrOff = 0;
563 for (const auto& eqEntry : mDataMap) {
564 const auto& eq = eqEntry.first;
565 auto& [lSize, lCnt, lEntry, lHeader] = eqEntry.second;
566 assert(lSize > sizeof(DataHeader));
567
568 OutputSpec spec{eq.mDataOrigin, eq.mDataDescription, eq.mSubSpecification};
569 if (mVerbose > 1) {
570 LOGP(info, "{} : {} parts of size {} entry {}| offset: {}", DataSpecUtils::describe(spec), lCnt, lSize, lEntry, lCurrOff);
571 }
572 mTFDataIndex.AddStfElement(eq, lCnt, lCurrOff, lSize);
573 lCurrOff += lSize;
574 }
575 }
576}
577
578//____________________________________________________________
579std::string RawTFDump::reportRates() const
580{
581 std::string rep = fmt::format("{} TFs seen, {} accepted", mNTFsSeen, mNTFsAccepted);
582 if (!mTrigger.empty()) {
583 rep += fmt::format(", {} ext.triggered, est.rate: [{:.2e}:{:.2e}]/[{:.2e}:{:.2e}].", mNTFsExtTrig, mRateEstAccLow, mRateEstAccUpp, mRateEstTrgLow, mRateEstTrgUpp);
584 }
585 return rep;
586}
587
588//__________________________________________________________
589DataProcessorSpec getRawTFDumpSpec(const std::string& inpconfig, const std::string& trigger)
590{
591 std::vector<InputSpec> inputs = select(inpconfig.c_str());
592 return DataProcessorSpec{
593 "raw-tf-dump",
594 inputs,
595 {},
596 AlgorithmSpec{adaptFromTask<RawTFDump>(trigger)},
597 Options{
598 {"include-deadbeef", VariantType::Bool, false, {"Include DPL-generated 0xdeadbeef subspecs for missing data"}},
599 {"include-dist-stf", VariantType::Bool, false, {"Include FLP/DISTSUBTIMEFRAME input"}},
600 {"exclude-trigger-specs", VariantType::String, "", {"Ignore trigger seen in these inputs of triggerspec"}},
601 {"max-dump-rate", VariantType::Float, 0.f, {"%-age of TFs to dump. W/o external trigger: random(>0) or periodic(<0) rejection, with: max limit"}},
602 {"rate-est-conf-limit", VariantType::Float, 0.05f, {"quantile for the lowest rate estimate confidence limit"}},
603 {"max-warn", VariantType::Int, 5, {"max allowed warnings on throttling"}},
604 {"mute-warn-period", VariantType::Int, 100, {"mute warnings on throttling for this number of TFs"}},
605 {"output-dir", VariantType::String, "none", {"TF output directory, must exist"}},
606 {"meta-output-dir", VariantType::String, "/dev/null", {"TF metadata output directory, must exist (if not /dev/null)"}},
607 {"md5-for-meta", VariantType::Bool, false, {"fill CTF file MD5 sum in the metadata file"}},
608 {"min-file-size", VariantType::Int64, 0l, {"accumulate TFs until given file size reached"}},
609 {"max-file-size", VariantType::Int64, 0l, {"if > 0, try to avoid exceeding given file size, also used for space check"}},
610 {"max-tf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested CTFs per file"}},
611 {"require-free-disk", VariantType::Float, 0.f, {"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
612 {"wait-for-free-disk", VariantType::Float, 10.f, {"if paused due to the low disk space, recheck after this time (in s)"}},
613 {"max-wait-for-free-disk", VariantType::Float, 60.f, {"produce fatal if paused due to the low disk space for more than this amount in s."}},
614 {"verbosity-level", VariantType::Int, 0, {"Verbose mode: 1: decision on every TF, 2: details of saved TF, 3: more details"}},
615 {"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}}}};
616}
617
618} // namespace o2::rawdd
Header of the AggregatedRunInfo struct.
A helper class to iteratate over all parts of all input routes.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
static std::string getRawTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_rawtf_dump")
Definition NameConf.cxx:98
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
ConfigParamRegistry const & options()
Definition InitContext.h:33
A helper class to iteratate over all parts of all input routes.
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr o2h::DataDescription DESCCRaw
RawTFDump(const std::string &trigger)
void endOfStream(EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void init(InitContext &ic) final
static constexpr o2h::DataDescription DESCRaw
void run(ProcessingContext &pc) final
GLenum void ** pointer
Definition glcorearb.h:805
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:605
o2::header::DataHeader DataHeader
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
O2 data header classes and API, v0.1.
Definition DetID.h:49
DataProcessorSpec getRawTFDumpSpec(const std::string &inpconfig, const std::string &trigger)
std::ios_base ios
void createDirectoriesIfAbsent(std::string const &path)
bool fillFileData(const std::string &fname, bool fillmd5=false, const std::string &tmpEnding="")
std::vector< uint32_t > tfOrbits
void setDataTakingContext(const o2::framework::DataTakingContext &dtc)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool match(DataRef const &ref, const char *binding)
static std::string describe(InputSpec const &spec)
bool forcedRaw
ECS declared run data storage type as raw.
std::string envId
The environment ID for the deployment.
static constexpr const char * UNKNOWN
std::string runType
The run type of the current run.
std::string runNumber
The current run number.
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
the main header struct
Definition DataHeader.h:620
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:648
TForbitType firstTForbit
Definition DataHeader.h:676
DataDescription dataDescription
Definition DataHeader.h:638
SubSpecificationType subSpecification
Definition DataHeader.h:658
PayloadSizeType payloadSize
Definition DataHeader.h:668
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:663
static constexpr std::uint64_t getSizeInFile()
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)