1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See for details of the copyright holders.
3// All rights not expressly granted are reserved.
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
14#include "Framework/Logger.h"
17#include "Framework/InputSpec.h"
22#include <fairmq/Device.h>
33#include "DataFormatsTPC/CTF.h"
34#include "DataFormatsTRD/CTF.h"
35#include "DataFormatsHMP/CTF.h"
36#include "DataFormatsFT0/CTF.h"
37#include "DataFormatsFV0/CTF.h"
38#include "DataFormatsFDD/CTF.h"
39#include "DataFormatsTOF/CTF.h"
40#include "DataFormatsMID/CTF.h"
41#include "DataFormatsMCH/CTF.h"
43#include "DataFormatsPHOS/CTF.h"
44#include "DataFormatsCPV/CTF.h"
45#include "DataFormatsZDC/CTF.h"
46#include "DataFormatsCTP/CTF.h"
48#include "rANS/histogram.h"
49#include "rANS/compat.h"
51#include <vector>
52#include <stdexcept>
53#include <array>
54#include <TStopwatch.h>
55#include <vector>
56#include <TFile.h>
57#include <TTree.h>
58#include <TRandom.h>
59#include <filesystem>
60#include <ctime>
61#include <sys/stat.h>
62#include <fcntl.h>
63#include <unistd.h>
64#include <regex>
65#include <numeric>
67using namespace o2::framework;
69namespace o2
71namespace ctf
74template <typename T>
75size_t appendToTree(TTree& tree, const std::string brname, T& ptr)
77 size_t s = 0;
78 auto* br = tree.GetBranch(brname.c_str());
79 auto* pptr = &ptr;
80 if (br) {
81 br->SetAddress(&pptr);
82 } else {
83 br = tree.Branch(brname.c_str(), &pptr);
84 }
85 int res = br->Fill();
86 if (res < 0) {
87 throw std::runtime_error(fmt::format("Failed to fill CTF branch {}", brname));
88 }
89 s += res;
90 br->ResetAddress();
91 return s;
// NOTE(review): the class head (name, base class, opening brace) is not visible in this view;
// the lines below are the interior of the CTFWriterSpec task class.
99 public:
100 CTFWriterSpec() = delete;
// dm: detectors to process; outType: "ctf", "dict", "both" or "none" (any other value is rejected in init())
101 CTFWriterSpec(DetID::mask_t dm, const std::string& outType, int verbosity, int reportInterval);
// the destructor, endOfStream() and stop() all call finalize(), which does its work only once (mFinalized)
102 ~CTFWriterSpec() final { finalize(); }
103 void init(o2::framework::InitContext& ic) final;
105 void endOfStream(o2::framework::EndOfStreamContext& ec) final { finalize(); }
106 void stop() final { finalize(); }
// true if detector `id` belongs to the mask of detectors to process
107 bool isPresent(DetID id) const { return mDets[id]; }
// NOTE(review): no run() override is declared in the visible lines; confirm that the declaration was not lost.
109 private:
// refresh timing info (every TF) and data-taking context (on run change)
110 void updateTimeDependentParams(ProcessingContext& pc);
// write one detector's CTF (image type C) to the tree and/or accumulate its frequency tables
111 template <typename C>
112 size_t processDet(o2::framework::ProcessingContext& pc, DetID det, CTFHeader& header, TTree* tree);
// write the accumulated dictionary of one detector (image type C)
113 template <typename C>
114 void storeDictionary(DetID det, CTFHeader& header);
115 void storeDictionaries();
116 void closeTFTreeAndFile();
117 void prepareTFTreeAndFile();
118 size_t estimateCTFSize(ProcessingContext& pc);
// lock-file based bookkeeping of CTF files being written to the same storage (by this or other writers)
119 size_t getAvailableDiskSpace(const std::string& path, int level);
120 void createLockFile(int level);
121 void removeLockFile();
122 void finalize();
// ---- configuration and state ----
124 DetID::mask_t mDets; // detectors
125 bool mFinalized = false;
126 bool mWriteCTF = true;
127 bool mCreateDict = false;
128 bool mCreateRunEnvDir = true;
129 bool mStoreMetaFile = false;
130 bool mRejectCurrentTF = false;
131 bool mFallBackDirUsed = false;
132 bool mFallBackDirProvided = false;
133 int mReportInterval = -1;
134 int mVerbosity = 0;
135 int mSaveDictAfter = 0; // if positive and mWriteCTF==true, save dictionary after each mSaveDictAfter TFs processed
136 uint32_t mPrevDictTimeStamp = 0; // timestamp of the previously stored dictionary
137 uint32_t mDictTimeStamp = 0; // timestamp of the currently stored dictionary
138 size_t mMinSize = 0; // if > 0, accumulate CTFs in the same tree until the total size exceeds this minimum
139 size_t mMaxSize = 0; // if > MinSize, and accumulated size will exceed this value, stop accumulation (even if mMinSize is not reached)
140 size_t mChkSize = 0; // if > 0 and fallback storage provided, reserve this size per CTF file in production on primary storage
141 size_t mAccCTFSize = 0; // so far accumulated size (if any)
142 size_t mCurrCTFSize = 0; // size of currently processed CTF
143 size_t mNCTF = 0; // total number of CTFs written
144 size_t mNCTFPrevDict = 0; // total number of CTFs used for previous dictionary version
145 size_t mNAccCTF = 0; // total number of CTFs accumulated in the current file
146 int mWaitDiskFull = 0; // if mCheckDiskFull triggers, pause for this amount of ms before new attempt
147 int mWaitDiskFullMax = -1; // produce fatal if mCheckDiskFull blocks the workflow for more than this time (in ms)
148 float mCheckDiskFull = 0.; // wait if available abs. disk space is < mCheckDiskFull (if >0) or if its fraction is < -mCheckDiskFull (if <0)
149 long mCTFAutoSave = 0; // if > 0, autosave after so many TFs; if < 0, autosave after every -N MB
150 size_t mNCTFFiles = 0; // total number of CTF files written
151 int mMaxCTFPerFile = 0; // max CTFs per files to store
152 int mRejRate = 0; // CTF rejection rule (>0: percentage to reject randomly, <0: reject if timeslice%|value|!=0)
153 int mCTFFileCompression = 0; // CTF file compression level (if >= 0)
154 bool mFillMD5 = false;
155 std::vector<uint32_t> mTFOrbits{}; // 1st orbits of TF accumulated in current file
156 o2::framework::DataTakingContext mDataTakingContext{};
157 o2::framework::TimingInfo mTimingInfo{};
158 std::string mOutputType{}; // RS FIXME once global/local options clash is solved, --output-type will become device option
159 std::string mDictDir{};
160 std::string mCTFDir{};
161 std::string mHostName{};
162 std::string mCTFDirFallBack = "/dev/null";
163 std::string mCTFMetaFileDir = "/dev/null";
164 std::string mCurrentCTFFileName{};
165 std::string mCurrentCTFFileNameFull{};
166 std::string mSizeReport{};
167 std::string mMetaDataType{};
// directory holding the lock files of CTF files being written
168 const std::string LOCKFileDir = "/tmp/ctf-writer-locks";
169 std::string mLockFileName{};
// descriptor of the currently held lock file, -1 if none
170 int mLockFD = -1;
171 std::unique_ptr<TFile> mCTFFileOut;
172 std::unique_ptr<TTree> mCTFTreeOut;
174 std::unique_ptr<TFile> mDictFileOut; // file to store dictionary
175 std::unique_ptr<TTree> mDictTreeOut; // tree to store dictionary
177 // For the external dictionary creation we accumulate for each detector the frequency tables of its each block
178 // After accumulation over multiple TFs we store the dictionaries data in the standard CTF format of this detector,
179 // i.e. EncodedBlock stored in a tree, BUT with dictionary data only added to each block.
180 // The metadata of the block (min,max) will be used for the consistency check at the decoding
181 std::array<std::vector<FTrans>, DetID::nDetectors> mFreqsAccumulation;
182 std::array<std::vector<o2::ctf::Metadata>, DetID::nDetectors> mFreqsMetaData;
// per detector and block: set once adding a frequency table overflowed, further tables are then ignored
183 std::array<std::bitset<64>, DetID::nDetectors> mIsSaturatedFrequencyTable;
// first CTF header seen per detector (type-erased), reused as dictionary header
184 std::array<std::shared_ptr<void>, DetID::nDetectors> mHeaders;
185 TStopwatch mTimer;
// suffix of CTF files while they are being written
187 static const std::string TMPFileEnding;
190const std::string CTFWriterSpec::TMPFileEnding{".part"};
193CTFWriterSpec::CTFWriterSpec(DetID::mask_t dm, const std::string& outType, int verbosity, int reportInterval)
194 : mDets(dm), mOutputType(outType), mReportInterval(reportInterval), mVerbosity(verbosity)
196 std::for_each(mIsSaturatedFrequencyTable.begin(), mIsSaturatedFrequencyTable.end(), [](auto& bitset) { bitset.reset(); });
197 mTimer.Stop();
198 mTimer.Reset();
// NOTE(review): the function head is not visible in this view; judging from the use of `ic.options()`
// these lines are presumably the body of CTFWriterSpec::init(InitContext& ic) — confirm.
// Map the output mode to the CTF-writing / dictionary-creation flags; unknown modes are rejected.
204 // auto outmode = ic.options().get<std::string>("output-type"); // RS FIXME once global/local options clash is solved, --output-type will become device option
205 auto outmode = mOutputType;
206 if (outmode == "ctf") {
207 mWriteCTF = true;
208 mCreateDict = false;
209 } else if (outmode == "dict") {
210 mWriteCTF = false;
211 mCreateDict = true;
212 } else if (outmode == "both") {
213 mWriteCTF = true;
214 mCreateDict = true;
215 } else if (outmode == "none") {
216 mWriteCTF = false;
217 mCreateDict = false;
218 } else {
219 throw std::invalid_argument("Invalid output-type");
220 }
222 mSaveDictAfter = ic.options().get<int>("save-dict-after");
223 mCTFAutoSave = ic.options().get<long>("save-ctf-after");
224 mCTFFileCompression = ic.options().get<int>("ctf-file-compression");
// "/dev/null" disables the metadata output
225 mCTFMetaFileDir = ic.options().get<std::string>("meta-output-dir");
226 if (mCTFMetaFileDir != "/dev/null") {
227 mCTFMetaFileDir = o2::utils::Str::rectifyDirectory(mCTFMetaFileDir);
228 mStoreMetaFile = true;
229 mFillMD5 = ic.options().get<bool>("md5-for-meta");
230 }
231 mDictDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("ctf-dict-dir"));
// "/dev/null" as CTF output directory disables CTF writing (and hence the metadata files)
232 mCTFDir = ic.options().get<std::string>("output-dir");
233 if (mCTFDir != "/dev/null") {
234 mCTFDir = o2::utils::Str::rectifyDirectory(mCTFDir);
235 } else {
236 mWriteCTF = false;
237 mStoreMetaFile = false;
238 }
239 mCTFDirFallBack = ic.options().get<std::string>("output-dir-alt");
240 if (mCTFDirFallBack != "/dev/null") {
241 mCTFDirFallBack = o2::utils::Str::rectifyDirectory(mCTFDirFallBack);
242 mFallBackDirProvided = true;
243 }
244 mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
245 mMinSize = ic.options().get<int64_t>("min-file-size");
246 mMaxSize = ic.options().get<int64_t>("max-file-size");
247 mMaxCTFPerFile = ic.options().get<int>("max-ctf-per-file");
// rejection: > 0 random percentage, < -1 keep every |N|-th timeslice; 0 and -1 mean no rejection
248 mRejRate = ic.options().get<int>("ctf-rejection");
249 if (mRejRate > 0) {
250 LOGP(info, "Will reject{} {}% of TFs", mRejRate < 100 ? " randomly" : "", mRejRate < 100 ? mRejRate : 100);
251 } else if (mRejRate < -1) {
252 LOGP(info, "Will reject all but each {}-th TF slice", -mRejRate);
253 }
255 if (mWriteCTF) {
256 if (mMinSize > 0) {
257 LOG(info) << "Multiple CTFs will be accumulated in the tree/file until its size exceeds " << mMinSize << " bytes";
258 if (mMaxSize > mMinSize) {
259 LOG(info) << "but does not exceed " << mMaxSize << " bytes";
260 }
261 }
262 }
// disk-full protection: the options are given in seconds, stored in ms
264 mCheckDiskFull = ic.options().get<float>("require-free-disk");
265 mWaitDiskFull = 1000 * ic.options().get<float>("wait-for-free-disk");
266 mWaitDiskFullMax = 1000 * ic.options().get<float>("max-wait-for-free-disk");
// space to reserve per CTF file on the primary storage: 10% above the minimum size or the maximum size, whichever is larger
268 mChkSize = std::max(size_t(mMinSize * 1.1), mMaxSize);
// NOTE(review): std::filesystem::exists() follows symlinks, so a dangling symlink at this path passes the check.
271 if (mCreateDict) { // make sure that there is no local dictionary
272 std::string dictFileName = fmt::format("{}{}.root", mDictDir, o2::base::NameConf::CTFDICT);
273 if (std::filesystem::exists(dictFileName)) {
274 throw std::runtime_error(o2::utils::Str::concat_string("CTF dictionary creation is requested but ", dictFileName, " already exists, remove it!"));
275 }
277 }
// short host name (up to the first '.') used in the CTF file names
// NOTE(review): if the host name is truncated, POSIX does not guarantee NUL termination of `hostname`;
// consider terminating the buffer explicitly before constructing the string.
279 char hostname[_POSIX_HOST_NAME_MAX];
280 gethostname(hostname, _POSIX_HOST_NAME_MAX);
281 mHostName = hostname;
282 mHostName = mHostName.substr(0, mHostName.find('.'));
286void CTFWriterSpec::updateTimeDependentParams(ProcessingContext& pc)
288 namespace GRPECS = o2::parameters::GRPECS;
289 mTimingInfo =<o2::framework::TimingInfo>();
290 if (mTimingInfo.globalRunNumberChanged) {
291 mDataTakingContext =<DataTakingContext>();
292 // determine the output type for the CTF metadata
293 mMetaDataType = GRPECS::getRawDataPersistencyMode(mDataTakingContext.runType, mDataTakingContext.forcedRaw);
294 }
298// process data of particular detector
// Handles the CTF of detector `det` (image type C) for the current TF:
// - writes it to `tree` (if CTF writing is on and the TF is not rejected), marking `det` in `header`;
// - in dictionary mode, accumulates the frequency tables of its blocks.
// Returns the size reported by the image's appendToTree, the input span size when nothing is written,
// or 0 if the detector is absent/invalid. Throws if empty and non-empty inputs are mixed across TFs.
// NOTE(review): `warnedEmpty` is a static of the template instantiation, so it is shared by all
// detectors using the same image type C (ITS and MFT both use o2::itsmft::CTF), not tracked per detector.
299template <typename C>
300size_t CTFWriterSpec::processDet(o2::framework::ProcessingContext& pc, DetID det, CTFHeader& header, TTree* tree)
302 static bool warnedEmpty = false;
303 size_t sz = 0;
304 if (!isPresent(det) || !pc.inputs().isValid(det.getName())) {
305 mSizeReport += fmt::format(" {}:N/A", det.getName());
306 return sz;
307 }
308 auto ctfBuffer = pc.inputs().get<gsl::span<o2::ctf::BufferType>>(det.getName());
// NOTE(review): the initializer of bdata is missing in this view; the branches below expect it to be
// null for an empty input (presumably derived from ctfBuffer) — confirm and restore.
309 const o2::ctf::BufferType* bdata =;
310 if (bdata) {
311 if (warnedEmpty) {
312 throw std::runtime_error(fmt::format("Non-empty input was seen at {}-th TF after empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.getName()));
313 }
314 const auto ctfImage = C::getImage(bdata);
315 ctfImage.print(o2::utils::Str::concat_string(det.getName(), ": "), mVerbosity);
316 if (mWriteCTF && !mRejectCurrentTF) {
317 sz = ctfImage.appendToTree(*tree, det.getName());
318 header.detectors.set(det);
319 } else {
320 sz = ctfBuffer.size();
321 }
322 if (mCreateDict) {
323 if (mFreqsAccumulation[det].empty()) {
324 mFreqsAccumulation[det].resize(C::getNBlocks());
325 mFreqsMetaData[det].resize(C::getNBlocks());
326 }
327 if (!mHeaders[det]) { // store 1st header
328 mHeaders[det] = ctfImage.cloneHeader();
329 auto& hb = *static_cast<o2::ctf::CTFDictHeader*>(mHeaders[det].get());
330 hb.det = det;
331 }
332 for (int ib = 0; ib < C::getNBlocks(); ib++) {
333 if (!mIsSaturatedFrequencyTable[det][ib]) {
334 const auto& bl = ctfImage.getBlock(ib);
335 if (bl.getNDict()) {
// work on a copy: the accumulated table is replaced only if adding this TF's dictionary succeeds
336 auto freq = mFreqsAccumulation[det][ib];
337 auto& mdSave = mFreqsMetaData[det][ib];
338 const auto& md = ctfImage.getMetadata(ib);
// the lambda returns false (and marks the block as saturated) if adding the frequencies overflows
339 if ([&, this]() {
340 try {
341 freq.addFrequencies(bl.getDict(), bl.getDict() + bl.getNDict(), md.min);
342 } catch (const std::overflow_error& e) {
343 LOGP(warning, "unable to add frequency table for {}, block {} due to overflow", det.getName(), ib);
344 mIsSaturatedFrequencyTable[det][ib] = true;
345 return false;
346 }
347 return true;
348 }()) {
// NOTE(review): `newProbBits` is not declared in the visible lines; its definition appears to be missing here.
350 auto histogramView = o2::rans::trim(o2::rans::makeHistogramView(freq));
351 mdSave = ctf::detail::makeMetadataRansDict(newProbBits,
352 static_cast<int32_t>(histogramView.getMin()),
353 static_cast<int32_t>(histogramView.getMax()),
354 static_cast<int32_t>(histogramView.size()),
355 md.opt);
356 mFreqsAccumulation[det][ib] = std::move(freq);
357 }
358 }
359 }
360 }
361 }
362 } else {
363 if (!warnedEmpty) {
364 if (mNCTF) {
365 throw std::runtime_error(fmt::format("Empty input was seen at {}-th TF after non-empty one for {}, this will lead to misalignment of detectors in CTF", mNCTF, det.getName()));
366 }
367 LOGP(important, "Empty CTF provided for {}, skipping and will not report anymore", det.getName());
368 warnedEmpty = true;
369 }
370 }
371 mSizeReport += fmt::format(" {}:{}", det.getName(), fmt::group_digits(sz));
372 return sz;
376// store dictionary of a particular detector
// Builds the dictionary blocks of `det` from the accumulated frequency tables, writes them together with
// the dictionary header to a per-detector file (removing the previously stored version, if any),
// attaches them to the dictionaries tree mDictTreeOut and marks `det` in `header`.
// Does nothing if the detector is not processed or nothing was accumulated for it.
377template <typename C>
378void CTFWriterSpec::storeDictionary(DetID det, CTFHeader& header)
380 // create vector whose data contains dictionary in CTF format (EncodedBlock)
381 if (!isPresent(det) || !mFreqsAccumulation[det].size()) {
382 return;
383 }
384 auto dictBlocks = C::createDictionaryBlocks(mFreqsAccumulation[det], mFreqsMetaData[det]);
// NOTE(review): the argument of C::get(...) is truncated in this view (here and in the two later
// C::get calls); presumably the dictBlocks buffer — confirm and restore.
385 auto& h = C::get(>getHeader();
// take the header of the first CTF seen for this detector (stored by processDet)
386 h = *reinterpret_cast<typename std::remove_reference<decltype(h)>::type*>(mHeaders[det].get());
387 auto& hb = static_cast<o2::ctf::CTFDictHeader&>(h);
// NOTE(review): the previous assignment already copied this base part from the same object; this looks redundant.
388 hb = *static_cast<const o2::ctf::CTFDictHeader*>(mHeaders[det].get());
389 hb.dictTimeStamp = mDictTimeStamp;
// file name of the current (curr=true) or previously stored (curr=false) dictionary version
391 auto getFileName = [this, det, &hb](bool curr) {
392 return fmt::format("{}{}_{}_v{}.{}_{}_{}.root", this->mDictDir, o2::base::NameConf::CTFDICT, det.getName(), int(hb.majorVersion), int(hb.minorVersion),
393 curr ? this->mDictTimeStamp : this->mPrevDictTimeStamp, curr ? this->mNCTF : this->mNCTFPrevDict);
394 };
396 C::get(>print(o2::utils::Str::concat_string("Storing dictionary for ", det.getName(), ": "));
397 auto outName = getFileName(true);
398 TFile flout(outName.c_str(), "recreate");
// NOTE(review): the object-name argument of this WriteObject call is truncated in this view — restore it.
399 flout.WriteObject(&dictBlocks,;
400 flout.WriteObject(&hb, fmt::format("ctf_dict_header_{}", det.getName()).c_str());
401 flout.Close();
402 LOGP(info, "Saved {} with {} TFs to {}", hb.asString(), mNCTF, outName);
403 if (mPrevDictTimeStamp) {
404 auto outNamePrev = getFileName(false);
405 if (std::filesystem::exists(outNamePrev)) {
406 std::filesystem::remove(outNamePrev);
407 LOGP(info, "Removed previous dictionary version {}", outNamePrev);
408 }
409 }
410 C::get(>appendToTree(*mDictTreeOut.get(), det.getName()); // cast to EncodedBlock and attach to dictionaries tree
411 header.detectors.set(det);
415size_t CTFWriterSpec::estimateCTFSize(ProcessingContext& pc)
417 size_t s = 0;
418 for (auto id = DetID::First; id <= DetID::Last; id++) {
419 DetID det(id);
420 if (!isPresent(det) || !pc.inputs().isValid(det.getName())) {
421 continue;
422 }
423 s += pc.inputs().get<gsl::span<o2::ctf::BufferType>>(det.getName()).size();
424 }
425 return s;
// NOTE(review): the head of this per-TF processing function and at least one of its first lines are not
// visible in this view (the lone closing brace below closes a block whose opening line is not shown).
// NOTE(review): NAStr is not used in the visible lines.
431 const std::string NAStr = "NA";
433 mTimer.Reset();
434 }
435 auto cput = mTimer.CpuTime();
436 mTimer.Start(false);
437 updateTimeDependentParams(pc);
// reject randomly mRejRate% of TFs if mRejRate > 0; if mRejRate < -1 keep only TFs whose timeslice is a multiple of -mRejRate
438 mRejectCurrentTF = (mRejRate > 0 && int(gRandom->Rndm() * 100) < mRejRate) || (mRejRate < -1 && mTimingInfo.timeslice % (-mRejRate));
439 mCurrCTFSize = estimateCTFSize(pc);
440 if (mWriteCTF && !mRejectCurrentTF) {
441 prepareTFTreeAndFile();
// pause while the free disk space is below the requested margin (absolute if mCheckDiskFull > 0,
// fraction of capacity if < 0); give up with a fatal error after mWaitDiskFullMax ms (if > 0)
443 int totalWait = 0, nwaitCycles = 0;
444 while ((mFallBackDirUsed || !mFallBackDirProvided) && mCheckDiskFull) { // we are on the physical disk and not on the RAM disk
445 constexpr size_t MB = 1024 * 1024;
446 constexpr int showFirstN = 10, prsecaleWarnings = 50;
447 try {
448 const auto si = std::filesystem::space(mCTFFileOut->GetName());
449 std::string wmsg{};
450 if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
451 nwaitCycles++;
452 wmsg = fmt::format("Disk has {} MB available while at least {} MB is requested, wait for {} ms (on top of {} ms)", si.available / MB, size_t(mCheckDiskFull) / MB, mWaitDiskFull, totalWait);
453 } else if (mCheckDiskFull < 0.f && float(si.available) / si.capacity < -mCheckDiskFull) { // relative margin requested
454 nwaitCycles++;
455 wmsg = fmt::format("Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ? float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
456 } else {
457 nwaitCycles = 0;
458 }
459 if (nwaitCycles) {
460 if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
461 closeTFTreeAndFile(); // try to save whatever we have
462 LOGP(fatal, "Disk has {} MB available out of {} MB after waiting for {} ms", si.available / MB, si.capacity / MB, mWaitDiskFullMax);
463 }
// log the first showFirstN warnings, then only every prsecaleWarnings-th one
464 if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
465 LOG(alarm) << wmsg;
466 }
// NOTE(review): the service accessor in front of <RawDeviceService>() is truncated in this view — restore it.
467<RawDeviceService>().waitFor((unsigned int)(mWaitDiskFull));
468 totalWait += mWaitDiskFull;
469 continue;
470 }
471 } catch (std::exception const& e) {
472 LOG(fatal) << "unable to query disk space info for path " << mCurrentCTFFileNameFull << ", reason: " << e.what();
473 }
474 break;
475 }
476 }
477 // create header
478 CTFHeader header{mTimingInfo.runNumber, mTimingInfo.creation, mTimingInfo.firstTForbit, mTimingInfo.tfCounter};
479 size_t szCTF = 0;
480 mSizeReport = "";
// per-detector sizes, published below on the CTF/SIZES output
481 std::array<size_t, DetID::CTP + 1> szCTFperDet{0}; // DetID::TST is between FDD and CTP and remains empty
482 szCTFperDet[DetID::ITS] = processDet<o2::itsmft::CTF>(pc, DetID::ITS, header, mCTFTreeOut.get());
483 szCTFperDet[DetID::TPC] = processDet<o2::tpc::CTF>(pc, DetID::TPC, header, mCTFTreeOut.get());
484 szCTFperDet[DetID::TRD] = processDet<o2::trd::CTF>(pc, DetID::TRD, header, mCTFTreeOut.get());
485 szCTFperDet[DetID::TOF] = processDet<o2::tof::CTF>(pc, DetID::TOF, header, mCTFTreeOut.get());
486 szCTFperDet[DetID::PHS] = processDet<o2::phos::CTF>(pc, DetID::PHS, header, mCTFTreeOut.get());
487 szCTFperDet[DetID::CPV] = processDet<o2::cpv::CTF>(pc, DetID::CPV, header, mCTFTreeOut.get());
488 szCTFperDet[DetID::EMC] = processDet<o2::emcal::CTF>(pc, DetID::EMC, header, mCTFTreeOut.get());
489 szCTFperDet[DetID::HMP] = processDet<o2::hmpid::CTF>(pc, DetID::HMP, header, mCTFTreeOut.get());
490 szCTFperDet[DetID::MFT] = processDet<o2::itsmft::CTF>(pc, DetID::MFT, header, mCTFTreeOut.get());
491 szCTFperDet[DetID::MCH] = processDet<o2::mch::CTF>(pc, DetID::MCH, header, mCTFTreeOut.get());
492 szCTFperDet[DetID::MID] = processDet<o2::mid::CTF>(pc, DetID::MID, header, mCTFTreeOut.get());
493 szCTFperDet[DetID::ZDC] = processDet<o2::zdc::CTF>(pc, DetID::ZDC, header, mCTFTreeOut.get());
494 szCTFperDet[DetID::FT0] = processDet<o2::ft0::CTF>(pc, DetID::FT0, header, mCTFTreeOut.get());
495 szCTFperDet[DetID::FV0] = processDet<o2::fv0::CTF>(pc, DetID::FV0, header, mCTFTreeOut.get());
496 szCTFperDet[DetID::FDD] = processDet<o2::fdd::CTF>(pc, DetID::FDD, header, mCTFTreeOut.get());
497 szCTFperDet[DetID::CTP] = processDet<o2::ctp::CTF>(pc, DetID::CTP, header, mCTFTreeOut.get());
// NOTE(review): the initial value 0 is an int, so std::accumulate sums in int; totals above INT_MAX
// would be truncated — size_t{0} would keep the sum in size_t.
498 szCTF = std::accumulate(szCTFperDet.begin(), szCTFperDet.end(), 0);
499 if (mReportInterval > 0 && (mTimingInfo.tfCounter % mReportInterval) == 0) {
500 LOGP(important, "CTF {} size report:{} - Total:{}", mTimingInfo.tfCounter, mSizeReport, fmt::group_digits(szCTF));
501 }
503 mTimer.Stop();
505 if (mWriteCTF && !mRejectCurrentTF) {
506 szCTF += appendToTree(*mCTFTreeOut.get(), "CTFHeader", header);
507 size_t prevSizeMB = mAccCTFSize / (1 << 20);
508 mAccCTFSize += szCTF;
509 mCTFTreeOut->SetEntries(++mNAccCTF);
510 mTFOrbits.push_back(mTimingInfo.firstTForbit);
511 LOG(info) << "TF#" << mNCTF << ": wrote CTF{" << header << "} of size " << szCTF << " to " << mCurrentCTFFileNameFull << " in " << mTimer.CpuTime() - cput << " s";
512 if (mNAccCTF > 1) {
513 LOG(info) << "Current CTF tree has " << mNAccCTF << " entries with total size of " << mAccCTFSize << " bytes";
514 }
// store the size accumulated so far at the start of the lock file; getAvailableDiskSpace() reads it from other writers' lock files
515 if (mLockFD != -1) {
516 lseek(mLockFD, 0, SEEK_SET);
517 auto nwr = write(mLockFD, &mAccCTFSize, sizeof(size_t));
518 if (nwr != sizeof(size_t)) {
519 LOG(error) << "Failed to write current CTF size " << mAccCTFSize << " to lock file, bytes written: " << nwr;
520 }
521 }
// close the file once it reached mMinSize or holds mMaxCTFPerFile CTFs; otherwise optionally autosave
// the tree every mCTFAutoSave TFs (> 0) or whenever another -mCTFAutoSave MB were added (< 0)
523 if (mAccCTFSize >= mMinSize || (mMaxCTFPerFile > 0 && mNAccCTF >= mMaxCTFPerFile)) {
524 closeTFTreeAndFile();
525 } else if ((mCTFAutoSave > 0 && mNAccCTF % mCTFAutoSave == 0) || (mCTFAutoSave < 0 && int(prevSizeMB / (-mCTFAutoSave)) != size_t(mAccCTFSize / (1 << 20)) / (-mCTFAutoSave))) {
526 mCTFTreeOut->AutoSave("override");
527 }
528 } else {
529 LOG(info) << "TF#" << mNCTF << " {" << header << "} CTF writing is disabled, size was " << szCTF << " bytes";
530 }
532 mNCTF++;
533 if (mCreateDict && mSaveDictAfter > 0 && (mNCTF % mSaveDictAfter) == 0) {
534 storeDictionaries();
535 }
536 int dummy = 0;
537 pc.outputs().snapshot({"ctfdone", 0}, dummy);
538 pc.outputs().snapshot(Output{"CTF", "SIZES", 0}, szCTFperDet);
542void CTFWriterSpec::finalize()
544 if (mFinalized) {
545 return;
546 }
547 if (mCreateDict) {
548 storeDictionaries();
549 }
550 if (mWriteCTF) {
551 closeTFTreeAndFile();
552 }
553 LOGF(info, "CTF writing total timing: Cpu: %.3e Real: %.3e s in %d slots",
554 mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
555 mFinalized = true;
556 mNCTF = 0;
557 mNCTFFiles = 0;
// Make sure a CTF output tree/file is open for the current TF. A new file is opened if none is open,
// if the current one reached mMinSize, or if adding the estimated size of this CTF would exceed mMaxSize.
// When a fallback directory is configured, the primary storage is used only if enough space is left
// for mChkSize bytes (accounting for other writers via lock files); otherwise the fallback is used.
561void CTFWriterSpec::prepareTFTreeAndFile()
563 if (!mWriteCTF) {
564 return;
565 }
566 bool needToOpen = false;
567 if (!mCTFTreeOut) {
568 needToOpen = true;
569 } else {
570 if ((mAccCTFSize >= mMinSize) || // min size exceeded, may close the file.
571 (mAccCTFSize && mMaxSize > mMinSize && ((mAccCTFSize + mCurrCTFSize) > mMaxSize))) { // this is not the 1st CTF in the file and the new size will exceed allowed max
572 needToOpen = true;
573 } else {
574 LOGP(info, "Will add new CTF of estimated size {} to existing file of size {}", mCurrCTFSize, mAccCTFSize);
575 }
576 }
577 if (needToOpen) {
578 closeTFTreeAndFile();
579 mFallBackDirUsed = false;
580 auto ctfDir = mCTFDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mCTFDir;
// the lock file of level 0 announces this file on the primary storage; it is dropped if we switch to the fallback
581 if (mChkSize > 0 && mFallBackDirProvided) {
582 createLockFile(0);
583 auto sz = getAvailableDiskSpace(ctfDir, 0); // check main storage
584 if (sz < mChkSize) {
585 removeLockFile();
586 LOG(warning) << "Primary CTF output device has available size " << sz << " while " << mChkSize << " is requested: will write on secondary one";
587 ctfDir = mCTFDirFallBack;
588 mFallBackDirUsed = true;
589 }
590 }
// optional <envId>_<run>/ subdirectory
591 if (mCreateRunEnvDir && !mDataTakingContext.envId.empty() && (mDataTakingContext.envId != o2::framework::DataTakingContext::UNKNOWN)) {
592 ctfDir += fmt::format("{}_{}/", mDataTakingContext.envId, mDataTakingContext.runNumber);
// NOTE(review): the log below reports a created directory, but no directory-creation call is visible
// inside this branch; it appears to be missing in this view — confirm.
593 if (!ctfDir.empty()) {
595 LOGP(info, "Created {} directory for CTFs output", ctfDir);
596 }
597 }
598 mCurrentCTFFileName = o2::base::NameConf::getCTFFileName(mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTimingInfo.tfCounter, mHostName);
599 mCurrentCTFFileNameFull = fmt::format("{}{}", ctfDir, mCurrentCTFFileName);
// NOTE(review): the result of TFile::Open is not checked for nullptr before being dereferenced below.
600 mCTFFileOut.reset(TFile::Open(fmt::format("{}{}", mCurrentCTFFileNameFull, TMPFileEnding).c_str(), "recreate")); // to prevent premature external usage, use temporary name
601 if (mCTFFileCompression >= 0) {
602 mCTFFileOut->SetCompressionLevel(mCTFFileCompression);
603 }
604 mCTFTreeOut = std::make_unique<TTree>(std::string(o2::base::NameConf::CTFTREENAME).c_str(), "O2 CTF tree");
606 mNCTFFiles++;
607 }
// Write and close the current CTF tree/file (if any). Then rename it from its temporary name to its
// final name and, if requested, write its metadata file (".tmp" renamed to ".done" when complete).
// Finally reset the per-file accumulators and release the lock file. Errors are logged, not rethrown
// (except those raised by removeLockFile()).
611void CTFWriterSpec::closeTFTreeAndFile()
613 if (mCTFTreeOut) {
614 try {
615 mCTFFileOut->cd();
616 mCTFTreeOut->Write();
617 mCTFTreeOut.reset();
618 mCTFFileOut->Close();
619 mCTFFileOut.reset();
620 // write CTF file metaFile data
621 auto actualFileName = TMPFileEnding.empty() ? mCurrentCTFFileNameFull : o2::utils::Str::concat_string(mCurrentCTFFileNameFull, TMPFileEnding);
622 if (mStoreMetaFile) {
// NOTE(review): the declaration of ctfMetaData is not visible in this view — confirm it is present.
// NOTE(review): if fillFileData fails, the throw below skips the rename, leaving the file with its temporary name.
624 if (!ctfMetaData.fillFileData(actualFileName, mFillMD5, TMPFileEnding)) {
625 throw std::runtime_error("metadata file was requested but not created");
626 }
627 ctfMetaData.setDataTakingContext(mDataTakingContext);
628 ctfMetaData.type = mMetaDataType;
// files on the fallback storage are marked low priority
629 ctfMetaData.priority = mFallBackDirUsed ? "low" : "high";
630 ctfMetaData.tfOrbits.swap(mTFOrbits);
631 auto metaFileNameTmp = fmt::format("{}{}.tmp", mCTFMetaFileDir, mCurrentCTFFileName);
632 auto metaFileName = fmt::format("{}{}.done", mCTFMetaFileDir, mCurrentCTFFileName);
633 try {
634 std::ofstream metaFileOut(metaFileNameTmp);
635 metaFileOut << ctfMetaData;
636 metaFileOut.close();
// the data file gets its final name before the metadata file is published as ".done"
637 if (!TMPFileEnding.empty()) {
638 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
639 }
640 std::filesystem::rename(metaFileNameTmp, metaFileName);
641 } catch (std::exception const& e) {
642 LOG(error) << "Failed to store CTF meta data file " << metaFileName << ", reason: " << e.what();
643 }
644 } else if (!TMPFileEnding.empty()) {
645 std::filesystem::rename(actualFileName, mCurrentCTFFileNameFull);
646 }
647 } catch (std::exception const& e) {
648 LOG(error) << "Failed to finalize CTF file " << mCurrentCTFFileNameFull << ", reason: " << e.what();
649 }
650 mTFOrbits.clear();
651 mNAccCTF = 0;
652 mAccCTFSize = 0;
653 removeLockFile();
654 }
658void CTFWriterSpec::storeDictionaries()
660 // monolitic dictionary in tree format
661 mDictTimeStamp = uint32_t(std::time(nullptr));
662 auto getFileName = [this](bool curr) {
663 return fmt::format("{}{}Tree_{}_{}_{}.root", this->mDictDir, o2::base::NameConf::CTFDICT, DetID::getNames(this->mDets, '-'), curr ? this->mDictTimeStamp : this->mPrevDictTimeStamp, curr ? this->mNCTF : this->mNCTFPrevDict);
664 };
665 auto dictFileName = getFileName(true);
666 mDictFileOut.reset(TFile::Open(dictFileName.c_str(), "recreate"));
667 mDictTreeOut = std::make_unique<TTree>(std::string(o2::base::NameConf::CTFDICT).c_str(), "O2 CTF dictionary");
669 CTFHeader header{mTimingInfo.runNumber, uint32_t(mNCTF)};
670 storeDictionary<o2::itsmft::CTF>(DetID::ITS, header);
671 storeDictionary<o2::itsmft::CTF>(DetID::MFT, header);
672 storeDictionary<o2::tpc::CTF>(DetID::TPC, header);
673 storeDictionary<o2::trd::CTF>(DetID::TRD, header);
674 storeDictionary<o2::tof::CTF>(DetID::TOF, header);
675 storeDictionary<o2::ft0::CTF>(DetID::FT0, header);
676 storeDictionary<o2::fv0::CTF>(DetID::FV0, header);
677 storeDictionary<o2::fdd::CTF>(DetID::FDD, header);
678 storeDictionary<o2::mid::CTF>(DetID::MID, header);
679 storeDictionary<o2::mch::CTF>(DetID::MCH, header);
680 storeDictionary<o2::emcal::CTF>(DetID::EMC, header);
681 storeDictionary<o2::phos::CTF>(DetID::PHS, header);
682 storeDictionary<o2::cpv::CTF>(DetID::CPV, header);
683 storeDictionary<o2::zdc::CTF>(DetID::ZDC, header);
684 storeDictionary<o2::hmpid::CTF>(DetID::HMP, header);
685 storeDictionary<o2::ctp::CTF>(DetID::CTP, header);
686 mDictFileOut->cd();
687 appendToTree(*mDictTreeOut.get(), "CTFHeader", header);
688 mDictTreeOut->SetEntries(1);
689 mDictTreeOut->Write(mDictTreeOut->GetName(), TObject::kSingleKey);
690 mDictTreeOut.reset();
691 mDictFileOut.reset();
692 std::string dictFileNameLnk = fmt::format("{}{}.root", mDictDir, o2::base::NameConf::CTFDICT);
693 if (std::filesystem::exists(dictFileNameLnk)) {
694 std::filesystem::remove(dictFileNameLnk);
695 }
696 std::filesystem::create_symlink(dictFileName, dictFileNameLnk);
697 LOGP(info, "Saved CTF dictionaries tree with {} TFs to {} and linked to {}", mNCTF, dictFileName, dictFileNameLnk);
698 if (mPrevDictTimeStamp) {
699 auto dictFileNamePrev = getFileName(false);
700 if (std::filesystem::exists(dictFileNamePrev)) {
701 std::filesystem::remove(dictFileNamePrev);
702 LOGP(info, "Removed previous dictionary version {}", dictFileNamePrev);
703 }
704 }
705 mNCTFPrevDict = mNCTF;
706 mPrevDictTimeStamp = mDictTimeStamp;
710void CTFWriterSpec::createLockFile(int level)
712 // create lock file for the CTF to be written to the storage of given level
713 while (1) {
714 mLockFileName = fmt::format("{}/ctfs{}-{}_{}_{}_{}.lock", LOCKFileDir, level, o2::utils::Str::getRandomString(8), mTimingInfo.runNumber, mTimingInfo.firstTForbit, mTimingInfo.tfCounter);
715 if (!std::filesystem::exists(mLockFileName)) {
716 break;
717 }
718 }
719 mLockFD = open(mLockFileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
720 if (mLockFD == -1) {
721 throw std::runtime_error(fmt::format("Error opening lock file {}", mLockFileName));
722 }
723 if (lockf(mLockFD, F_LOCK, 0)) {
724 throw std::runtime_error(fmt::format("Error locking file {}", mLockFileName));
725 }
729void CTFWriterSpec::removeLockFile()
731 // remove CTF lock file
732 if (mLockFD != -1) {
733 if (lockf(mLockFD, F_ULOCK, 0)) {
734 throw std::runtime_error(fmt::format("Error unlocking file {}", mLockFileName));
735 }
736 mLockFD = -1;
737 std::error_code ec;
738 std::filesystem::remove(mLockFileName, ec); // use non-throwing version
739 }
743size_t CTFWriterSpec::getAvailableDiskSpace(const std::string& path, int level)
745 // count number of CTF files in processing (written to storage at given level) from their lock files
746 std::regex pat{fmt::format("({}/ctfs{}-[[:alnum:]_]+\\.lock$)", LOCKFileDir, level)};
747 int nLocked = 0;
748 size_t written = 0;
749 std::error_code ec;
750 for (const auto& entry : std::filesystem::directory_iterator(LOCKFileDir)) {
751 const auto& entryName = entry.path().native();
752 if (std::regex_search(entryName, pat) && (mLockFD < 0 || entryName != mLockFileName)) {
753 int fdt = open(entryName.c_str(), O_RDONLY);
754 if (fdt != -1) {
755 bool locked = lockf(fdt, F_TEST, 0) != 0;
756 if (locked) {
757 nLocked++;
758 size_t sz = 0;
759 auto nrd = read(fdt, &sz, sizeof(size_t));
760 if (nrd == sizeof(size_t)) {
761 written += sz;
762 }
763 }
764 close(fdt);
765 // unlocked file is either leftover from crached job or a file from concurent job which was being locked
766 // or just unlocked but not yet removed. In the former case remove it
767 if (!locked) {
768 struct stat statbuf;
769 if (stat(entryName.c_str(), &statbuf) != -1) { // if we fail to stat, the file was already removed
770#ifdef __APPLE__
771 auto ftime = statbuf.st_mtimespec.tv_sec; // last write time
773 auto ftime = statbuf.st_mtim.tv_sec; // last write time
775 auto ctime = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
776 if (ftime + 60 < ctime) { // this is an old file, remove it
777 std::filesystem::remove(entryName, ec); // use non-throwing version
778 }
779 }
780 }
781 }
782 }
783 }
784 const auto si = std::filesystem::space(path, ec);
785 int64_t avail = int64_t(si.available) - nLocked * mChkSize + written; // account already written part of unfinished files
786 LOGP(debug, "{} CTF files open (curr.size: {}) -> can use {} of {} bytes", nLocked, written, avail, si.available);
787 return avail > 0 ? avail : 0;
791DataProcessorSpec getCTFWriterSpec(DetID::mask_t dets, const std::string& outType, int verbosity, int reportInterval)
793 std::vector<InputSpec> inputs;
794 LOG(debug) << "Detectors list:";
795 for (auto id = DetID::First; id <= DetID::Last; id++) {
796 if (dets[id]) {
797 inputs.emplace_back(DetID::getName(id), DetID::getDataOrigin(id), "CTFDATA", 0, Lifetime::Timeframe);
798 LOG(debug) << "Det " << DetID::getName(id) << " added";
799 }
800 }
801 return DataProcessorSpec{
802 "ctf-writer",
803 inputs,
804 Outputs{{OutputLabel{"ctfdone"}, "CTF", "DONE", 0, Lifetime::Timeframe},
805 {"CTF", "SIZES", 0, Lifetime::Timeframe}},
806 AlgorithmSpec{adaptFromTask<CTFWriterSpec>(dets, outType, verbosity, reportInterval)}, // RS FIXME once global/local options clash is solved, --output-type will become device option
807 Options{ //{"output-type", VariantType::String, "ctf", {"output types: ctf (per TF) or dict (create dictionaries) or both or none"}},
808 {"save-ctf-after", VariantType::Int64, 0ll, {"autosave CTF tree with multiple CTFs after every N CTFs if >0 or every -N MBytes if < 0"}},
809 {"save-dict-after", VariantType::Int, 0, {"if > 0, in dictionary generation mode save it dictionary after certain number of TFs processed"}},
810 {"ctf-dict-dir", VariantType::String, "none", {"CTF dictionary directory, must exist"}},
811 {"output-dir", VariantType::String, "none", {"CTF output directory, must exist"}},
812 {"output-dir-alt", VariantType::String, "/dev/null", {"Alternative CTF output directory, must exist (if not /dev/null)"}},
813 {"meta-output-dir", VariantType::String, "/dev/null", {"CTF metadata output directory, must exist (if not /dev/null)"}},
814 {"md5-for-meta", VariantType::Bool, false, {"fill CTF file MD5 sum in the metadata file"}},
815 {"min-file-size", VariantType::Int64, 0l, {"accumulate CTFs until given file size reached"}},
816 {"max-file-size", VariantType::Int64, 0l, {"if > 0, try to avoid exceeding given file size, also used for space check"}},
817 {"max-ctf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested CTFs per file"}},
818 {"ctf-rejection", VariantType::Int, 0, {">0: percentage to reject randomly, <0: reject if timeslice%|value|!=0"}},
819 {"ctf-file-compression", VariantType::Int, 0, {"if >= 0: impose CTF file compression level"}},
820 {"require-free-disk", VariantType::Float, 0.f, {"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
821 {"wait-for-free-disk", VariantType::Float, 10.f, {"if paused due to the low disk space, recheck after this time (in s)"}},
822 {"max-wait-for-free-disk", VariantType::Float, 60.f, {"produce fatal if paused due to the low disk space for more than this amount in s."}},
823 {"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}}}};
826} // namespace ctf
827} // namespace o2
