Project
Loading...
Searching...
No Matches
FileWriterSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15//
16#include <bitset>
17#include <filesystem>
18#include <memory>
19#include <vector>
20#include <string>
21#include "fmt/format.h"
22
23#include "TTree.h"
24
25#include <fairmq/Device.h>
26#include "Framework/Task.h"
30#include "Framework/Logger.h"
35
36#include "Headers/DataHeader.h"
40
46#include "TPCBase/Sector.h"
47
48using namespace o2::framework;
52
53namespace o2::tpc
54{
55
56template <typename T>
57class FileWriterDevice : public Task
58{
59 public:
60 FileWriterDevice(const BranchType branchType, unsigned long sectorMask)
61 {
62 mBranchType = branchType;
63 mSectorMask = sectorMask;
64 }
65
66 void init(InitContext& ic) final
67 {
68 mOutDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("output-dir"));
69
70 mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
71 mMaxTFPerFile = ic.options().get<int>("max-tf-per-file");
72 mMetaFileDir = ic.options().get<std::string>("meta-output-dir");
73 if (mMetaFileDir != "/dev/null") {
74 mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir);
75 mStoreMetaFile = true;
76 }
77 }
78
79 void fillData(bool finalFill = false)
80 {
81 for (auto it = mCollectedData.begin(); it != mCollectedData.end();) {
82 auto& dataStore = it->second;
83 if (!finalFill && (dataStore.received.to_ulong() != mSectorMask)) {
84 ++it;
85 continue;
86 }
87 const auto oldTF = mPresentTF;
88 mPresentTF = it->first;
89 const auto oldTForbit = mFirstTForbit;
90 mFirstTForbit = dataStore.firstOrbit;
91 mTFOrbits.emplace_back(mFirstTForbit);
92
93 LOGP(info, "Filling tree entry {} for run {}, tf {}, orbit {}, final {}, sectors {}, (expected: {})", mNTFs, mRun, mPresentTF, mFirstTForbit, finalFill, dataStore.received.to_string(), std::bitset<Sector::MAXSECTOR>(mSectorMask).to_string());
94 auto& data = dataStore.data;
95 std::array<std::vector<T>*, Sector::MAXSECTOR> dataPtr;
96 for (size_t sector = 0; sector < data.size(); ++sector) {
97 auto& inData = data[sector];
98 dataPtr[sector] = &inData;
99
100 if (!mDataBranches[sector]) {
101 mDataBranches[sector] = mTreeOut->Branch(fmt::format("{}_{}", BranchName.at(mBranchType), sector).data(), &inData);
102 } else {
103 mDataBranches[sector]->SetAddress(&dataPtr[sector]);
104 }
105 }
106
107 mTreeOut->Fill();
108 for (size_t sector = 0; sector < data.size(); ++sector) {
109 mDataBranches[sector]->ResetAddress();
110 }
111
112 mPresentTF = oldTF;
113 mFirstTForbit = oldTForbit;
114 ++mNTFs;
115
116 it = mCollectedData.erase(it);
117 }
118 }
119
120 void run(ProcessingContext& pc) final
121 {
122 static bool initOnceDone = false;
123 if (!initOnceDone) {
124 initOnceDone = true;
125 mDataTakingContext = pc.services().get<DataTakingContext>();
126 }
127
128 const std::string NAStr = "NA";
129 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
130 auto oldRun = mRun;
131 auto oldTF = mPresentTF;
133 mPresentTF = tinfo.tfCounter;
134 mFirstTForbit = tinfo.firstTForbit;
135
136 if (mWrite) {
137 if (!mCollectedData.size() || mCollectedData.find(mPresentTF) == mCollectedData.end()) {
138 prepareTreeAndFile(tinfo);
139 }
140
141 fillData();
142 }
143
144 for (auto const& inputRef : InputRecordWalker(pc.inputs())) {
145 auto& dataStore = mCollectedData[mPresentTF];
146 auto const* sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(inputRef);
147 if (sectorHeader == nullptr) {
148 LOGP(error, "sector header missing on header stack for input on ", inputRef.spec->binding);
149 continue;
150 }
151
152 const int sector = sectorHeader->sector();
153 // check if data was already received. Should never happen!
154 if (dataStore.received.test(sector)) {
155 LOGP(fatal, "data for sector {} and TF {} already received", sector, mPresentTF);
156 }
157 auto data = pc.inputs().get<std::vector<T>>(inputRef);
158 // LOGP(info, "Received data for sector {} with {} entries in TF {}, orbit {}", sector, data.size(), mPresentTF, mFirstTForbit);
159 dataStore.data[sector] = data;
160 dataStore.received.set(sector);
161 dataStore.firstOrbit = mFirstTForbit;
162 }
163 }
164
166 {
167 fillData(true);
168 closeTreeAndFile();
169 }
170
171 void stop() final
172 {
173 fillData(true);
174 closeTreeAndFile();
175 }
176
177 private:
178 struct DataStore {
179 uint32_t firstOrbit{};
180 std::array<std::vector<T>, Sector::MAXSECTOR> data;
181 std::bitset<Sector::MAXSECTOR> received;
182 };
183 std::map<uint32_t, DataStore> mCollectedData;
184 std::array<TBranch*, Sector::MAXSECTOR> mDataBranches{};
185 std::vector<TBranch*> mInfoBranches;
186 std::vector<uint32_t> mTFOrbits{};
187 std::unique_ptr<TFile> mFileOut;
188 std::unique_ptr<TTree> mTreeOut;
189 std::unique_ptr<FileMetaData> mFileMetaData;
190 std::string mOutDir{};
191 std::string mMetaFileDir{"/dev/null"};
192 std::string mCurrentFileName{};
193 std::string mCurrentFileNameFull{};
194 uint64_t mRun = 0;
195 uint32_t mPresentTF = 0;
196 uint32_t mFirstTForbit = 0;
197 unsigned long mSectorMask = 0xFFFFFFFFF;
198 size_t mNTFs = 0;
199 size_t mNFiles = 0;
200 int mMaxTFPerFile = 0;
201 bool mStoreMetaFile = false;
202 bool mWrite = true;
203 bool mCreateRunEnvDir = true;
204 BranchType mBranchType;
205 o2::framework::DataTakingContext mDataTakingContext{};
206 static constexpr std::string_view TMPFileEnding{".part"};
207
208 void prepareTreeAndFile(const o2::framework::TimingInfo& tinfo);
209 void closeTreeAndFile();
210};
211//___________________________________________________________________
212template <typename T>
213void FileWriterDevice<T>::prepareTreeAndFile(const o2::framework::TimingInfo& tinfo)
214{
215 if (!mWrite) {
216 return;
217 }
218 bool needToOpen = false;
219 if (!mTreeOut) {
220 needToOpen = true;
221 } else {
222 if ((mMaxTFPerFile > 0) && (mNTFs >= mMaxTFPerFile)) {
223 needToOpen = true;
224 }
225 }
226 if (needToOpen) {
227 LOGP(info, "opening new file");
228 closeTreeAndFile();
229 // auto fname = o2::base::NameConf::getCTFFileName(mRun, tinfo.firstTForbit, tinfo.tfCounter, "tpc_krypton");
230 auto ctfDir = mOutDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mOutDir;
231 // if (mChkSize > 0 && (mDirFallBack != "/dev/null")) {
232 // createLockFile(dh, 0);
233 // auto sz = getAvailableDiskSpace(ctfDir, 0); // check main storage
234 // if (sz < mChkSize) {
235 // removeLockFile();
236 // LOG(warning) << "Primary output device has available size " << sz << " while " << mChkSize << " is requested: will write on secondary one";
237 // ctfDir = mDirFallBack;
238 // }
239 // }
240 if (mCreateRunEnvDir && !mDataTakingContext.envId.empty() && (mDataTakingContext.envId != o2::framework::DataTakingContext::UNKNOWN)) {
241 ctfDir += fmt::format("{}_{}/", mDataTakingContext.envId, mDataTakingContext.runNumber);
242 if (!std::filesystem::exists(ctfDir)) {
243 if (!std::filesystem::create_directories(ctfDir)) {
244 throw std::runtime_error(fmt::format("Failed to create {} directory", ctfDir));
245 } else {
246 LOG(info) << "Created {} directory for s output" << ctfDir;
247 }
248 }
249 }
250 mCurrentFileName = o2::base::NameConf::getCTFFileName(mRun, tinfo.firstTForbit, tinfo.tfCounter, "tpc_krypton");
251 mCurrentFileNameFull = fmt::format("{}{}", ctfDir, mCurrentFileName);
252 mFileOut.reset(TFile::Open(fmt::format("{}{}", mCurrentFileNameFull, TMPFileEnding).c_str(), "recreate")); // to prevent premature external usage, use temporary name
253 mTreeOut = std::make_unique<TTree>(TreeName.at(mBranchType).data(), "O2 tree");
254 mInfoBranches.emplace_back(mTreeOut->Branch("run", &mRun));
255 mInfoBranches.emplace_back(mTreeOut->Branch("tfCounter", &mPresentTF));
256 mInfoBranches.emplace_back(mTreeOut->Branch("firstOrbit", &mFirstTForbit));
257 LOGP(info, "created {} info branches", mInfoBranches.size());
258 if (mStoreMetaFile) {
259 mFileMetaData = std::make_unique<o2::dataformats::FileMetaData>();
260 }
261
262 mNFiles++;
263 }
264}
265//___________________________________________________________________
266template <typename T>
267void FileWriterDevice<T>::closeTreeAndFile()
268{
269 if (!mTreeOut) {
270 return;
271 }
272
273 LOGP(info, "closing file {}", mCurrentFileName);
274 try {
275 mFileOut->cd();
276 mTreeOut->SetEntries();
277 mTreeOut->Write();
278 mTreeOut.reset();
279 mFileOut->Close();
280 mFileOut.reset();
281 if (!TMPFileEnding.empty()) {
282 std::filesystem::rename(o2::utils::Str::concat_string(mCurrentFileNameFull, TMPFileEnding), mCurrentFileNameFull);
283 }
284 // write file metaFile data
285 if (mStoreMetaFile) {
286 mFileMetaData->fillFileData(mCurrentFileNameFull);
287 mFileMetaData->setDataTakingContext(mDataTakingContext);
288 mFileMetaData->type = "raw";
289 auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, mCurrentFileName);
290 auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, mCurrentFileName);
291 try {
292 std::ofstream metaFileOut(metaFileNameTmp);
293 metaFileOut << *mFileMetaData.get();
294 metaFileOut << "TFOrbits: ";
295 for (size_t i = 0; i < mTFOrbits.size(); i++) {
296 metaFileOut << fmt::format("{}{}", i ? ", " : "", mTFOrbits[i]);
297 }
298 metaFileOut << '\n';
299 metaFileOut.close();
300 std::filesystem::rename(metaFileNameTmp, metaFileName);
301 } catch (std::exception const& e) {
302 LOG(error) << "Failed to store meta data file " << metaFileName << ", reason: " << e.what();
303 }
304 mFileMetaData.reset();
305 }
306 } catch (std::exception const& e) {
307 LOG(error) << "Failed to finalize file " << mCurrentFileNameFull << ", reason: " << e.what();
308 }
309 mTFOrbits.clear();
310 mInfoBranches.clear();
311 std::fill(mDataBranches.begin(), mDataBranches.end(), nullptr);
312 mNTFs = 0;
313 // mAccSize = 0;
314 // removeLockFile();
315}
316
317template <typename T>
318DataProcessorSpec getFileWriterSpec(const std::string inputSpec, const BranchType branchType, unsigned long sectorMask)
319{
320 return DataProcessorSpec{
321 "file-writer",
322 select(inputSpec.data()),
323 Outputs{},
324 AlgorithmSpec{adaptFromTask<FileWriterDevice<T>>(branchType, sectorMask)},
325 Options{
326 {"output-dir", VariantType::String, "none", {" output directory, must exist"}},
327 {"meta-output-dir", VariantType::String, "/dev/null", {" metadata output directory, must exist (if not /dev/null)"}},
328 {"max-tf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested TFs per file"}},
329 {"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}},
330 }};
331}; // end DataProcessorSpec
332
333// explicit template instantiations
334template o2::framework::DataProcessorSpec getFileWriterSpec<o2::tpc::Digit>(const std::string inputSpec, const BranchType branchType, unsigned long sectorMask);
335template o2::framework::DataProcessorSpec getFileWriterSpec<o2::tpc::KrCluster>(const std::string inputSpec, const BranchType branchType, unsigned long sectorMask);
336} // namespace o2::tpc
Definition of the TPC Digit.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
Writer for calibration data.
int32_t i
A helper class to iterate over all parts of all input routes.
Definition of the Names Generator class.
Struct for Krypton and X-ray clusters.
static std::string getCTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_ctf")
Definition NameConf.cxx:93
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
o2::header::DataHeader::SubSpecificationType SubSpecificationType
A helper class to iterate over all parts of all input routes.
void init(InitContext &ic) final
void endOfStream(EndOfStreamContext &ec) final
void run(ProcessingContext &pc) final
void fillData(bool finalFill=false)
FileWriterDevice(const BranchType branchType, unsigned long sectorMask)
static constexpr int MAXSECTOR
Definition Sector.h:44
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
std::vector< OutputSpec > Outputs
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:167
template o2::framework::DataProcessorSpec getFileWriterSpec< o2::tpc::KrCluster >(const std::string inputSpec, const BranchType branchType, unsigned long sectorMask)
template o2::framework::DataProcessorSpec getFileWriterSpec< o2::tpc::Digit >(const std::string inputSpec, const BranchType branchType, unsigned long sectorMask)
const std::unordered_map< BranchType, std::string > TreeName
const std::unordered_map< BranchType, std::string > BranchName
o2::framework::DataProcessorSpec getFileWriterSpec(const std::string inputSpec, const BranchType branchType=BranchType::Krypton, unsigned long sectorMask=0xFFFFFFFFF)
static constexpr const char * UNKNOWN
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"