Project
Loading...
Searching...
No Matches
CTFWriterSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include "Framework/Logger.h"
17#include "Framework/InputSpec.h"
22#include <fairmq/Device.h>
23
33#include "DataFormatsTPC/CTF.h"
34#include "DataFormatsTRD/CTF.h"
35#include "DataFormatsHMP/CTF.h"
36#include "DataFormatsFT0/CTF.h"
37#include "DataFormatsFV0/CTF.h"
38#include "DataFormatsFDD/CTF.h"
39#include "DataFormatsTOF/CTF.h"
40#include "DataFormatsMID/CTF.h"
41#include "DataFormatsMCH/CTF.h"
43#include "DataFormatsPHOS/CTF.h"
44#include "DataFormatsCPV/CTF.h"
45#include "DataFormatsZDC/CTF.h"
46#include "DataFormatsCTP/CTF.h"
47
48#include "rANS/histogram.h"
49#include "rANS/compat.h"
50
51#include <vector>
52#include <stdexcept>
53#include <array>
54#include <TStopwatch.h>
55#include <vector>
56#include <TFile.h>
57#include <TTree.h>
58#include <TRandom.h>
59#include <filesystem>
60#include <ctime>
61#include <sys/stat.h>
62#include <fcntl.h>
63#include <unistd.h>
64#include <regex>
65#include <numeric>
66
67using namespace o2::framework;
68
69namespace o2
70{
71namespace ctf
72{
73
74template <typename T>
75size_t appendToTree(TTree& tree, const std::string brname, T& ptr)
76{
77 size_t s = 0;
78 auto* br = tree.GetBranch(brname.c_str());
79 auto* pptr = &ptr;
80 if (br) {
81 br->SetAddress(&pptr);
82 } else {
83 br = tree.Branch(brname.c_str(), &pptr);
84 }
85 int res = br->Fill();
86 if (res < 0) {
87 throw std::runtime_error(fmt::format("Failed to fill CTF branch {}", brname));
88 }
89 s += res;
90 br->ResetAddress();
91 return s;
92}
93
96
98{
99 public:
100 CTFWriterSpec() = delete;
101 CTFWriterSpec(DetID::mask_t dm, const std::string& outType, int verbosity, int reportInterval);
102 ~CTFWriterSpec() final { finalize(); }
103 void init(o2::framework::InitContext& ic) final;
105 void endOfStream(o2::framework::EndOfStreamContext& ec) final { finalize(); }
106 void stop() final { finalize(); }
107 bool isPresent(DetID id) const { return mDets[id]; }
108
109 private:
110 void updateTimeDependentParams(ProcessingContext& pc);
111 template <typename C>
112 size_t processDet(o2::framework::ProcessingContext& pc, DetID det, CTFHeader& header, TTree* tree);
113 template <typename C>
114 void storeDictionary(DetID det, CTFHeader& header);
115 void storeDictionaries();
116 void closeTFTreeAndFile();
117 void prepareTFTreeAndFile();
118 size_t estimateCTFSize(ProcessingContext& pc);
119 size_t getAvailableDiskSpace(const std::string& path, int level);
120 void createLockFile(int level);
121 void removeLockFile();
122 void finalize();
123
124 DetID::mask_t mDets; // detectors
125 bool mFinalized = false;
126 bool mWriteCTF = true;
127 bool mCreateDict = false;
128 bool mCreateRunEnvDir = true;
129 bool mStoreMetaFile = false;
130 bool mRejectCurrentTF = false;
131 bool mFallBackDirUsed = false;
132 bool mFallBackDirProvided = false;
133 int mReportInterval = -1;
134 int mVerbosity = 0;
135 int mSaveDictAfter = 0; // if positive and mWriteCTF==true, save dictionary after each mSaveDictAfter TFs processed
136 uint32_t mPrevDictTimeStamp = 0; // timestamp of the previously stored dictionary
137 uint32_t mDictTimeStamp = 0; // timestamp of the currently stored dictionary
138 size_t mMinSize = 0; // if > 0, accumulate CTFs in the same tree until the total size exceeds this minimum
139 size_t mMaxSize = 0; // if > MinSize, and accumulated size will exceed this value, stop accumulation (even if mMinSize is not reached)
140 size_t mChkSize = 0; // if > 0 and fallback storage provided, reserve this size per CTF file in production on primary storage
141 size_t mAccCTFSize = 0; // so far accumulated size (if any)
142 size_t mCurrCTFSize = 0; // size of currently processed CTF
143 size_t mNCTF = 0; // total number of CTFs written
144 size_t mNCTFPrevDict = 0; // total number of CTFs used for previous dictionary version
145 size_t mNAccCTF = 0; // total number of CTFs accumulated in the current file
146 int mWaitDiskFull = 0; // if mCheckDiskFull triggers, pause for this amount of ms before new attempt
147 int mWaitDiskFullMax = -1; // produce fatal mCheckDiskFull block the workflow for more than this time (in ms)
148 float mCheckDiskFull = 0.; // wait for if available abs. disk space is < mCheckDiskFull (if >0) or if its fraction is < -mCheckDiskFull (if <0)
149 long mCTFAutoSave = 0; // if > 0, autosave after so many TFs
150 size_t mNCTFFiles = 0; // total number of CTF files written
151 int mMaxCTFPerFile = 0; // max CTFs per files to store
152 int mRejRate = 0; // CTF rejection rule (>0: percentage to reject randomly, <0: reject if timeslice%|value|!=0)
153 int mCTFFileCompression = 0; // CTF file compression level (if >= 0)
154 bool mFillMD5 = false;
155 std::vector<uint32_t> mTFOrbits{}; // 1st orbits of TF accumulated in current file
156 o2::framework::DataTakingContext mDataTakingContext{};
157 o2::framework::TimingInfo mTimingInfo{};
158 std::string mOutputType{}; // RS FIXME once global/local options clash is solved, --output-type will become device option
159 std::string mDictDir{};
160 std::string mCTFDir{};
161 std::string mHostName{};
162 std::string mCTFDirFallBack = "/dev/null";
163 std::string mCTFMetaFileDir = "/dev/null";
164 std::string mCurrentCTFFileName{};
165 std::string mCurrentCTFFileNameFull{};
166 std::string mSizeReport{};
167 std::string mMetaDataType{};
168 const std::string LOCKFileDir = "/tmp/ctf-writer-locks";
169 std::string mLockFileName{};
170 int mLockFD = -1;
171 std::unique_ptr<TFile> mCTFFileOut;
172 std::unique_ptr<TTree> mCTFTreeOut;
173
174 std::unique_ptr<TFile> mDictFileOut; // file to store dictionary
175 std::unique_ptr<TTree> mDictTreeOut; // tree to store dictionary
176
177 // For the external dictionary creation we accumulate for each detector the frequency tables of its each block
178 // After accumulation over multiple TFs we store the dictionaries data in the standard CTF format of this detector,
179 // i.e. EncodedBlock stored in a tree, BUT with dictionary data only added to each block.
180 // The metadata of the block (min,max) will be used for the consistency check at the decoding
181 std::array<std::vector<FTrans>, DetID::nDetectors> mFreqsAccumulation;
182 std::array<std::vector<o2::ctf::Metadata>, DetID::nDetectors> mFreqsMetaData;
183 std::array<std::bitset<64>, DetID::nDetectors> mIsSaturatedFrequencyTable;
184 std::array<std::shared_ptr<void>, DetID::nDetectors> mHeaders;
185 TStopwatch mTimer;
186
187 static const std::string TMPFileEnding;
188};
189
190const std::string CTFWriterSpec::TMPFileEnding{".part"};
191
192//___________________________________________________________________
193CTFWriterSpec::CTFWriterSpec(DetID::mask_t dm, const std::string& outType, int verbosity, int reportInterval)
194 : mDets(dm), mOutputType(outType), mReportInterval(reportInterval), mVerbosity(verbosity)
195{
196 std::for_each(mIsSaturatedFrequencyTable.begin(), mIsSaturatedFrequencyTable.end(), [](auto& bitset) { bitset.reset(); });
197 mTimer.Stop();
198 mTimer.Reset();
199}
200
201//___________________________________________________________________
203{
204 // auto outmode = ic.options().get<std::string>("output-type"); // RS FIXME once global/local options clash is solved, --output-type will become device option
205 auto outmode = mOutputType;
206 if (outmode == "ctf") {
207 mWriteCTF = true;
208 mCreateDict = false;
209 } else if (outmode == "dict") {
210 mWriteCTF = false;
211 mCreateDict = true;
212 } else if (outmode == "both") {
213 mWriteCTF = true;
214 mCreateDict = true;
215 } else if (outmode == "none") {
216 mWriteCTF = false;
217 mCreateDict = false;
218 } else {
219 throw std::invalid_argument("Invalid output-type");
220 }
221
222 mSaveDictAfter = ic.options().get<int>("save-dict-after");
223 mCTFAutoSave = ic.options().get<long>("save-ctf-after");
224 mCTFFileCompression = ic.options().get<int>("ctf-file-compression");
225 mCTFMetaFileDir = ic.options().get<std::string>("meta-output-dir");
226 if (mCTFMetaFileDir != "/dev/null") {
227 mCTFMetaFileDir = o2::utils::Str::rectifyDirectory(mCTFMetaFileDir);
228 mStoreMetaFile = true;
229 mFillMD5 = ic.options().get<bool>("md5-for-meta");
230 }
231 mDictDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("ctf-dict-dir"));
232 mCTFDir = ic.options().get<std::string>("output-dir");
233 if (mCTFDir != "/dev/null") {
234 mCTFDir = o2::utils::Str::rectifyDirectory(mCTFDir);
235 } else {
236 mWriteCTF = false;
237 mStoreMetaFile = false;
238 }
239 mCTFDirFallBack = ic.options().get<std::string>("output-dir-alt");
240 if (mCTFDirFallBack != "/dev/null") {
241 mCTFDirFallBack = o2::utils::Str::rectifyDirectory(mCTFDirFallBack);
242 mFallBackDirProvided = true;
243 }
244 mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
245 mMinSize = ic.options().get<int64_t>("min-file-size");
246 mMaxSize = ic.options().get<int64_t>("max-file-size");
247 mMaxCTFPerFile = ic.options().get<int>("max-ctf-per-file");
248 mRejRate = ic.options().get<int>("ctf-rejection");
249 if (mRejRate > 0) {
250 LOGP(info, "Will reject{} {}% of TFs", mRejRate < 100 ? " randomly" : "", mRejRate < 100 ? mRejRate : 100);
251 } else if (mRejRate < -1) {
252 LOGP(info, "Will reject all but each {}-th TF slice", -mRejRate);
253 }
254
255 if (mWriteCTF) {
256 if (mMinSize > 0) {
257 LOG(info) << "Multiple CTFs will be accumulated in the tree/file until its size exceeds " << mMinSize << " bytes";
258 if (mMaxSize > mMinSize) {
259 LOG(info) << "but does not exceed " << mMaxSize << " bytes";
260 }
261 }
262 }
263
264 mCheckDiskFull = ic.options().get<float>("require-free-disk");
265 mWaitDiskFull = 1000 * ic.options().get<float>("wait-for-free-disk");
266 mWaitDiskFullMax = 1000 * ic.options().get<float>("max-wait-for-free-disk");
267
268 mChkSize = std::max(size_t(mMinSize * 1.1), mMaxSize);
270
271 if (mCreateDict) { // make sure that there is no local dictonary
272 std::string dictFileName = fmt::format("{}{}.root", mDictDir, o2::base::NameConf::CTFDICT);
273 if (std::filesystem::exists(dictFileName)) {
274 throw std::runtime_error(o2::utils::Str::concat_string("CTF dictionary creation is requested but ", dictFileName, " already exists, remove it!"));
275 }
277 }
278
279 char hostname[_POSIX_HOST_NAME_MAX];
280 gethostname(hostname, _POSIX_HOST_NAME_MAX);
281 mHostName = hostname;
282 mHostName = mHostName.substr(0, mHostName.find('.'));
283}
284
285//___________________________________________________________________
286void CTFWriterSpec::updateTimeDependentParams(ProcessingContext& pc)
287{
288 namespace GRPECS = o2::parameters::GRPECS;
289 mTimingInfo = pc.services().get<o2::framework::TimingInfo>();
290 if (mTimingInfo.globalRunNumberChanged) {
291 mDataTakingContext = pc.services().get<DataTakingContext>();
292 // determine the output type for the CTF metadata
293 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.runType, mDataTakingContext.forcedRaw);
294 }
295}
296
297//___________________________________________________________________
298// process data of particular detector
299template <typename C>
300size_t CTFWriterSpec::processDet(o2::framework::ProcessingContext& pc, DetID det, CTFHeader& header, TTree* tree)
301{
302 static bool warnedEmpty = false;
303 size_t sz = 0;
304 if (!isPresent(det) || !pc.inputs().isValid(det.getName())) {
305 mSizeReport += fmt::format(" {}:N/A", det.getName());
306 return sz;
307 }
308 auto ctfBuffer = pc.inputs().get<gsl::span<o2::ctf::BufferType>>(det.getName());
309 const o2::ctf::BufferType* bdata = ctfBuffer.data();
310 if (bdata) {
311 if (warnedEmpty) {
312 throw std::runtime_error(fmt::format("Non-empty input was seen at {}-th TF after empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.getName()));
313 }
314 const auto ctfImage = C::getImage(bdata);
315 ctfImage.print(o2::utils::Str::concat_string(det.getName(), ": "), mVerbosity);
316 if (mWriteCTF && !mRejectCurrentTF) {
317 sz = ctfImage.appendToTree(*tree, det.getName());
318 header.detectors.set(det);
319 } else {
320 sz = ctfBuffer.size();
321 }
322 if (mCreateDict) {
323 if (mFreqsAccumulation[det].empty()) {
324 mFreqsAccumulation[det].resize(C::getNBlocks());
325 mFreqsMetaData[det].resize(C::getNBlocks());
326 }
327 if (!mHeaders[det]) { // store 1st header
328 mHeaders[det] = ctfImage.cloneHeader();
329 auto& hb = *static_cast<o2::ctf::CTFDictHeader*>(mHeaders[det].get());
330 hb.det = det;
331 }
332 for (int ib = 0; ib < C::getNBlocks(); ib++) {
333 if (!mIsSaturatedFrequencyTable[det][ib]) {
334 const auto& bl = ctfImage.getBlock(ib);
335 if (bl.getNDict()) {
336 auto freq = mFreqsAccumulation[det][ib];
337 auto& mdSave = mFreqsMetaData[det][ib];
338 const auto& md = ctfImage.getMetadata(ib);
339 if ([&, this]() {
340 try {
341 freq.addFrequencies(bl.getDict(), bl.getDict() + bl.getNDict(), md.min);
342 } catch (const std::overflow_error& e) {
343 LOGP(warning, "unable to add frequency table for {}, block {} due to overflow", det.getName(), ib);
344 mIsSaturatedFrequencyTable[det][ib] = true;
345 return false;
346 }
347 return true;
348 }()) {
350 auto histogramView = o2::rans::trim(o2::rans::makeHistogramView(freq));
351 mdSave = ctf::detail::makeMetadataRansDict(newProbBits,
352 static_cast<int32_t>(histogramView.getMin()),
353 static_cast<int32_t>(histogramView.getMax()),
354 static_cast<int32_t>(histogramView.size()),
355 md.opt);
356 mFreqsAccumulation[det][ib] = std::move(freq);
357 }
358 }
359 }
360 }
361 }
362 } else {
363 if (!warnedEmpty) {
364 if (mNCTF) {
365 throw std::runtime_error(fmt::format("Empty input was seen at {}-th TF after non-empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.getName()));
366 }
367 LOGP(important, "Empty CTF provided for {}, skipping and will not report anymore", det.getName());
368 warnedEmpty = true;
369 }
370 }
371 mSizeReport += fmt::format(" {}:{}", det.getName(), fmt::group_digits(sz));
372 return sz;
373}
374
375//___________________________________________________________________
376// store dictionary of a particular detector
377template <typename C>
378void CTFWriterSpec::storeDictionary(DetID det, CTFHeader& header)
379{
380 // create vector whose data contains dictionary in CTF format (EncodedBlock)
381 if (!isPresent(det) || !mFreqsAccumulation[det].size()) {
382 return;
383 }
384 auto dictBlocks = C::createDictionaryBlocks(mFreqsAccumulation[det], mFreqsMetaData[det]);
385 auto& h = C::get(dictBlocks.data())->getHeader();
386 h = *reinterpret_cast<typename std::remove_reference<decltype(h)>::type*>(mHeaders[det].get());
387 auto& hb = static_cast<o2::ctf::CTFDictHeader&>(h);
388 hb = *static_cast<const o2::ctf::CTFDictHeader*>(mHeaders[det].get());
389 hb.dictTimeStamp = mDictTimeStamp;
390
391 auto getFileName = [this, det, &hb](bool curr) {
392 return fmt::format("{}{}_{}_v{}.{}_{}_{}.root", this->mDictDir, o2::base::NameConf::CTFDICT, det.getName(), int(hb.majorVersion), int(hb.minorVersion),
393 curr ? this->mDictTimeStamp : this->mPrevDictTimeStamp, curr ? this->mNCTF : this->mNCTFPrevDict);
394 };
395
396 C::get(dictBlocks.data())->print(o2::utils::Str::concat_string("Storing dictionary for ", det.getName(), ": "));
397 auto outName = getFileName(true);
398 TFile flout(outName.c_str(), "recreate");
399 flout.WriteObject(&dictBlocks, o2::base::NameConf::CCDBOBJECT.data());
400 flout.WriteObject(&hb, fmt::format("ctf_dict_header_{}", det.getName()).c_str());
401 flout.Close();
402 LOGP(info, "Saved {} with {} TFs to {}", hb.asString(), mNCTF, outName);
403 if (mPrevDictTimeStamp) {
404 auto outNamePrev = getFileName(false);
405 if (std::filesystem::exists(outNamePrev)) {
406 std::filesystem::remove(outNamePrev);
407 LOGP(info, "Removed previous dictionary version {}", outNamePrev);
408 }
409 }
410 C::get(dictBlocks.data())->appendToTree(*mDictTreeOut.get(), det.getName()); // cast to EncodedBlock and attach to dictionaries tree
411 header.detectors.set(det);
412}
413
414//___________________________________________________________________
415size_t CTFWriterSpec::estimateCTFSize(ProcessingContext& pc)
416{
417 size_t s = 0;
418 for (auto id = DetID::First; id <= DetID::Last; id++) {
419 DetID det(id);
420 if (!isPresent(det) || !pc.inputs().isValid(det.getName())) {
421 continue;
422 }
423 s += pc.inputs().get<gsl::span<o2::ctf::BufferType>>(det.getName()).size();
424 }
425 return s;
426}
427
428//___________________________________________________________________
430{
431 const std::string NAStr = "NA";
433 mTimer.Reset();
434 }
435 auto cput = mTimer.CpuTime();
436 mTimer.Start(false);
437 updateTimeDependentParams(pc);
438 mRejectCurrentTF = (mRejRate > 0 && int(gRandom->Rndm() * 100) < mRejRate) || (mRejRate < -1 && mTimingInfo.timeslice % (-mRejRate));
439 mCurrCTFSize = estimateCTFSize(pc);
440 if (mWriteCTF && !mRejectCurrentTF) {
441 prepareTFTreeAndFile();
442
443 int totalWait = 0, nwaitCycles = 0;
444 while ((mFallBackDirUsed || !mFallBackDirProvided) && mCheckDiskFull) { // we are on the physical disk and not on the RAM disk
445 constexpr size_t MB = 1024 * 1024;
446 constexpr int showFirstN = 10, prsecaleWarnings = 50;
447 try {
448 const auto si = std::filesystem::space(mCTFFileOut->GetName());
449 std::string wmsg{};
450 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
451 nwaitCycles++;
452 wmsg = fmt::format("Disk has {} MB available while at least {} MB is requested, wait for {} ms (on top of {} ms)", si.available / MB, size_t(mCheckDiskFull) / MB, mWaitDiskFull, totalWait);
453 } else if (mCheckDiskFull < 0.f && float(si.available) / si.capacity < -mCheckDiskFull) { // relative margin requested
454 nwaitCycles++;
455 wmsg = fmt::format("Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ? float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
456 } else {
457 nwaitCycles = 0;
458 }
459 if (nwaitCycles) {
460 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
461 closeTFTreeAndFile(); // try to save whatever we have
462 LOGP(fatal, "Disk has {} MB available out of {} MB after waiting for {} ms", si.available / MB, si.capacity / MB, mWaitDiskFullMax);
463 }
464 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
465 LOG(alarm) << wmsg;
466 }
467 pc.services().get<RawDeviceService>().waitFor((unsigned int)(mWaitDiskFull));
468 totalWait += mWaitDiskFull;
469 continue;
470 }
471 } catch (std::exception const& e) {
472 LOG(fatal) << "unable to query disk space info for path " << mCurrentCTFFileNameFull << ", reason: " << e.what();
473 }
474 break;
475 }
476 }
477 // create header
478 CTFHeader header{mTimingInfo.runNumber, mTimingInfo.creation, mTimingInfo.firstTForbit, mTimingInfo.tfCounter};
479 size_t szCTF = 0;
480 mSizeReport = "";
481 std::array<size_t, DetID::CTP + 1> szCTFperDet{0}; // DetID::TST is between FDD and CTP and remains empty
482 szCTFperDet[DetID::ITS] = processDet<o2::itsmft::CTF>(pc, DetID::ITS, header, mCTFTreeOut.get());
483 szCTFperDet[DetID::TPC] = processDet<o2::tpc::CTF>(pc, DetID::TPC, header, mCTFTreeOut.get());
484 szCTFperDet[DetID::TRD] = processDet<o2::trd::CTF>(pc, DetID::TRD, header, mCTFTreeOut.get());
485 szCTFperDet[DetID::TOF] = processDet<o2::tof::CTF>(pc, DetID::TOF, header, mCTFTreeOut.get());
486 szCTFperDet[DetID::PHS] = processDet<o2::phos::CTF>(pc, DetID::PHS, header, mCTFTreeOut.get());
487 szCTFperDet[DetID::CPV] = processDet<o2::cpv::CTF>(pc, DetID::CPV, header, mCTFTreeOut.get());
488 szCTFperDet[DetID::EMC] = processDet<o2::emcal::CTF>(pc, DetID::EMC, header, mCTFTreeOut.get());
489 szCTFperDet[DetID::HMP] = processDet<o2::hmpid::CTF>(pc, DetID::HMP, header, mCTFTreeOut.get());
490 szCTFperDet[DetID::MFT] = processDet<o2::itsmft::CTF>(pc, DetID::MFT, header, mCTFTreeOut.get());
491 szCTFperDet[DetID::MCH] = processDet<o2::mch::CTF>(pc, DetID::MCH, header, mCTFTreeOut.get());
492 szCTFperDet[DetID::MID] = processDet<o2::mid::CTF>(pc, DetID::MID, header, mCTFTreeOut.get());
493 szCTFperDet[DetID::ZDC] = processDet<o2::zdc::CTF>(pc, DetID::ZDC, header, mCTFTreeOut.get());
494 szCTFperDet[DetID::FT0] = processDet<o2::ft0::CTF>(pc, DetID::FT0, header, mCTFTreeOut.get());
495 szCTFperDet[DetID::FV0] = processDet<o2::fv0::CTF>(pc, DetID::FV0, header, mCTFTreeOut.get());
496 szCTFperDet[DetID::FDD] = processDet<o2::fdd::CTF>(pc, DetID::FDD, header, mCTFTreeOut.get());
497 szCTFperDet[DetID::CTP] = processDet<o2::ctp::CTF>(pc, DetID::CTP, header, mCTFTreeOut.get());
498 szCTF = std::accumulate(szCTFperDet.begin(), szCTFperDet.end(), 0);
499 if (mReportInterval > 0 && (mTimingInfo.tfCounter % mReportInterval) == 0) {
500 LOGP(important, "CTF {} size report:{} - Total:{}", mTimingInfo.tfCounter, mSizeReport, fmt::group_digits(szCTF));
501 }
502
503 mTimer.Stop();
504
505 if (mWriteCTF && !mRejectCurrentTF) {
506 szCTF += appendToTree(*mCTFTreeOut.get(), "CTFHeader", header);
507 size_t prevSizeMB = mAccCTFSize / (1 << 20);
508 mAccCTFSize += szCTF;
509 mCTFTreeOut->SetEntries(++mNAccCTF);
510 mTFOrbits.push_back(mTimingInfo.firstTForbit);
511 LOG(info) << "TF#" << mNCTF << ": wrote CTF{" << header << "} of size " << szCTF << " to " << mCurrentCTFFileNameFull << " in " << mTimer.CpuTime() - cput << " s";
512 if (mNAccCTF > 1) {
513 LOG(info) << "Current CTF tree has " << mNAccCTF << " entries with total size of " << mAccCTFSize << " bytes";
514 }
515 if (mLockFD != -1) {
516 lseek(mLockFD, 0, SEEK_SET);
517 auto nwr = write(mLockFD, &mAccCTFSize, sizeof(size_t));
518 if (nwr != sizeof(size_t)) {
519 LOG(error) << "Failed to write current CTF size " << mAccCTFSize << " to lock file, bytes written: " << nwr;
520 }
521 }
522
523 if (mAccCTFSize >= mMinSize || (mMaxCTFPerFile > 0 && mNAccCTF >= mMaxCTFPerFile)) {
524 closeTFTreeAndFile();
525 } else if ((mCTFAutoSave > 0 && mNAccCTF % mCTFAutoSave == 0) || (mCTFAutoSave < 0 && int(prevSizeMB / (-mCTFAutoSave)) != size_t(mAccCTFSize / (1 << 20)) / (-mCTFAutoSave))) {
526 mCTFTreeOut->AutoSave("override");
527 }
528 } else {
529 LOG(info) << "TF#" << mNCTF << " {" << header << "} CTF writing is disabled, size was " << szCTF << " bytes";
530 }
531
532 mNCTF++;
533 if (mCreateDict && mSaveDictAfter > 0 && (mNCTF % mSaveDictAfter) == 0) {
534 storeDictionaries();
535 }
536 int dummy = 0;
537 pc.outputs().snapshot({"ctfdone", 0}, dummy);
538 pc.outputs().snapshot(Output{"CTF", "SIZES", 0}, szCTFperDet);
539}
540
541//___________________________________________________________________
542void CTFWriterSpec::finalize()
543{
544 if (mFinalized) {
545 return;
546 }
547 if (mCreateDict) {
548 storeDictionaries();
549 }
550 if (mWriteCTF) {
551 closeTFTreeAndFile();
552 }
553 LOGF(info, "CTF writing total timing: Cpu: %.3e Real: %.3e s in %d slots",
554 mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
555 mFinalized = true;
556 mNCTF = 0;
557 mNCTFFiles = 0;
558}
559
560//___________________________________________________________________
561void CTFWriterSpec::prepareTFTreeAndFile()
562{
563 if (!mWriteCTF) {
564 return;
565 }
566 bool needToOpen = false;
567 if (!mCTFTreeOut) {
568 needToOpen = true;
569 } else {
570 if ((mAccCTFSize >= mMinSize) || // min size exceeded, may close the file.
571 (mAccCTFSize && mMaxSize > mMinSize && ((mAccCTFSize + mCurrCTFSize) > mMaxSize))) { // this is not the 1st CTF in the file and the new size will exceed allowed max
572 needToOpen = true;
573 } else {
574 LOGP(info, "Will add new CTF of estimated size {} to existing file of size {}", mCurrCTFSize, mAccCTFSize);
575 }
576 }
577 if (needToOpen) {
578 closeTFTreeAndFile();
579 mFallBackDirUsed = false;
580 auto ctfDir = mCTFDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mCTFDir;
581 if (mChkSize > 0 && mFallBackDirProvided) {
582 createLockFile(0);
583 auto sz = getAvailableDiskSpace(ctfDir, 0); // check main storage
584 if (sz < mChkSize) {
585 removeLockFile();
586 LOG(warning) << "Primary CTF output device has available size " << sz << " while " << mChkSize << " is requested: will write on secondary one";
587 ctfDir = mCTFDirFallBack;
588 mFallBackDirUsed = true;
589 }
590 }
591 if (mCreateRunEnvDir && !mDataTakingContext.envId.empty() && (mDataTakingContext.envId != o2::framework::DataTakingContext::UNKNOWN)) {
592 ctfDir += fmt::format("{}_{}/", mDataTakingContext.envId, mDataTakingContext.runNumber);
593 if (!ctfDir.empty()) {
595 LOGP(info, "Created {} directory for CTFs output", ctfDir);
596 }
597 }
598 mCurrentCTFFileName = o2::base::NameConf::getCTFFileName(mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTimingInfo.tfCounter, mHostName);
599 mCurrentCTFFileNameFull = fmt::format("{}{}", ctfDir, mCurrentCTFFileName);
600 mCTFFileOut.reset(TFile::Open(fmt::format("{}{}", mCurrentCTFFileNameFull, TMPFileEnding).c_str(), "recreate")); // to prevent premature external usage, use temporary name
601 if (mCTFFileCompression >= 0) {
602 mCTFFileOut->SetCompressionLevel(mCTFFileCompression);
603 }
604 mCTFTreeOut = std::make_unique<TTree>(std::string(o2::base::NameConf::CTFTREENAME).c_str(), "O2 CTF tree");
605
606 mNCTFFiles++;
607 }
608}
609
610//___________________________________________________________________
611void CTFWriterSpec::closeTFTreeAndFile()
612{
613 if (mCTFTreeOut) {
614 try {
615 mCTFFileOut->cd();
616 mCTFTreeOut->Write();
617 mCTFTreeOut.reset();
618 mCTFFileOut->Close();
619 mCTFFileOut.reset();
620 // write CTF file metaFile data
621 auto actualFileName = TMPFileEnding.empty() ? mCurrentCTFFileNameFull : o2::utils::Str::concat_string(mCurrentCTFFileNameFull, TMPFileEnding);
622 if (mStoreMetaFile) {
624 if (!ctfMetaData.fillFileData(actualFileName, mFillMD5, TMPFileEnding)) {
625 throw std::runtime_error("metadata file was requested but not created");
626 }
627 ctfMetaData.setDataTakingContext(mDataTakingContext);
628 ctfMetaData.type = mMetaDataType;
629 ctfMetaData.priority = mFallBackDirUsed ? "low" : "high";
630 ctfMetaData.tfOrbits.swap(mTFOrbits);
631 auto metaFileNameTmp = fmt::format("{}{}.tmp", mCTFMetaFileDir, mCurrentCTFFileName);
632 auto metaFileName = fmt::format("{}{}.done", mCTFMetaFileDir, mCurrentCTFFileName);
633 try {
634 std::ofstream metaFileOut(metaFileNameTmp);
635 metaFileOut << ctfMetaData;
636 metaFileOut.close();
637 if (!TMPFileEnding.empty()) {
638 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
639 }
640 std::filesystem::rename(metaFileNameTmp, metaFileName);
641 } catch (std::exception const& e) {
642 LOG(error) << "Failed to store CTF meta data file " << metaFileName << ", reason: " << e.what();
643 }
644 } else if (!TMPFileEnding.empty()) {
645 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
646 }
647 } catch (std::exception const& e) {
648 LOG(error) << "Failed to finalize CTF file " << mCurrentCTFFileNameFull << ", reason: " << e.what();
649 }
650 mTFOrbits.clear();
651 mNAccCTF = 0;
652 mAccCTFSize = 0;
653 removeLockFile();
654 }
655}
656
657//___________________________________________________________________
658void CTFWriterSpec::storeDictionaries()
659{
660 // monolitic dictionary in tree format
661 mDictTimeStamp = uint32_t(std::time(nullptr));
662 auto getFileName = [this](bool curr) {
663 return fmt::format("{}{}Tree_{}_{}_{}.root", this->mDictDir, o2::base::NameConf::CTFDICT, DetID::getNames(this->mDets, '-'), curr ? this->mDictTimeStamp : this->mPrevDictTimeStamp, curr ? this->mNCTF : this->mNCTFPrevDict);
664 };
665 auto dictFileName = getFileName(true);
666 mDictFileOut.reset(TFile::Open(dictFileName.c_str(), "recreate"));
667 mDictTreeOut = std::make_unique<TTree>(std::string(o2::base::NameConf::CTFDICT).c_str(), "O2 CTF dictionary");
668
669 CTFHeader header{mTimingInfo.runNumber, uint32_t(mNCTF)};
670 storeDictionary<o2::itsmft::CTF>(DetID::ITS, header);
671 storeDictionary<o2::itsmft::CTF>(DetID::MFT, header);
672 storeDictionary<o2::tpc::CTF>(DetID::TPC, header);
673 storeDictionary<o2::trd::CTF>(DetID::TRD, header);
674 storeDictionary<o2::tof::CTF>(DetID::TOF, header);
675 storeDictionary<o2::ft0::CTF>(DetID::FT0, header);
676 storeDictionary<o2::fv0::CTF>(DetID::FV0, header);
677 storeDictionary<o2::fdd::CTF>(DetID::FDD, header);
678 storeDictionary<o2::mid::CTF>(DetID::MID, header);
679 storeDictionary<o2::mch::CTF>(DetID::MCH, header);
680 storeDictionary<o2::emcal::CTF>(DetID::EMC, header);
681 storeDictionary<o2::phos::CTF>(DetID::PHS, header);
682 storeDictionary<o2::cpv::CTF>(DetID::CPV, header);
683 storeDictionary<o2::zdc::CTF>(DetID::ZDC, header);
684 storeDictionary<o2::hmpid::CTF>(DetID::HMP, header);
685 storeDictionary<o2::ctp::CTF>(DetID::CTP, header);
686 mDictFileOut->cd();
687 appendToTree(*mDictTreeOut.get(), "CTFHeader", header);
688 mDictTreeOut->SetEntries(1);
689 mDictTreeOut->Write(mDictTreeOut->GetName(), TObject::kSingleKey);
690 mDictTreeOut.reset();
691 mDictFileOut.reset();
692 std::string dictFileNameLnk = fmt::format("{}{}.root", mDictDir, o2::base::NameConf::CTFDICT);
693 if (std::filesystem::exists(dictFileNameLnk)) {
694 std::filesystem::remove(dictFileNameLnk);
695 }
696 std::filesystem::create_symlink(dictFileName, dictFileNameLnk);
697 LOGP(info, "Saved CTF dictionaries tree with {} TFs to {} and linked to {}", mNCTF, dictFileName, dictFileNameLnk);
698 if (mPrevDictTimeStamp) {
699 auto dictFileNamePrev = getFileName(false);
700 if (std::filesystem::exists(dictFileNamePrev)) {
701 std::filesystem::remove(dictFileNamePrev);
702 LOGP(info, "Removed previous dictionary version {}", dictFileNamePrev);
703 }
704 }
705 mNCTFPrevDict = mNCTF;
706 mPrevDictTimeStamp = mDictTimeStamp;
707}
708
709//___________________________________________________________________
710void CTFWriterSpec::createLockFile(int level)
711{
712 // create lock file for the CTF to be written to the storage of given level
713 while (1) {
714 mLockFileName = fmt::format("{}/ctfs{}-{}_{}_{}_{}.lock", LOCKFileDir, level, o2::utils::Str::getRandomString(8), mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTimingInfo.tfCounter);
715 if (!std::filesystem::exists(mLockFileName)) {
716 break;
717 }
718 }
719 mLockFD = open(mLockFileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
720 if (mLockFD == -1) {
721 throw std::runtime_error(fmt::format("Error opening lock file {}", mLockFileName));
722 }
723 if (lockf(mLockFD, F_LOCK, 0)) {
724 throw std::runtime_error(fmt::format("Error locking file {}", mLockFileName));
725 }
726}
727
728//___________________________________________________________________
729void CTFWriterSpec::removeLockFile()
730{
731 // remove CTF lock file
732 if (mLockFD != -1) {
733 if (lockf(mLockFD, F_ULOCK, 0)) {
734 throw std::runtime_error(fmt::format("Error unlocking file {}", mLockFileName));
735 }
736 mLockFD = -1;
737 std::error_code ec;
738 std::filesystem::remove(mLockFileName, ec); // use non-throwing version
739 }
740}
741
742//___________________________________________________________________
743size_t CTFWriterSpec::getAvailableDiskSpace(const std::string& path, int level)
744{
745 // count number of CTF files in processing (written to storage at given level) from their lock files
746 std::regex pat{fmt::format("({}/ctfs{}-[[:alnum:]_]+\\.lock$)", LOCKFileDir, level)};
747 int nLocked = 0;
748 size_t written = 0;
749 std::error_code ec;
750 for (const auto& entry : std::filesystem::directory_iterator(LOCKFileDir)) {
751 const auto& entryName = entry.path().native();
752 if (std::regex_search(entryName, pat) && (mLockFD < 0 || entryName != mLockFileName)) {
753 int fdt = open(entryName.c_str(), O_RDONLY);
754 if (fdt != -1) {
755 bool locked = lockf(fdt, F_TEST, 0) != 0;
756 if (locked) {
757 nLocked++;
758 size_t sz = 0;
759 auto nrd = read(fdt, &sz, sizeof(size_t));
760 if (nrd == sizeof(size_t)) {
761 written += sz;
762 }
763 }
764 close(fdt);
765 // unlocked file is either leftover from crached job or a file from concurent job which was being locked
766 // or just unlocked but not yet removed. In the former case remove it
767 if (!locked) {
768 struct stat statbuf;
769 if (stat(entryName.c_str(), &statbuf) != -1) { // if we fail to stat, the file was already removed
770#ifdef __APPLE__
771 auto ftime = statbuf.st_mtimespec.tv_sec; // last write time
772#else
773 auto ftime = statbuf.st_mtim.tv_sec; // last write time
774#endif
775 auto ctime = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
776 if (ftime + 60 < ctime) { // this is an old file, remove it
777 std::filesystem::remove(entryName, ec); // use non-throwing version
778 }
779 }
780 }
781 }
782 }
783 }
784 const auto si = std::filesystem::space(path, ec);
785 int64_t avail = int64_t(si.available) - nLocked * mChkSize + written; // account already written part of unfinished files
786 LOGP(debug, "{} CTF files open (curr.size: {}) -> can use {} of {} bytes", nLocked, written, avail, si.available);
787 return avail > 0 ? avail : 0;
788}
789
790//___________________________________________________________________
791DataProcessorSpec getCTFWriterSpec(DetID::mask_t dets, const std::string& outType, int verbosity, int reportInterval)
792{
793 std::vector<InputSpec> inputs;
794 LOG(debug) << "Detectors list:";
795 for (auto id = DetID::First; id <= DetID::Last; id++) {
796 if (dets[id]) {
797 inputs.emplace_back(DetID::getName(id), DetID::getDataOrigin(id), "CTFDATA", 0, Lifetime::Timeframe);
798 LOG(debug) << "Det " << DetID::getName(id) << " added";
799 }
800 }
801 return DataProcessorSpec{
802 "ctf-writer",
803 inputs,
804 Outputs{{OutputLabel{"ctfdone"}, "CTF", "DONE", 0, Lifetime::Timeframe},
805 {"CTF", "SIZES", 0, Lifetime::Timeframe}},
806 AlgorithmSpec{adaptFromTask<CTFWriterSpec>(dets, outType, verbosity, reportInterval)}, // RS FIXME once global/local options clash is solved, --output-type will become device option
807 Options{ //{"output-type", VariantType::String, "ctf", {"output types: ctf (per TF) or dict (create dictionaries) or both or none"}},
808 {"save-ctf-after", VariantType::Int64, 0ll, {"autosave CTF tree with multiple CTFs after every N CTFs if >0 or every -N MBytes if < 0"}},
809 {"save-dict-after", VariantType::Int, 0, {"if > 0, in dictionary generation mode save it dictionary after certain number of TFs processed"}},
810 {"ctf-dict-dir", VariantType::String, "none", {"CTF dictionary directory, must exist"}},
811 {"output-dir", VariantType::String, "none", {"CTF output directory, must exist"}},
812 {"output-dir-alt", VariantType::String, "/dev/null", {"Alternative CTF output directory, must exist (if not /dev/null)"}},
813 {"meta-output-dir", VariantType::String, "/dev/null", {"CTF metadata output directory, must exist (if not /dev/null)"}},
814 {"md5-for-meta", VariantType::Bool, false, {"fill CTF file MD5 sum in the metadata file"}},
815 {"min-file-size", VariantType::Int64, 0l, {"accumulate CTFs until given file size reached"}},
816 {"max-file-size", VariantType::Int64, 0l, {"if > 0, try to avoid exceeding given file size, also used for space check"}},
817 {"max-ctf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested CTFs per file"}},
818 {"ctf-rejection", VariantType::Int, 0, {">0: percentage to reject randomly, <0: reject if timeslice%|value|!=0"}},
819 {"ctf-file-compression", VariantType::Int, 0, {"if >= 0: impose CTF file compression level"}},
820 {"require-free-disk", VariantType::Float, 0.f, {"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
821 {"wait-for-free-disk", VariantType::Float, 10.f, {"if paused due to the low disk space, recheck after this time (in s)"}},
822 {"max-wait-for-free-disk", VariantType::Float, 60.f, {"produce fatal if paused due to the low disk space for more than this amount in s."}},
823 {"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}}}};
824}
825
826} // namespace ctf
827} // namespace o2
#define verbosity
Definitions for CPV CTF data.
Header for CTF collection.
Definitions for CTP CTF data.
Definitions for EMC CTF data.
Definitions for FDD CTF data.
Definitions for FT0 CTF data.
Definitions for FV0 CTF data.
Header of the AggregatedRunInfo struct.
Definitions for HMPID CTF data.
Definitions for ITS/MFT CTF data.
Definitions for MCH CTF data.
Definitions for MID CTF data.
Definition of the Names Generator class.
Definitions for PHOS CTF data.
uint32_t res
Definition RawData.h:0
Definitions for TOF CTF data.
Definitions for TPC CTF data.
Definitions for TRD CTF data.
TBranch * ptr
std::ostringstream debug
Definitions for ZDC CTF data.
Class for time synchronization of RawReader instances.
static std::string getCTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_ctf")
Definition NameConf.cxx:93
static constexpr std::string_view CCDBOBJECT
Definition NameConf.h:66
static constexpr std::string_view CTFDICT
Definition NameConf.h:92
static constexpr std::string_view CTFTREENAME
Definition NameConf.h:95
void init(o2::framework::InitContext &ic) final
bool isPresent(DetID id) const
void stop() final
This is invoked on stop.
void run(o2::framework::ProcessingContext &pc) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:145
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static std::string getNames(mask_t mask, char delimiter=',')
Definition DetID.cxx:74
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:94
static constexpr ID MFT
Definition DetID.h:71
static constexpr int nDetectors
number of defined detectors
Definition DetID.h:96
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr ID TPC
Definition DetID.h:64
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
functionality to maintain compatibility with previous version of this library
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint id
Definition glcorearb.h:650
public interface for building and renorming histograms from source data.
uint8_t itsSharedClusterMap uint8_t
constexpr Metadata makeMetadataRansDict(size_t symbolTablePrecision, source_T min, source_T max, size_t dictWords, ctf::Metadata::OptStore optStore) noexcept
Definition Metadata.h:101
uint8_t BufferType
This is the type of the vector to be used for the EncodedBlocks buffer allocation.
size_t appendToTree(TTree &tree, const std::string brname, T &ptr)
framework::DataProcessorSpec getCTFWriterSpec(o2::detectors::DetID::mask_t dets, const std::string &outType, int verbosity, int reportInterval)
create a processor spec
DeliveryType read(const std::string &str)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< OutputSpec > Outputs
Polygon< T > close(Polygon< T > polygon)
Definition Polygon.h:126
size_t computeRenormingPrecision(size_t nUsedAlphabetSymbols)
Definition compat.h:64
auto makeHistogramView(container_T &container, std::ptrdiff_t offset) noexcept -> HistogramView< decltype(std::begin(container))>
return * this
HistogramView< Hist_IT > trim(const HistogramView< Hist_IT > &buffer)
size_t countNUsedAlphabetSymbols(const AdaptiveHistogram< source_T > &histogram)
void createDirectoriesIfAbsent(std::string const &path)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
void empty(int)
Detector header base.
o2::detectors::DetID det
bool fillFileData(const std::string &fname, bool fillmd5=false, const std::string &tmpEnding="")
std::vector< uint32_t > tfOrbits
void setDataTakingContext(const o2::framework::DataTakingContext &dtc)
bool forcedRaw
ECS declared run data storage type as raw.
std::string envId
The environment ID for the deployment.
static constexpr const char * UNKNOWN
std::string runType
The run type of the current run.
std::string runNumber
The current run number.
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
auto * ctfImage
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))