21#include "fmt/format.h"
25#include <fairmq/Device.h>
62 mBranchType = branchType;
63 mSectorMask = sectorMask;
70 mCreateRunEnvDir = !ic.options().get<
bool>(
"ignore-partition-run-dir");
71 mMaxTFPerFile = ic.options().get<
int>(
"max-tf-per-file");
72 mMetaFileDir = ic.options().get<std::string>(
"meta-output-dir");
73 if (mMetaFileDir !=
"/dev/null") {
75 mStoreMetaFile =
true;
81 for (
auto it = mCollectedData.begin(); it != mCollectedData.end();) {
82 auto& dataStore = it->second;
83 if (!finalFill && (dataStore.received.to_ulong() != mSectorMask)) {
87 const auto oldTF = mPresentTF;
88 mPresentTF = it->first;
89 const auto oldTForbit = mFirstTForbit;
90 mFirstTForbit = dataStore.firstOrbit;
91 mTFOrbits.emplace_back(mFirstTForbit);
93 LOGP(info,
"Filling tree entry {} for run {}, tf {}, orbit {}, final {}, sectors {}, (expected: {})", mNTFs, mRun, mPresentTF, mFirstTForbit, finalFill, dataStore.received.to_string(), std::bitset<Sector::MAXSECTOR>(mSectorMask).to_string());
94 auto&
data = dataStore.data;
96 for (
size_t sector = 0; sector <
data.size(); ++sector) {
97 auto& inData =
data[sector];
98 dataPtr[sector] = &inData;
100 if (!mDataBranches[sector]) {
101 mDataBranches[sector] = mTreeOut->Branch(fmt::format(
"{}_{}",
BranchName.at(mBranchType), sector).data(), &inData);
103 mDataBranches[sector]->SetAddress(&dataPtr[sector]);
108 for (
size_t sector = 0; sector <
data.size(); ++sector) {
109 mDataBranches[sector]->ResetAddress();
113 mFirstTForbit = oldTForbit;
116 it = mCollectedData.erase(it);
122 static bool initOnceDone =
false;
128 const std::string NAStr =
"NA";
131 auto oldTF = mPresentTF;
133 mPresentTF = tinfo.tfCounter;
134 mFirstTForbit = tinfo.firstTForbit;
137 if (!mCollectedData.size() || mCollectedData.find(mPresentTF) == mCollectedData.end()) {
138 prepareTreeAndFile(tinfo);
145 auto& dataStore = mCollectedData[mPresentTF];
146 auto const* sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(inputRef);
147 if (sectorHeader ==
nullptr) {
148 LOGP(error,
"sector header missing on header stack for input on ", inputRef.spec->binding);
152 const int sector = sectorHeader->sector();
154 if (dataStore.received.test(sector)) {
155 LOGP(fatal,
"data for sector {} and TF {} already received", sector, mPresentTF);
157 auto data = pc.inputs().get<std::vector<T>>(inputRef);
159 dataStore.data[sector] =
data;
160 dataStore.received.set(sector);
161 dataStore.firstOrbit = mFirstTForbit;
179 uint32_t firstOrbit{};
181 std::bitset<Sector::MAXSECTOR> received;
183 std::map<uint32_t, DataStore> mCollectedData;
184 std::array<TBranch*, Sector::MAXSECTOR> mDataBranches{};
185 std::vector<TBranch*> mInfoBranches;
186 std::vector<uint32_t> mTFOrbits{};
187 std::unique_ptr<TFile> mFileOut;
188 std::unique_ptr<TTree> mTreeOut;
189 std::unique_ptr<FileMetaData> mFileMetaData;
190 std::string mOutDir{};
191 std::string mMetaFileDir{
"/dev/null"};
192 std::string mCurrentFileName{};
193 std::string mCurrentFileNameFull{};
195 uint32_t mPresentTF = 0;
196 uint32_t mFirstTForbit = 0;
197 unsigned long mSectorMask = 0xFFFFFFFFF;
200 int mMaxTFPerFile = 0;
201 bool mStoreMetaFile =
false;
203 bool mCreateRunEnvDir =
true;
206 static constexpr std::string_view TMPFileEnding{
".part"};
209 void closeTreeAndFile();
218 bool needToOpen =
false;
222 if ((mMaxTFPerFile > 0) && (mNTFs >= mMaxTFPerFile)) {
227 LOGP(info,
"opening new file");
241 ctfDir += fmt::format(
"{}_{}/", mDataTakingContext.envId, mDataTakingContext.runNumber);
242 if (!std::filesystem::exists(ctfDir)) {
243 if (!std::filesystem::create_directories(ctfDir)) {
244 throw std::runtime_error(fmt::format(
"Failed to create {} directory", ctfDir));
246 LOGP(info, // fmt-style logging so that {} is substituted by the directory instead of printed literally
"Created {} directory for s output", ctfDir);
251 mCurrentFileNameFull = fmt::format(
"{}{}", ctfDir, mCurrentFileName);
252 mFileOut.reset(TFile::Open(fmt::format(
"{}{}", mCurrentFileNameFull, TMPFileEnding).c_str(),
"recreate"));
253 mTreeOut = std::make_unique<TTree>(
TreeName.at(mBranchType).data(),
"O2 tree");
254 mInfoBranches.emplace_back(mTreeOut->Branch(
"run", &mRun));
255 mInfoBranches.emplace_back(mTreeOut->Branch(
"tfCounter", &mPresentTF));
256 mInfoBranches.emplace_back(mTreeOut->Branch(
"firstOrbit", &mFirstTForbit));
257 LOGP(info,
"created {} info branches", mInfoBranches.size());
258 if (mStoreMetaFile) {
259 mFileMetaData = std::make_unique<o2::dataformats::FileMetaData>();
267void FileWriterDevice<T>::closeTreeAndFile()
273 LOGP(info,
"closing file {}", mCurrentFileName);
276 mTreeOut->SetEntries();
281 if (!TMPFileEnding.empty()) {
285 if (mStoreMetaFile) {
286 mFileMetaData->fillFileData(mCurrentFileNameFull);
287 mFileMetaData->setDataTakingContext(mDataTakingContext);
288 mFileMetaData->type =
"raw";
289 auto metaFileNameTmp = fmt::format(
"{}{}.tmp", mMetaFileDir, mCurrentFileName);
290 auto metaFileName = fmt::format(
"{}{}.done", mMetaFileDir, mCurrentFileName);
292 std::ofstream metaFileOut(metaFileNameTmp);
293 metaFileOut << *mFileMetaData.get();
294 metaFileOut <<
"TFOrbits: ";
295 for (
size_t i = 0;
i < mTFOrbits.size();
i++) {
296 metaFileOut << fmt::format(
"{}{}",
i ?
", " :
"", mTFOrbits[
i]);
300 std::filesystem::rename(metaFileNameTmp, metaFileName);
301 }
catch (std::exception
const& e) {
302 LOG(error) <<
"Failed to store meta data file " << metaFileName <<
", reason: " << e.what();
304 mFileMetaData.reset();
306 }
catch (std::exception
const& e) {
307 LOG(error) <<
"Failed to finalize file " << mCurrentFileNameFull <<
", reason: " << e.what();
310 mInfoBranches.clear();
311 std::fill(mDataBranches.begin(), mDataBranches.end(),
nullptr);
324 AlgorithmSpec{adaptFromTask<FileWriterDevice<T>>(branchType, sectorMask)},
326 {
"output-dir", VariantType::String,
"none", {
" output directory, must exist"}},
327 {
"meta-output-dir", VariantType::String,
"/dev/null", {
" metadata output directory, must exist (if not /dev/null)"}},
328 {
"max-tf-per-file", VariantType::Int, 0, {
"if > 0, avoid storing more than requested TFs per file"}},
329 { // read (negated) into mCreateRunEnvDir
"ignore-partition-run-dir", VariantType::Bool,
false, {
"Do not create partition-run directory in output-dir"}},
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
Writer for calibration data.
Definition of the Names Generator class.
static std::string getCTFFileName(uint32_t run, uint32_t orb, uint32_t id, const std::string &host, const std::string_view prefix="o2_ctf")
Static class with identifiers, bitmasks and names for ALICE detectors.
o2::header::DataHeader::SubSpecificationType SubSpecificationType
void init(InitContext &ic) final
void endOfStream(EndOfStreamContext &ec) final
void run(ProcessingContext &pc) final
void fillData(bool finalFill=false)
FileWriterDevice(const BranchType branchType, unsigned long sectorMask)
static constexpr int MAXSECTOR
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
std::vector< OutputSpec > Outputs
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
template o2::framework::DataProcessorSpec getFileWriterSpec< o2::tpc::KrCluster >(const std::string inputSpec, const BranchType branchType, unsigned long sectorMask)
template o2::framework::DataProcessorSpec getFileWriterSpec< o2::tpc::Digit >(const std::string inputSpec, const BranchType branchType, unsigned long sectorMask)
const std::unordered_map< BranchType, std::string > TreeName
const std::unordered_map< BranchType, std::string > BranchName
o2::framework::DataProcessorSpec getFileWriterSpec(const std::string inputSpec, const BranchType branchType=BranchType::Krypton, unsigned long sectorMask=0xFFFFFFFFF)
static constexpr const char * UNKNOWN
uint32_t tfCounter
the orbit the TF begins
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"