Project
Loading...
Searching...
No Matches
CTFWriterSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include "Framework/Logger.h"
16#include "Framework/InputSpec.h"
17#include "Framework/Task.h"
21#include <fairmq/Device.h>
22
33#include "DataFormatsTPC/CTF.h"
34#include "DataFormatsTRD/CTF.h"
35#include "DataFormatsHMP/CTF.h"
36#include "DataFormatsFT0/CTF.h"
37#include "DataFormatsFV0/CTF.h"
38#include "DataFormatsFDD/CTF.h"
39#include "DataFormatsTOF/CTF.h"
40#include "DataFormatsMID/CTF.h"
41#include "DataFormatsMCH/CTF.h"
43#include "DataFormatsPHOS/CTF.h"
44#include "DataFormatsCPV/CTF.h"
45#include "DataFormatsZDC/CTF.h"
46#include "DataFormatsCTP/CTF.h"
47
48#include "rANS/histogram.h"
49#include "rANS/compat.h"
50
51#include <vector>
52#include <stdexcept>
53#include <array>
54#include <TStopwatch.h>
55#include <vector>
56#include <TFile.h>
57#include <TTree.h>
58#include <TRandom.h>
59#include <filesystem>
60#include <ctime>
61#include <sys/stat.h>
62#include <fcntl.h>
63#include <unistd.h>
64#include <regex>
65#include <numeric>
66
67using namespace o2::framework;
68
69namespace o2
70{
71namespace ctf
72{
73
74template <typename T>
75size_t appendToTree(TTree& tree, const std::string brname, T& ptr)
76{
77 size_t s = 0;
78 auto* br = tree.GetBranch(brname.c_str());
79 auto* pptr = &ptr;
80 if (br) {
81 br->SetAddress(&pptr);
82 } else {
83 br = tree.Branch(brname.c_str(), &pptr);
84 }
85 int res = br->Fill();
86 if (res < 0) {
87 throw std::runtime_error(fmt::format("Failed to fill CTF branch {}", brname));
88 }
89 s += res;
90 br->ResetAddress();
91 return s;
92}
93
96
98{
99 public:
100 CTFWriterSpec() = delete;
102 ~CTFWriterSpec() final { finalize(); }
103 void init(o2::framework::InitContext& ic) final;
105 void endOfStream(o2::framework::EndOfStreamContext& ec) final { finalize(); }
106 void stop() final { finalize(); }
107 bool isPresent(DetID id) const { return mInput.detMask[id]; }
108
109 static std::string getBinding(const std::string& name, int spec) { return fmt::format("{}_{}", name, spec); }
110
111 private:
112 void updateTimeDependentParams(ProcessingContext& pc);
113 template <typename C>
114 size_t processDet(o2::framework::ProcessingContext& pc, DetID det, CTFHeader& header, TTree* tree);
115 template <typename C>
116 void storeDictionary(DetID det, CTFHeader& header);
117 void storeDictionaries();
118 void closeTFTreeAndFile();
119 void prepareTFTreeAndFile();
120 size_t estimateCTFSize(ProcessingContext& pc);
121 size_t getAvailableDiskSpace(const std::string& path, int level);
122 void createLockFile(int level);
123 void removeLockFile();
124 void finalize();
125
126 CTFWriterInp mInput;
127 bool mFinalized = false;
128 bool mWriteCTF = true;
129 bool mCreateDict = false;
130 bool mCreateRunEnvDir = true;
131 bool mStoreMetaFile = false;
132 bool mRejectCurrentTF = false;
133 bool mFallBackDirUsed = false;
134 bool mFallBackDirProvided = false;
135 int mSaveDictAfter = 0; // if positive and mWriteCTF==true, save dictionary after each mSaveDictAfter TFs processed
136 uint32_t mPrevDictTimeStamp = 0; // timestamp of the previously stored dictionary
137 uint32_t mDictTimeStamp = 0; // timestamp of the currently stored dictionary
138 size_t mMinSize = 0; // if > 0, accumulate CTFs in the same tree until the total size exceeds this minimum
139 size_t mMaxSize = 0; // if > MinSize, and accumulated size will exceed this value, stop accumulation (even if mMinSize is not reached)
140 size_t mChkSize = 0; // if > 0 and fallback storage provided, reserve this size per CTF file in production on primary storage
141 size_t mAccCTFSize = 0; // so far accumulated size (if any)
142 size_t mCurrCTFSize = 0; // size of currently processed CTF
143 size_t mNCTF = 0; // total number of CTFs written
144 size_t mNCTFPrevDict = 0; // total number of CTFs used for previous dictionary version
145 size_t mNAccCTF = 0; // total number of CTFs accumulated in the current file
146 int mWaitDiskFull = 0; // if mCheckDiskFull triggers, pause for this amount of ms before new attempt
147 int mWaitDiskFullMax = -1; // produce fatal mCheckDiskFull block the workflow for more than this time (in ms)
148 float mCheckDiskFull = 0.; // wait for if available abs. disk space is < mCheckDiskFull (if >0) or if its fraction is < -mCheckDiskFull (if <0)
149 long mCTFAutoSave = 0; // if > 0, autosave after so many TFs
150 size_t mNCTFFiles = 0; // total number of CTF files written
151 int mMaxCTFPerFile = 0; // max CTFs per files to store
152 int mRejRate = 0; // CTF rejection rule (>0: percentage to reject randomly, <0: reject if timeslice%|value|!=0)
153 int mCTFFileCompression = 0; // CTF file compression level (if >= 0)
154 bool mFillMD5 = false;
155 std::vector<uint32_t> mTFOrbits{}; // 1st orbits of TF accumulated in current file
156 o2::framework::DataTakingContext mDataTakingContext{};
157 o2::framework::TimingInfo mTimingInfo{};
158 std::string mDictDir{};
159 std::string mCTFDir{};
160 std::string mHostName{};
161 std::string mCTFDirFallBack = "/dev/null";
162 std::string mCTFMetaFileDir = "/dev/null";
163 std::string mCurrentCTFFileName{};
164 std::string mCurrentCTFFileNameFull{};
165 std::string mSizeReport{};
166 std::string mMetaDataType{};
167 const std::string LOCKFileDir = "/tmp/ctf-writer-locks";
168 std::string mLockFileName{};
169 int mLockFD = -1;
170 std::unique_ptr<TFile> mCTFFileOut;
171 std::unique_ptr<TTree> mCTFTreeOut;
172
173 std::unique_ptr<TFile> mDictFileOut; // file to store dictionary
174 std::unique_ptr<TTree> mDictTreeOut; // tree to store dictionary
175
176 // For the external dictionary creation we accumulate for each detector the frequency tables of its each block
177 // After accumulation over multiple TFs we store the dictionaries data in the standard CTF format of this detector,
178 // i.e. EncodedBlock stored in a tree, BUT with dictionary data only added to each block.
179 // The metadata of the block (min,max) will be used for the consistency check at the decoding
180 std::array<std::vector<FTrans>, DetID::nDetectors> mFreqsAccumulation;
181 std::array<std::vector<o2::ctf::Metadata>, DetID::nDetectors> mFreqsMetaData;
182 std::array<std::bitset<64>, DetID::nDetectors> mIsSaturatedFrequencyTable;
183 std::array<std::shared_ptr<void>, DetID::nDetectors> mHeaders;
184 TStopwatch mTimer;
185
186 static const std::string TMPFileEnding;
187};
188
189const std::string CTFWriterSpec::TMPFileEnding{".part"};
190
191//___________________________________________________________________
193 : mInput(inp)
194{
195 std::for_each(mIsSaturatedFrequencyTable.begin(), mIsSaturatedFrequencyTable.end(), [](auto& bitset) { bitset.reset(); });
196 mTimer.Stop();
197 mTimer.Reset();
198}
199
200//___________________________________________________________________
202{
203 // auto outmode = ic.options().get<std::string>("output-type"); // RS FIXME once global/local options clash is solved, --output-type will become device option
204 auto outmode = mInput.outType;
205 if (outmode == "ctf") {
206 mWriteCTF = true;
207 mCreateDict = false;
208 } else if (outmode == "dict") {
209 mWriteCTF = false;
210 mCreateDict = true;
211 } else if (outmode == "both") {
212 mWriteCTF = true;
213 mCreateDict = true;
214 } else if (outmode == "none") {
215 mWriteCTF = false;
216 mCreateDict = false;
217 } else {
218 throw std::invalid_argument("Invalid output-type");
219 }
220
221 mSaveDictAfter = ic.options().get<int>("save-dict-after");
222 mCTFAutoSave = ic.options().get<long>("save-ctf-after");
223 mCTFFileCompression = ic.options().get<int>("ctf-file-compression");
224 mCTFMetaFileDir = ic.options().get<std::string>("meta-output-dir");
225 if (mCTFMetaFileDir != "/dev/null") {
226 mCTFMetaFileDir = o2::utils::Str::rectifyDirectory(mCTFMetaFileDir);
227 mStoreMetaFile = true;
228 mFillMD5 = ic.options().get<bool>("md5-for-meta");
229 }
230 mDictDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("ctf-dict-dir"));
231 mCTFDir = ic.options().get<std::string>("output-dir");
232 if (mCTFDir != "/dev/null") {
233 mCTFDir = o2::utils::Str::rectifyDirectory(mCTFDir);
234 } else {
235 mWriteCTF = false;
236 mStoreMetaFile = false;
237 }
238 mCTFDirFallBack = ic.options().get<std::string>("output-dir-alt");
239 if (mCTFDirFallBack != "/dev/null") {
240 mCTFDirFallBack = o2::utils::Str::rectifyDirectory(mCTFDirFallBack);
241 mFallBackDirProvided = true;
242 }
243 mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
244 mMinSize = ic.options().get<int64_t>("min-file-size");
245 mMaxSize = ic.options().get<int64_t>("max-file-size");
246 mMaxCTFPerFile = ic.options().get<int>("max-ctf-per-file");
247 mRejRate = ic.options().get<int>("ctf-rejection");
248 if (mRejRate > 0) {
249 LOGP(info, "Will reject{} {}% of TFs", mRejRate < 100 ? " randomly" : "", mRejRate < 100 ? mRejRate : 100);
250 } else if (mRejRate < -1) {
251 LOGP(info, "Will reject all but each {}-th TF slice", -mRejRate);
252 }
253
254 if (mWriteCTF) {
255 if (mMinSize > 0) {
256 LOG(info) << "Multiple CTFs will be accumulated in the tree/file until its size exceeds " << mMinSize << " bytes";
257 if (mMaxSize > mMinSize) {
258 LOG(info) << "but does not exceed " << mMaxSize << " bytes";
259 }
260 }
261 }
262
263 mCheckDiskFull = ic.options().get<float>("require-free-disk");
264 mWaitDiskFull = 1000 * ic.options().get<float>("wait-for-free-disk");
265 mWaitDiskFullMax = 1000 * ic.options().get<float>("max-wait-for-free-disk");
266
267 mChkSize = std::max(size_t(mMinSize * 1.1), mMaxSize);
269
270 if (mCreateDict) { // make sure that there is no local dictonary
271 std::string dictFileName = fmt::format("{}{}.root", mDictDir, o2::base::NameConf::CTFDICT);
272 if (std::filesystem::exists(dictFileName)) {
273 throw std::runtime_error(o2::utils::Str::concat_string("CTF dictionary creation is requested but ", dictFileName, " already exists, remove it!"));
274 }
276 }
277
278 char hostname[_POSIX_HOST_NAME_MAX];
279 gethostname(hostname, _POSIX_HOST_NAME_MAX);
280 mHostName = hostname;
281 mHostName = mHostName.substr(0, mHostName.find('.'));
282}
283
284//___________________________________________________________________
285void CTFWriterSpec::updateTimeDependentParams(ProcessingContext& pc)
286{
287 namespace GRPECS = o2::parameters::GRPECS;
288 mTimingInfo = pc.services().get<o2::framework::TimingInfo>();
289 if (mTimingInfo.globalRunNumberChanged) {
290 mDataTakingContext = pc.services().get<DataTakingContext>();
291 // determine the output type for the CTF metadata
292 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.runType, mDataTakingContext.forcedRaw);
293 }
294}
295
296//___________________________________________________________________
297// process data of particular detector
298template <typename C>
299size_t CTFWriterSpec::processDet(o2::framework::ProcessingContext& pc, DetID det, CTFHeader& header, TTree* tree)
300{
301 static bool warnedEmpty = false;
302 size_t sz = 0;
303
304 if (!isPresent(det) || !pc.inputs().isValid(getBinding(det.getName(), 0))) {
305 mSizeReport += fmt::format(" {}:N/A", det.getName());
306 return sz;
307 }
308
309 uint32_t nLayers = 1;
310 if (det == DetID::ITS) {
312 } else if (det == DetID::MFT) {
314 }
315 for (uint32_t iLayer = 0; iLayer < nLayers; iLayer++) {
316 auto binding = getBinding(det.getName(), iLayer);
317 auto ctfBuffer = pc.inputs().get<gsl::span<o2::ctf::BufferType>>(binding);
318 const o2::ctf::BufferType* bdata = ctfBuffer.data();
319 if (bdata) {
320 if (warnedEmpty) {
321 throw std::runtime_error(fmt::format("Non-empty input was seen at {}-th TF after empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.getName()));
322 }
323 const auto ctfImage = C::getImage(bdata);
325 if (mWriteCTF && !mRejectCurrentTF) {
326 sz += ctfImage.appendToTree(*tree, nLayers > 1 ? binding : det.getName());
327 header.detectors.set(det);
328 } else {
329 sz += ctfBuffer.size();
330 }
331 if (mCreateDict) { // RSTODO
332 if (mFreqsAccumulation[det].empty()) {
333 mFreqsAccumulation[det].resize(C::getNBlocks());
334 mFreqsMetaData[det].resize(C::getNBlocks());
335 }
336 if (!mHeaders[det]) { // store 1st header
337 mHeaders[det] = ctfImage.cloneHeader();
338 auto& hb = *static_cast<o2::ctf::CTFDictHeader*>(mHeaders[det].get());
339 hb.det = det;
340 }
341 for (int ib = 0; ib < C::getNBlocks(); ib++) {
342 if (!mIsSaturatedFrequencyTable[det][ib]) {
343 const auto& bl = ctfImage.getBlock(ib);
344 if (bl.getNDict()) {
345 auto freq = mFreqsAccumulation[det][ib];
346 auto& mdSave = mFreqsMetaData[det][ib];
347 const auto& md = ctfImage.getMetadata(ib);
348 if ([&, this]() {
349 try {
350 freq.addFrequencies(bl.getDict(), bl.getDict() + bl.getNDict(), md.min);
351 } catch (const std::overflow_error& e) {
352 LOGP(warning, "unable to add frequency table for {}, block {} due to overflow", det.getName(), ib);
353 mIsSaturatedFrequencyTable[det][ib] = true;
354 return false;
355 }
356 return true;
357 }()) {
359 auto histogramView = o2::rans::trim(o2::rans::makeHistogramView(freq));
360 mdSave = ctf::detail::makeMetadataRansDict(newProbBits,
361 static_cast<int32_t>(histogramView.getMin()),
362 static_cast<int32_t>(histogramView.getMax()),
363 static_cast<int32_t>(histogramView.size()),
364 md.opt);
365 mFreqsAccumulation[det][ib] = std::move(freq);
366 }
367 }
368 }
369 }
370 }
371 } else {
372 if (!warnedEmpty) {
373 if (mNCTF) {
374 throw std::runtime_error(fmt::format("Empty input was seen at {}-th TF after non-empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.getName()));
375 }
376 LOGP(important, "Empty CTF provided for {}, skipping and will not report anymore", det.getName());
377 warnedEmpty = true;
378 }
379 }
380 }
381 mSizeReport += fmt::format(" {}:{}", det.getName(), fmt::group_digits(sz));
382 return sz;
383}
384
385//___________________________________________________________________
386// store dictionary of a particular detector
387template <typename C>
388void CTFWriterSpec::storeDictionary(DetID det, CTFHeader& header)
389{
390 // create vector whose data contains dictionary in CTF format (EncodedBlock)
391 if (!isPresent(det) || !mFreqsAccumulation[det].size()) {
392 return;
393 }
394 auto dictBlocks = C::createDictionaryBlocks(mFreqsAccumulation[det], mFreqsMetaData[det]);
395 auto& h = C::get(dictBlocks.data())->getHeader();
396 h = *reinterpret_cast<typename std::remove_reference<decltype(h)>::type*>(mHeaders[det].get());
397 auto& hb = static_cast<o2::ctf::CTFDictHeader&>(h);
398 hb = *static_cast<const o2::ctf::CTFDictHeader*>(mHeaders[det].get());
399 hb.dictTimeStamp = mDictTimeStamp;
400
401 auto getFileName = [this, det, &hb](bool curr) {
402 return fmt::format("{}{}_{}_v{}.{}_{}_{}.root", this->mDictDir, o2::base::NameConf::CTFDICT, det.getName(), int(hb.majorVersion), int(hb.minorVersion),
403 curr ? this->mDictTimeStamp : this->mPrevDictTimeStamp, curr ? this->mNCTF : this->mNCTFPrevDict);
404 };
405
406 C::get(dictBlocks.data())->print(o2::utils::Str::concat_string("Storing dictionary for ", det.getName(), ": "));
407 auto outName = getFileName(true);
408 TFile flout(outName.c_str(), "recreate");
409 flout.WriteObject(&dictBlocks, o2::base::NameConf::CCDBOBJECT.data());
410 flout.WriteObject(&hb, fmt::format("ctf_dict_header_{}", det.getName()).c_str());
411 flout.Close();
412 LOGP(info, "Saved {} with {} TFs to {}", hb.asString(), mNCTF, outName);
413 if (mPrevDictTimeStamp) {
414 auto outNamePrev = getFileName(false);
415 if (std::filesystem::exists(outNamePrev)) {
416 std::filesystem::remove(outNamePrev);
417 LOGP(info, "Removed previous dictionary version {}", outNamePrev);
418 }
419 }
420 C::get(dictBlocks.data())->appendToTree(*mDictTreeOut.get(), det.getName()); // cast to EncodedBlock and attach to dictionaries tree
421 header.detectors.set(det);
422}
423
424//___________________________________________________________________
425size_t CTFWriterSpec::estimateCTFSize(ProcessingContext& pc)
426{
427 size_t s = 0;
428 for (auto id = DetID::First; id <= DetID::Last; id++) {
429 DetID det(id);
430 uint32_t nLayers = 1;
431 if (det == DetID::ITS) {
433 } else if (det == DetID::MFT) {
435 }
436 for (uint32_t iLayer = 0; iLayer < nLayers; iLayer++) {
437 auto binding = getBinding(det.getName(), iLayer);
438 if (!isPresent(det) || !pc.inputs().isValid(binding)) {
439 continue;
440 }
441 s += pc.inputs().get<gsl::span<o2::ctf::BufferType>>(binding).size();
442 }
443 }
444 return s;
445}
446
447//___________________________________________________________________
449{
450 const std::string NAStr = "NA";
452 mTimer.Reset();
453 }
454 auto cput = mTimer.CpuTime();
455 mTimer.Start(false);
456 updateTimeDependentParams(pc);
457 mRejectCurrentTF = (mRejRate > 0 && int(gRandom->Rndm() * 100) < mRejRate) || (mRejRate < -1 && mTimingInfo.timeslice % (-mRejRate));
458 mCurrCTFSize = estimateCTFSize(pc);
459 if (mWriteCTF && !mRejectCurrentTF) {
460 prepareTFTreeAndFile();
461
462 int totalWait = 0, nwaitCycles = 0;
463 while ((mFallBackDirUsed || !mFallBackDirProvided) && mCheckDiskFull) { // we are on the physical disk and not on the RAM disk
464 constexpr size_t MB = 1024 * 1024;
465 constexpr int showFirstN = 10, prsecaleWarnings = 50;
466 try {
467 const auto si = std::filesystem::space(mCTFFileOut->GetName());
468 std::string wmsg{};
469 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
470 nwaitCycles++;
471 wmsg = fmt::format("Disk has {} MB available while at least {} MB is requested, wait for {} ms (on top of {} ms)", si.available / MB, size_t(mCheckDiskFull) / MB, mWaitDiskFull, totalWait);
472 } else if (mCheckDiskFull < 0.f && float(si.available) / si.capacity < -mCheckDiskFull) { // relative margin requested
473 nwaitCycles++;
474 wmsg = fmt::format("Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ? float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
475 } else {
476 nwaitCycles = 0;
477 }
478 if (nwaitCycles) {
479 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
480 closeTFTreeAndFile(); // try to save whatever we have
481 LOGP(fatal, "Disk has {} MB available out of {} MB after waiting for {} ms", si.available / MB, si.capacity / MB, mWaitDiskFullMax);
482 }
483 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
484 LOG(alarm) << wmsg;
485 }
486 pc.services().get<RawDeviceService>().waitFor((unsigned int)(mWaitDiskFull));
487 totalWait += mWaitDiskFull;
488 continue;
489 }
490 } catch (std::exception const& e) {
491 LOG(fatal) << "unable to query disk space info for path " << mCurrentCTFFileNameFull << ", reason: " << e.what();
492 }
493 break;
494 }
495 }
496 // create header
497 CTFHeader header{mTimingInfo.runNumber, mTimingInfo.creation, mTimingInfo.firstTForbit, mTimingInfo.tfCounter};
498 size_t szCTF = 0;
499 mSizeReport = "";
500 std::array<size_t, DetID::CTP + 1> szCTFperDet{0}; // DetID::TST is between FDD and CTP and remains empty
501 szCTFperDet[DetID::ITS] = processDet<o2::itsmft::CTF>(pc, DetID::ITS, header, mCTFTreeOut.get());
502 szCTFperDet[DetID::TPC] = processDet<o2::tpc::CTF>(pc, DetID::TPC, header, mCTFTreeOut.get());
503 szCTFperDet[DetID::TRD] = processDet<o2::trd::CTF>(pc, DetID::TRD, header, mCTFTreeOut.get());
504 szCTFperDet[DetID::TOF] = processDet<o2::tof::CTF>(pc, DetID::TOF, header, mCTFTreeOut.get());
505 szCTFperDet[DetID::PHS] = processDet<o2::phos::CTF>(pc, DetID::PHS, header, mCTFTreeOut.get());
506 szCTFperDet[DetID::CPV] = processDet<o2::cpv::CTF>(pc, DetID::CPV, header, mCTFTreeOut.get());
507 szCTFperDet[DetID::EMC] = processDet<o2::emcal::CTF>(pc, DetID::EMC, header, mCTFTreeOut.get());
508 szCTFperDet[DetID::HMP] = processDet<o2::hmpid::CTF>(pc, DetID::HMP, header, mCTFTreeOut.get());
509 szCTFperDet[DetID::MFT] = processDet<o2::itsmft::CTF>(pc, DetID::MFT, header, mCTFTreeOut.get());
510 szCTFperDet[DetID::MCH] = processDet<o2::mch::CTF>(pc, DetID::MCH, header, mCTFTreeOut.get());
511 szCTFperDet[DetID::MID] = processDet<o2::mid::CTF>(pc, DetID::MID, header, mCTFTreeOut.get());
512 szCTFperDet[DetID::ZDC] = processDet<o2::zdc::CTF>(pc, DetID::ZDC, header, mCTFTreeOut.get());
513 szCTFperDet[DetID::FT0] = processDet<o2::ft0::CTF>(pc, DetID::FT0, header, mCTFTreeOut.get());
514 szCTFperDet[DetID::FV0] = processDet<o2::fv0::CTF>(pc, DetID::FV0, header, mCTFTreeOut.get());
515 szCTFperDet[DetID::FDD] = processDet<o2::fdd::CTF>(pc, DetID::FDD, header, mCTFTreeOut.get());
516 szCTFperDet[DetID::CTP] = processDet<o2::ctp::CTF>(pc, DetID::CTP, header, mCTFTreeOut.get());
517 szCTF = std::accumulate(szCTFperDet.begin(), szCTFperDet.end(), 0);
518 if (mInput.reportInterval > 0 && (mTimingInfo.tfCounter % mInput.reportInterval) == 0) {
519 LOGP(important, "CTF {} size report:{} - Total:{}", mTimingInfo.tfCounter, mSizeReport, fmt::group_digits(szCTF));
520 }
521
522 mTimer.Stop();
523
524 if (mWriteCTF && !mRejectCurrentTF) {
525 szCTF += appendToTree(*mCTFTreeOut.get(), "CTFHeader", header);
526 size_t prevSizeMB = mAccCTFSize / (1 << 20);
527 mAccCTFSize += szCTF;
528 mCTFTreeOut->SetEntries(++mNAccCTF);
529 mTFOrbits.push_back(mTimingInfo.firstTForbit);
530 LOG(info) << "TF#" << mNCTF << ": wrote CTF{" << header << "} of size " << szCTF << " to " << mCurrentCTFFileNameFull << " in " << mTimer.CpuTime() - cput << " s";
531 if (mNAccCTF > 1) {
532 LOG(info) << "Current CTF tree has " << mNAccCTF << " entries with total size of " << mAccCTFSize << " bytes";
533 }
534 if (mLockFD != -1) {
535 lseek(mLockFD, 0, SEEK_SET);
536 auto nwr = write(mLockFD, &mAccCTFSize, sizeof(size_t));
537 if (nwr != sizeof(size_t)) {
538 LOG(error) << "Failed to write current CTF size " << mAccCTFSize << " to lock file, bytes written: " << nwr;
539 }
540 }
541
542 if (mAccCTFSize >= mMinSize || (mMaxCTFPerFile > 0 && mNAccCTF >= mMaxCTFPerFile)) {
543 closeTFTreeAndFile();
544 } else if ((mCTFAutoSave > 0 && mNAccCTF % mCTFAutoSave == 0) || (mCTFAutoSave < 0 && int(prevSizeMB / (-mCTFAutoSave)) != size_t(mAccCTFSize / (1 << 20)) / (-mCTFAutoSave))) {
545 mCTFTreeOut->AutoSave("override");
546 }
547 } else {
548 LOG(info) << "TF#" << mNCTF << " {" << header << "} CTF writing is disabled, size was " << szCTF << " bytes";
549 }
550
551 mNCTF++;
552 if (mCreateDict && mSaveDictAfter > 0 && (mNCTF % mSaveDictAfter) == 0) {
553 storeDictionaries();
554 }
555 int dummy = 0;
556 pc.outputs().snapshot({"ctfdone", 0}, dummy);
557 pc.outputs().snapshot(Output{"CTF", "SIZES", 0}, szCTFperDet);
558}
559
560//___________________________________________________________________
561void CTFWriterSpec::finalize()
562{
563 if (mFinalized) {
564 return;
565 }
566 if (mCreateDict) {
567 storeDictionaries();
568 }
569 if (mWriteCTF) {
570 closeTFTreeAndFile();
571 }
572 LOGF(info, "CTF writing total timing: Cpu: %.3e Real: %.3e s in %d slots",
573 mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
574 mFinalized = true;
575 mNCTF = 0;
576 mNCTFFiles = 0;
577}
578
579//___________________________________________________________________
580void CTFWriterSpec::prepareTFTreeAndFile()
581{
582 if (!mWriteCTF) {
583 return;
584 }
585 bool needToOpen = false;
586 if (!mCTFTreeOut) {
587 needToOpen = true;
588 } else {
589 if ((mAccCTFSize >= mMinSize) || // min size exceeded, may close the file.
590 (mAccCTFSize && mMaxSize > mMinSize && ((mAccCTFSize + mCurrCTFSize) > mMaxSize))) { // this is not the 1st CTF in the file and the new size will exceed allowed max
591 needToOpen = true;
592 } else {
593 LOGP(info, "Will add new CTF of estimated size {} to existing file of size {}", mCurrCTFSize, mAccCTFSize);
594 }
595 }
596 if (needToOpen) {
597 closeTFTreeAndFile();
598 mFallBackDirUsed = false;
599 auto ctfDir = mCTFDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mCTFDir;
600 if (mChkSize > 0 && mFallBackDirProvided) {
601 createLockFile(0);
602 auto sz = getAvailableDiskSpace(ctfDir, 0); // check main storage
603 if (sz < mChkSize) {
604 removeLockFile();
605 LOG(warning) << "Primary CTF output device has available size " << sz << " while " << mChkSize << " is requested: will write on secondary one";
606 ctfDir = mCTFDirFallBack;
607 mFallBackDirUsed = true;
608 }
609 }
610 if (mCreateRunEnvDir && !mDataTakingContext.envId.empty() && (mDataTakingContext.envId != o2::framework::DataTakingContext::UNKNOWN)) {
611 ctfDir += fmt::format("{}_{}/", mDataTakingContext.envId, mDataTakingContext.runNumber);
612 if (!ctfDir.empty()) {
614 LOGP(info, "Created {} directory for CTFs output", ctfDir);
615 }
616 }
617 mCurrentCTFFileName = o2::base::NameConf::getCTFFileName(mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTimingInfo.tfCounter, mHostName);
618 mCurrentCTFFileNameFull = fmt::format("{}{}", ctfDir, mCurrentCTFFileName);
619 mCTFFileOut.reset(TFile::Open(fmt::format("{}{}", mCurrentCTFFileNameFull, TMPFileEnding).c_str(), "recreate")); // to prevent premature external usage, use temporary name
620 if (mCTFFileCompression >= 0) {
621 mCTFFileOut->SetCompressionLevel(mCTFFileCompression);
622 }
623 mCTFTreeOut = std::make_unique<TTree>(std::string(o2::base::NameConf::CTFTREENAME).c_str(), "O2 CTF tree");
624
625 mNCTFFiles++;
626 }
627}
628
629//___________________________________________________________________
630void CTFWriterSpec::closeTFTreeAndFile()
631{
632 if (mCTFTreeOut) {
633 try {
634 mCTFFileOut->cd();
635 mCTFTreeOut->Write();
636 mCTFTreeOut.reset();
637 mCTFFileOut->Close();
638 mCTFFileOut.reset();
639 // write CTF file metaFile data
640 auto actualFileName = TMPFileEnding.empty() ? mCurrentCTFFileNameFull : o2::utils::Str::concat_string(mCurrentCTFFileNameFull, TMPFileEnding);
641 if (mStoreMetaFile) {
643 if (!ctfMetaData.fillFileData(actualFileName, mFillMD5, TMPFileEnding)) {
644 throw std::runtime_error("metadata file was requested but not created");
645 }
646 ctfMetaData.setDataTakingContext(mDataTakingContext);
647 ctfMetaData.type = mMetaDataType;
648 ctfMetaData.priority = mFallBackDirUsed ? "low" : "high";
649 ctfMetaData.tfOrbits.swap(mTFOrbits);
650 auto metaFileNameTmp = fmt::format("{}{}.tmp", mCTFMetaFileDir, mCurrentCTFFileName);
651 auto metaFileName = fmt::format("{}{}.done", mCTFMetaFileDir, mCurrentCTFFileName);
652 try {
653 std::ofstream metaFileOut(metaFileNameTmp);
654 metaFileOut << ctfMetaData;
655 metaFileOut.close();
656 if (!TMPFileEnding.empty()) {
657 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
658 }
659 std::filesystem::rename(metaFileNameTmp, metaFileName);
660 } catch (std::exception const& e) {
661 LOG(error) << "Failed to store CTF meta data file " << metaFileName << ", reason: " << e.what();
662 }
663 } else if (!TMPFileEnding.empty()) {
664 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
665 }
666 } catch (std::exception const& e) {
667 LOG(error) << "Failed to finalize CTF file " << mCurrentCTFFileNameFull << ", reason: " << e.what();
668 }
669 mTFOrbits.clear();
670 mNAccCTF = 0;
671 mAccCTFSize = 0;
672 removeLockFile();
673 }
674}
675
676//___________________________________________________________________
677void CTFWriterSpec::storeDictionaries()
678{
679 // monolitic dictionary in tree format
680 mDictTimeStamp = uint32_t(std::time(nullptr));
681 auto getFileName = [this](bool curr) {
682 return fmt::format("{}{}Tree_{}_{}_{}.root", this->mDictDir, o2::base::NameConf::CTFDICT, DetID::getNames(this->mInput.detMask, '-'), curr ? this->mDictTimeStamp : this->mPrevDictTimeStamp, curr ? this->mNCTF : this->mNCTFPrevDict);
683 };
684 auto dictFileName = getFileName(true);
685 mDictFileOut.reset(TFile::Open(dictFileName.c_str(), "recreate"));
686 mDictTreeOut = std::make_unique<TTree>(std::string(o2::base::NameConf::CTFDICT).c_str(), "O2 CTF dictionary");
687
688 CTFHeader header{mTimingInfo.runNumber, uint32_t(mNCTF)};
689 storeDictionary<o2::itsmft::CTF>(DetID::ITS, header);
690 storeDictionary<o2::itsmft::CTF>(DetID::MFT, header);
691 storeDictionary<o2::tpc::CTF>(DetID::TPC, header);
692 storeDictionary<o2::trd::CTF>(DetID::TRD, header);
693 storeDictionary<o2::tof::CTF>(DetID::TOF, header);
694 storeDictionary<o2::ft0::CTF>(DetID::FT0, header);
695 storeDictionary<o2::fv0::CTF>(DetID::FV0, header);
696 storeDictionary<o2::fdd::CTF>(DetID::FDD, header);
697 storeDictionary<o2::mid::CTF>(DetID::MID, header);
698 storeDictionary<o2::mch::CTF>(DetID::MCH, header);
699 storeDictionary<o2::emcal::CTF>(DetID::EMC, header);
700 storeDictionary<o2::phos::CTF>(DetID::PHS, header);
701 storeDictionary<o2::cpv::CTF>(DetID::CPV, header);
702 storeDictionary<o2::zdc::CTF>(DetID::ZDC, header);
703 storeDictionary<o2::hmpid::CTF>(DetID::HMP, header);
704 storeDictionary<o2::ctp::CTF>(DetID::CTP, header);
705 mDictFileOut->cd();
706 appendToTree(*mDictTreeOut.get(), "CTFHeader", header);
707 mDictTreeOut->SetEntries(1);
708 mDictTreeOut->Write(mDictTreeOut->GetName(), TObject::kSingleKey);
709 mDictTreeOut.reset();
710 mDictFileOut.reset();
711 std::string dictFileNameLnk = fmt::format("{}{}.root", mDictDir, o2::base::NameConf::CTFDICT);
712 if (std::filesystem::exists(dictFileNameLnk)) {
713 std::filesystem::remove(dictFileNameLnk);
714 }
715 std::filesystem::create_symlink(dictFileName, dictFileNameLnk);
716 LOGP(info, "Saved CTF dictionaries tree with {} TFs to {} and linked to {}", mNCTF, dictFileName, dictFileNameLnk);
717 if (mPrevDictTimeStamp) {
718 auto dictFileNamePrev = getFileName(false);
719 if (std::filesystem::exists(dictFileNamePrev)) {
720 std::filesystem::remove(dictFileNamePrev);
721 LOGP(info, "Removed previous dictionary version {}", dictFileNamePrev);
722 }
723 }
724 mNCTFPrevDict = mNCTF;
725 mPrevDictTimeStamp = mDictTimeStamp;
726}
727
728//___________________________________________________________________
729void CTFWriterSpec::createLockFile(int level)
730{
731 // create lock file for the CTF to be written to the storage of given level
732 while (1) {
733 mLockFileName = fmt::format("{}/ctfs{}-{}_{}_{}_{}.lock", LOCKFileDir, level, o2::utils::Str::getRandomString(8), mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTimingInfo.tfCounter);
734 if (!std::filesystem::exists(mLockFileName)) {
735 break;
736 }
737 }
738 mLockFD = open(mLockFileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
739 if (mLockFD == -1) {
740 throw std::runtime_error(fmt::format("Error opening lock file {}", mLockFileName));
741 }
742 if (lockf(mLockFD, F_LOCK, 0)) {
743 throw std::runtime_error(fmt::format("Error locking file {}", mLockFileName));
744 }
745}
746
747//___________________________________________________________________
748void CTFWriterSpec::removeLockFile()
749{
750 // remove CTF lock file
751 if (mLockFD != -1) {
752 if (lockf(mLockFD, F_ULOCK, 0)) {
753 throw std::runtime_error(fmt::format("Error unlocking file {}", mLockFileName));
754 }
755 mLockFD = -1;
756 std::error_code ec;
757 std::filesystem::remove(mLockFileName, ec); // use non-throwing version
758 }
759}
760
761//___________________________________________________________________
762size_t CTFWriterSpec::getAvailableDiskSpace(const std::string& path, int level)
763{
764 // count number of CTF files in processing (written to storage at given level) from their lock files
765 std::regex pat{fmt::format("({}/ctfs{}-[[:alnum:]_]+\\.lock$)", LOCKFileDir, level)};
766 int nLocked = 0;
767 size_t written = 0;
768 std::error_code ec;
769 for (const auto& entry : std::filesystem::directory_iterator(LOCKFileDir)) {
770 const auto& entryName = entry.path().native();
771 if (std::regex_search(entryName, pat) && (mLockFD < 0 || entryName != mLockFileName)) {
772 int fdt = open(entryName.c_str(), O_RDONLY);
773 if (fdt != -1) {
774 bool locked = lockf(fdt, F_TEST, 0) != 0;
775 if (locked) {
776 nLocked++;
777 size_t sz = 0;
778 auto nrd = read(fdt, &sz, sizeof(size_t));
779 if (nrd == sizeof(size_t)) {
780 written += sz;
781 }
782 }
783 close(fdt);
784 // unlocked file is either leftover from crached job or a file from concurent job which was being locked
785 // or just unlocked but not yet removed. In the former case remove it
786 if (!locked) {
787 struct stat statbuf;
788 if (stat(entryName.c_str(), &statbuf) != -1) { // if we fail to stat, the file was already removed
789#ifdef __APPLE__
790 auto ftime = statbuf.st_mtimespec.tv_sec; // last write time
791#else
792 auto ftime = statbuf.st_mtim.tv_sec; // last write time
793#endif
794 auto ctime = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
795 if (ftime + 60 < ctime) { // this is an old file, remove it
796 std::filesystem::remove(entryName, ec); // use non-throwing version
797 }
798 }
799 }
800 }
801 }
802 }
803 const auto si = std::filesystem::space(path, ec);
804 int64_t avail = int64_t(si.available) - nLocked * mChkSize + written; // account already written part of unfinished files
805 LOGP(debug, "{} CTF files open (curr.size: {}) -> can use {} of {} bytes", nLocked, written, avail, si.available);
806 return avail > 0 ? avail : 0;
807}
808
809//___________________________________________________________________
811{
812 std::vector<InputSpec> inputs;
813 LOG(debug) << "Detectors list:";
814 for (auto id = DetID::First; id <= DetID::Last; id++) {
815 if (inp.detMask[id]) {
816 uint32_t nLayers = 1;
817 DetID det{id};
818 if (det == DetID::ITS) {
820 } else if (det == DetID::MFT) {
822 }
823 for (uint32_t iLayer = 0; iLayer < nLayers; iLayer++) {
824 inputs.emplace_back(CTFWriterSpec::getBinding(det.getName(), iLayer), det.getDataOrigin(), "CTFDATA", iLayer, Lifetime::Timeframe);
825 }
826 LOG(debug) << "Det " << DetID::getName(id) << " added";
827 }
828 }
829 return DataProcessorSpec{
830 "ctf-writer",
831 inputs,
832 Outputs{{OutputLabel{"ctfdone"}, "CTF", "DONE", 0, Lifetime::Timeframe},
833 {"CTF", "SIZES", 0, Lifetime::Timeframe}},
834 AlgorithmSpec{adaptFromTask<CTFWriterSpec>(inp)},
835 Options{
836 //{"output-type", VariantType::String, "ctf", {"output types: ctf (per TF) or dict (create dictionaries) or both or none"}},
837 {"save-ctf-after", VariantType::Int64, 0ll, {"autosave CTF tree with multiple CTFs after every N CTFs if >0 or every -N MBytes if < 0"}},
838 {"save-dict-after", VariantType::Int, 0, {"if > 0, in dictionary generation mode save it dictionary after certain number of TFs processed"}},
839 {"ctf-dict-dir", VariantType::String, "none", {"CTF dictionary directory, must exist"}},
840 {"output-dir", VariantType::String, "none", {"CTF output directory, must exist"}},
841 {"output-dir-alt", VariantType::String, "/dev/null", {"Alternative CTF output directory, must exist (if not /dev/null)"}},
842 {"meta-output-dir", VariantType::String, "/dev/null", {"CTF metadata output directory, must exist (if not /dev/null)"}},
843 {"md5-for-meta", VariantType::Bool, false, {"fill CTF file MD5 sum in the metadata file"}},
844 {"min-file-size", VariantType::Int64, 0l, {"accumulate CTFs until given file size reached"}},
845 {"max-file-size", VariantType::Int64, 0l, {"if > 0, try to avoid exceeding given file size, also used for space check"}},
846 {"max-ctf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested CTFs per file"}},
847 {"ctf-rejection", VariantType::Int, 0, {">0: percentage to reject randomly, <0: reject if timeslice%|value|!=0"}},
848 {"ctf-file-compression", VariantType::Int, 0, {"if >= 0: impose CTF file compression level"}},
849 {"require-free-disk", VariantType::Float, 0.f, {"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
850 {"wait-for-free-disk", VariantType::Float, 10.f, {"if paused due to the low disk space, recheck after this time (in s)"}},
851 {"max-wait-for-free-disk", VariantType::Float, 60.f, {"produce fatal if paused due to the low disk space for more than this amount in s."}},
852 {"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}}}};
853}
854
855} // namespace ctf
856} // namespace o2
std::string binding
Definitions for CPV CTF data.
Header for CTF collection.
Definitions for CTP CTF data.
std::string getName(const TDataMember *dm, int index, int size)
Definitions for EMC CTF data.
std::ostringstream debug
Definitions for FDD CTF data.
Definitions for FT0 CTF data.
Definitions for FV0 CTF data.
Header of the AggregatedRunInfo struct.
Definitions for HMPID CTF data.
Definitions for ITS/MFT CTF data.
Definitions for MCH CTF data.
Definitions for MID CTF data.
Definition of the Names Generator class.
Definitions for PHOS CTF data.
uint32_t res
Definition RawData.h:0
Definitions for TOF CTF data.
Definitions for TPC CTF data.
Definitions for TRD CTF data.
TBranch * ptr
Definitions for ZDC CTF data.
Class for time synchronization of RawReader instances.
static std::string getCTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_ctf")
Definition NameConf.cxx:93
static constexpr std::string_view CCDBOBJECT
Definition NameConf.h:66
static constexpr std::string_view CTFDICT
Definition NameConf.h:92
static constexpr std::string_view CTFTREENAME
Definition NameConf.h:95
void init(o2::framework::InitContext &ic) final
bool isPresent(DetID id) const
static std::string getBinding(const std::string &name, int spec)
void stop() final
This is invoked on stop.
void run(o2::framework::ProcessingContext &pc) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:146
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static std::string getNames(mask_t mask, char delimiter=',')
Definition DetID.cxx:74
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:95
static constexpr ID MFT
Definition DetID.h:71
static constexpr int nDetectors
number of defined detectors
Definition DetID.h:97
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:93
static constexpr ID TPC
Definition DetID.h:64
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:148
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
functionality to maintain compatibility with previous version of this library
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint const GLchar * name
Definition glcorearb.h:781
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint id
Definition glcorearb.h:650
public interface for building and renorming histograms from source data.
uint8_t itsSharedClusterMap uint8_t
constexpr Metadata makeMetadataRansDict(size_t symbolTablePrecision, source_T min, source_T max, size_t dictWords, ctf::Metadata::OptStore optStore) noexcept
Definition Metadata.h:114
framework::DataProcessorSpec getCTFWriterSpec(const o2::ctf::CTFWriterInp &inp)
create a processor spec
uint8_t BufferType
This is the type of the vector to be used for the EncodedBlocks buffer allocation.
size_t appendToTree(TTree &tree, const std::string brname, T &ptr)
DeliveryType read(const std::string &str)
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::vector< OutputSpec > Outputs
Polygon< T > close(Polygon< T > polygon)
Definition Polygon.h:126
size_t computeRenormingPrecision(size_t nUsedAlphabetSymbols)
Definition compat.h:64
auto makeHistogramView(container_T &container, std::ptrdiff_t offset) noexcept -> HistogramView< decltype(std::begin(container))>
return * this
HistogramView< Hist_IT > trim(const HistogramView< Hist_IT > &buffer)
size_t countNUsedAlphabetSymbols(const AdaptiveHistogram< source_T > &histogram)
void createDirectoriesIfAbsent(std::string const &path)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
void empty(int)
Detector header base.
o2::detectors::DetID det
o2::detectors::DetID::mask_t detMask
bool fillFileData(const std::string &fname, bool fillmd5=false, const std::string &tmpEnding="")
std::vector< uint32_t > tfOrbits
void setDataTakingContext(const o2::framework::DataTakingContext &dtc)
bool forcedRaw
ECS declared run data storage type as raw.
std::string envId
The environment ID for the deployment.
static constexpr const char * UNKNOWN
std::string runType
The run type of the current run.
std::string runNumber
The current run number.
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
static constexpr int getNLayers()
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
auto * ctfImage
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))