52#include <TStopwatch.h>
53#include <fairmq/Device.h>
65 auto* br =
tree.GetBranch(brname.c_str());
66 if (br && br->GetEntries() > ev) {
// Parse the run/time-span selection file `flname`: each accepted line is a
// "<run> <range_min> <range_max>" triplet that is appended to mRunTimeRanges.
88 void loadRunTimeSpans(
const std::string& flname);
// Open the CTF file `flname` via TFile::Open and load its CTF tree; open/load
// failures and empty trees are raised as std::runtime_error inside the function
// and reported through the error log.
89 void openCTFFile(
const std::string& flname);
// NOTE(review): the visible part of the definition only shows a call to
// mFileFetcher->popFromQueue(); the exact end-of-tree condition should be
// confirmed against the full definition.
91 void checkTreeEntries();
// For runs listed in a hard-coded table, overwrite ctfHeader.creationTime with
// the tabulated run-start timestamp; the header is passed by non-const reference
// and is modified in place.
96 void tryToFixCTFHeader(
CTFHeader& ctfHeader)
const;
// Selected ranges per run number: run -> list of (range_min, range_max) pairs,
// filled by loadRunTimeSpans() and looked up by timingInfo.runNumber.
99 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
// Provider of the input file queue (getNextFileInQueue / popFromQueue / getNLoops);
// started in init(), stopped and released in stopReader().
100 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
// Currently opened CTF file (result of TFile::Open in openCTFFile()).
101 std::unique_ptr<TFile> mCTFFile;
// CTF tree of the currently opened file; entries are read at index mCurrTreeEntry.
102 std::unique_ptr<TTree> mCTFTree;
// NOTE(review): presumably true while the reader is active; only consulted in
// the "no more files" branch visible in this excerpt - confirm.
103 bool mRunning =
false;
// Set from the "local-tf-counter" option (reassign header.tfCounter from the
// local TF counter).
104 bool mUseLocalTFCounter =
false;
// NOTE(review): no use is visible in this excerpt; presumably flags that the
// selected IRFrames are sent as an output - confirm.
105 bool mIFRamesOut =
false;
// Interpretation of the values in mRunTimeRanges: -1 = not determined yet,
// 1 = unix timestamps in ms, 0 = orbit numbers (fixed by the first range read).
106 int mConvRunTimeRangesToOrbits = -1;
// Number of TFs that passed the selection ("accepted" in the final log);
// also written to the ctf_read_ntf.txt file.
108 int mCTFCounterAcc = 0;
// Number of files reported as failed in the final log message.
109 int mNFailedFiles = 0;
// Run number of the previously processed TF; the run-time-range selection is
// rebuilt when timingInfo.runNumber differs from it.
113 int mRunNumberPrev = -1;
// Accumulated time spent waiting for input data, in microseconds
// (logged in seconds after scaling by 1e-6).
114 long mTotalWaitTime = 0;
// system_clock time in microseconds recorded after the last CTF was read.
115 long mLastSendTime = 0L;
// Index of the entry in mCTFTree that is being read.
116 long mCurrTreeEntry = 0L;
// Value of the "impose-run-start-timstamp" option: run start time stamp in ms,
// ignored if 0.
117 long mImposeRunStartMS = 0L;
// Current position in the mInput.ctfIDs list of CTF IDs selected for injection.
118 size_t mSelIDEntry = 0;
136void CTFReaderSpec::stopReader()
141 LOGP(info,
"CTFReader stops processing, {} files read, {} files failed", mFilesRead - mNFailedFiles, mNFailedFiles);
142 LOGP(info,
"CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs ({} accepted) in {} loops, spent {:.2} s in {} data waiting states",
143 mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mCTFCounterAcc, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
145 mFileFetcher->stop();
146 mFileFetcher.reset();
157 mInput.
ctfIDs = o2::RangeTokenizer::tokenize<int>(ic.
options().
get<std::string>(
"select-ctf-ids"));
158 mUseLocalTFCounter = ic.
options().
get<
bool>(
"local-tf-counter");
159 mImposeRunStartMS = ic.
options().
get<int64_t>(
"impose-run-start-timstamp");
168 mFileFetcher->setMaxLoops(mInput.
maxLoops);
169 mFileFetcher->setFailThreshold(ic.
options().
get<
float>(
"fetch-failure-threshold"));
170 mFileFetcher->start();
174 mTFLength = hbfu.nHBFPerTF;
175 LOGP(info,
"IRFrames will be selected from {}, assumed TF length: {} HBF", mInput.
fileIRFrames, mTFLength);
187 mIRFrameSelector.
clear();
188 auto ent = mRunTimeRanges.find(timingInfo.
runNumber);
189 if (ent == mRunTimeRanges.end()) {
190 LOGP(info,
"RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", timingInfo.
runNumber);
197 LOGP(fatal,
"failed to extract AggregatedRunInfo for run {}", timingInfo.
runNumber);
200 std::vector<o2::dataformats::IRFrame> frames;
201 for (
const auto& rng : ent->second) {
202 long orbMin = 0, orbMax = 0;
203 if (mConvRunTimeRangesToOrbits > 0) {
220 LOGP(info,
"TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.
invertIRFramesSelection ?
"rejected" :
"selected");
226void CTFReaderSpec::loadRunTimeSpans(
const std::string& flname)
228 std::ifstream inputFile(flname);
230 LOGP(fatal,
"Failed to open selected run/timespans file {}", mInput.
fileRunTimeSpans);
233 size_t cntl = 0, cntr = 0;
234 while (std::getline(inputFile, line)) {
236 for (
char& ch : line) {
237 if (ch ==
';' || ch ==
'\t' || ch ==
',') {
242 if (line.size() < 1 || line[0] ==
'#') {
246 auto logError = [&cntl, &line]() { LOGP(error,
"Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
247 if (tokens.size() >= 3) {
251 run = std::stoi(tokens[0]);
252 rmin = std::stol(tokens[1]);
253 rmax = std::stol(tokens[2]);
259 constexpr long ISTimeStamp = 1514761200000L;
260 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0;
262 LOGP(fatal,
"Provided range limits are not in increasing order, entry is {}", line);
264 if (mConvRunTimeRangesToOrbits == -1) {
265 if (convmn != convmx) {
266 LOGP(fatal,
"Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
268 mConvRunTimeRangesToOrbits = convmn;
269 LOGP(info,
"Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ?
"timstamps(ms)" :
"orbits");
271 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
272 LOGP(fatal,
"Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ?
"timestamps" :
"orbits", line);
276 mRunTimeRanges[
run].emplace_back(rmin, rmax);
282 LOGP(info,
"Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.
fileRunTimeSpans);
287void CTFReaderSpec::openCTFFile(
const std::string& flname)
291 mCTFFile.reset(TFile::Open(flname.c_str()));
292 if (!mCTFFile || !mCTFFile->IsOpen() || mCTFFile->IsZombie()) {
293 throw std::runtime_error(fmt::format(
"failed to open CTF file {}, skipping", flname));
297 throw std::runtime_error(fmt::format(
"failed to load CTF tree from {}, skipping", flname));
299 if (mCTFTree->GetEntries() < 1) {
300 throw std::runtime_error(fmt::format(
"CTF tree in {} has 0 entries, skipping", flname));
302 }
catch (
const std::exception& e) {
303 LOG(error) <<
"Cannot process " << flname <<
", reason: " << e.what();
308 mFileFetcher->popFromQueue(mInput.
maxLoops < 1);
320 std::string tfFileName;
321 bool waitAcknowledged =
false;
326 if (mInput.
ctfIDs.empty() || mInput.
ctfIDs[mSelIDEntry] == mCTFCounter) {
327 LOG(
debug) <<
"TF " << mCTFCounter <<
" of " << mInput.
maxTFs <<
" loop " << mFileFetcher->getNLoops();
334 LOGP(info,
"Skipping CTF#{} ({} of {} in {})", mCTFCounter, mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
340 tfFileName = mFileFetcher->getNextFileInQueue();
341 if (tfFileName.empty()) {
342 if (!mFileFetcher->isRunning()) {
346 if (!waitAcknowledged) {
347 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
348 waitAcknowledged =
true;
353 if (waitAcknowledged) {
354 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
355 mTotalWaitTime += waitTime;
357 LOGP(warn,
"Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
359 waitAcknowledged =
false;
361 LOG(info) <<
"Reading CTF input " <<
' ' << tfFileName;
362 openCTFFile(tfFileName);
365 if (mCTFCounter >= mInput.
maxTFs || (!mInput.
ctfIDs.empty() && mSelIDEntry >= mInput.
ctfIDs.size())) {
366 LOGP(info,
"All CTFs from selected range were injected, stopping");
368 }
else if (mRunning && !mCTFTree && mFileFetcher->getNextFileInQueue().empty() && !mFileFetcher->isRunning()) {
376 const std::string dummy{
"ctf_read_ntf.txt"};
377 if (mCTFCounterAcc == 0) {
378 LOGP(warn,
"No TF passed selection, writing a 0 to file {}", dummy);
381 std::ofstream outfile;
382 outfile.open(dummy, std::ios::out | std::ios::trunc);
383 outfile << mCTFCounterAcc << std::endl;
385 LOGP(error,
"Failed to write {}", dummy);
393 auto cput = mTimer.CpuTime();
398 if (!
readFromTree(*(mCTFTree.get()),
"CTFHeader", ctfHeader, mCurrTreeEntry)) {
399 throw std::runtime_error(
"did not find CTFHeader");
401 if (mImposeRunStartMS > 0) {
405 tryToFixCTFHeader(ctfHeader);
408 if (mUseLocalTFCounter) {
412 LOG(info) << ctfHeader;
420 if (mRunTimeRanges.size() && timingInfo.
runNumber != mRunNumberPrev) {
421 runTimeRangesToIRFrameSelector(timingInfo);
424 gsl::span<const o2::dataformats::IRFrame> irSpan{};
425 if (mIRFrameSelector.
isSet()) {
432 LOGP(info,
"IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
433 irSpan.size(),
ir0.
asString(), ir1.asString(), acc ?
"" :
"do not ", mInput.invertIRFramesSelection ?
"ON" :
"OFF");
447 auto outVec = pc.
outputs().
make<std::vector<o2::dataformats::IRFrame>>(
OutputRef{
"selIRFrames"}, irSpan.begin(), irSpan.end());
452 processDetector<o2::itsmft::CTF>(
DetID::ITS, ctfHeader, pc);
453 processDetector<o2::itsmft::CTF>(
DetID::MFT, ctfHeader, pc);
454 processDetector<o2::emcal::CTF>(
DetID::EMC, ctfHeader, pc);
455 processDetector<o2::hmpid::CTF>(
DetID::HMP, ctfHeader, pc);
456 processDetector<o2::phos::CTF>(
DetID::PHS, ctfHeader, pc);
457 processDetector<o2::tpc::CTF>(
DetID::TPC, ctfHeader, pc);
458 processDetector<o2::trd::CTF>(
DetID::TRD, ctfHeader, pc);
459 processDetector<o2::ft0::CTF>(
DetID::FT0, ctfHeader, pc);
460 processDetector<o2::fv0::CTF>(
DetID::FV0, ctfHeader, pc);
461 processDetector<o2::fdd::CTF>(
DetID::FDD, ctfHeader, pc);
462 processDetector<o2::tof::CTF>(
DetID::TOF, ctfHeader, pc);
463 processDetector<o2::mid::CTF>(
DetID::MID, ctfHeader, pc);
464 processDetector<o2::mch::CTF>(
DetID::MCH, ctfHeader, pc);
465 processDetector<o2::cpv::CTF>(
DetID::CPV, ctfHeader, pc);
466 processDetector<o2::zdc::CTF>(
DetID::ZDC, ctfHeader, pc);
467 processDetector<o2::ctp::CTF>(
DetID::CTP, ctfHeader, pc);
473 stfDist.
id = uint64_t(mCurrTreeEntry);
475 stfDist.runNumber = uint32_t(ctfHeader.
run);
478 auto entryStr = fmt::format(
"({} of {} in {})", mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
483 long tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
485 auto tDiff = tNow - mLastSendTime;
493 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
494 LOGP(info,
"Read CTF {} {} in {:.3f} s, {:.4f} s elapsed from previous CTF", mCTFCounter, entryStr, mTimer.CpuTime() - cput, mCTFCounter ? 1e-6 * (tNow - mLastSendTime) : 0.);
495 mLastSendTime = tNow;
501void CTFReaderSpec::checkTreeEntries()
509 mFileFetcher->popFromQueue(mInput.
maxLoops < 1);
515void CTFReaderSpec::setMessageHeader(
ProcessingContext& pc,
const CTFHeader& ctfHeader,
const std::string& lbl,
unsigned subspec)
const
519 throw std::runtime_error(fmt::format(
"failed to find output message header stack for {}", lbl));
523 dh->tfCounter = ctfHeader.tfCounter;
524 dh->runNumber = uint32_t(ctfHeader.run);
526 dph->
creation = ctfHeader.creationTime;
534 const auto lbl = det.
getName();
535 auto& bufVec = pc.
outputs().
make<std::vector<o2::ctf::BufferType>>({lbl, mInput.
subspec}, ctfHeader.detectors[det] ?
sizeof(
C) : 0);
536 if (ctfHeader.detectors[det]) {
537 C::readFromTree(bufVec, *(mCTFTree.get()), lbl, mCurrTreeEntry);
539 throw std::runtime_error(fmt::format(
"Requested detector {} is missing in the CTF", lbl));
546void CTFReaderSpec::tryToFixCTFHeader(CTFHeader& ctfHeader)
const
549 struct RunStartData {
552 uint64_t tstampMS0 = 0;
554 const std::vector<RunStartData> tf0Data{
555 {505207, 133875, 1635322620830},
556 {505217, 14225007, 1635328375618},
557 {505278, 1349340, 1635376882079},
558 {505285, 1488862, 1635378517248},
559 {505303, 2615411, 1635392586314},
560 {505397, 5093945, 1635454778123},
561 {505404, 19196217, 1635456032855},
562 {505405, 28537913, 1635456862913},
563 {505406, 41107641, 1635457980628},
564 {505413, 452530, 1635460562613},
565 {505440, 13320708, 1635472436927},
566 {505443, 26546564, 1635473613239},
567 {505446, 177711, 1635477270241},
568 {505548, 88037114, 1635544414050},
569 {505582, 295044346, 1635562822389},
570 {505600, 417241082, 1635573688564},
571 {505623, 10445984, 1635621310460},
572 {505629, 126979, 1635623289756},
573 {505637, 338969, 1635630909893},
574 {505645, 188222, 1635634560881},
575 {505658, 81044, 1635645404694},
576 {505669, 328291, 1635657807147},
577 {505673, 30988, 1635659148972},
578 {505713, 620506, 1635725054798},
579 {505720, 5359903, 1635730673978}};
580 if (ctfHeader.run >= tf0Data.front().run && ctfHeader.run <= tf0Data.back().run) {
581 for (
const auto& tf0 : tf0Data) {
582 if (ctfHeader.run == tf0.run) {
583 ctfHeader.creationTime = tf0.tstampMS0;
595 std::vector<InputSpec> inputs;
596 std::vector<OutputSpec> outputs;
597 std::vector<ConfigParamSpec> options;
599 outputs.emplace_back(
OutputLabel{
"header"},
"CTF",
"HEADER", inp.
subspec, Lifetime::Timeframe);
607 outputs.emplace_back(
OutputLabel{
"selIRFrames"},
"CTF",
"SELIRFRAMES", 0, Lifetime::Timeframe);
613 options.emplace_back(
ConfigParamSpec{
"select-ctf-ids", VariantType::String,
"", {
"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
614 options.emplace_back(
ConfigParamSpec{
"impose-run-start-timstamp", VariantType::Int64, 0L, {
"impose run start time stamp (ms), ignored if 0"}});
615 options.emplace_back(
ConfigParamSpec{
"local-tf-counter", VariantType::Bool,
false, {
"reassign header.tfCounter from local TF counter"}});
616 options.emplace_back(
ConfigParamSpec{
"fetch-failure-threshold", VariantType::Float, 0.f, {
"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
617 options.emplace_back(
ConfigParamSpec{
"limit-tf-before-reading", VariantType::Bool,
false, {
"Check TF limiting before reading new TF, otherwhise before injecting it"}});
618 options.emplace_back(
ConfigParamSpec{
"max-tf", VariantType::Int, -1, {
"max CTFs to process (<= 0 : infinite)"}});
619 options.emplace_back(
ConfigParamSpec{
"max-tf-per-file", VariantType::Int, -1, {
"max TFs to process per ctf file (<= 0 : infinite)"}});
622 options.emplace_back(
ConfigParamSpec{
"channel-config", VariantType::String, inp.
metricChannel, {
"Out-of-band channel config for TF throttling"}});
Class to check if a given InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
static constexpr std::string_view CTFTREENAME
static BasicCCDBManager & instance()
static const HBFUtils & Instance()
void run(o2::framework::ProcessingContext &pc) final
CTFReaderSpec(const CTFReaderInp &inp)
~CTFReaderSpec() override
void init(o2::framework::InitContext &ic) final
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr const char * getName(ID id)
names of defined detectors
static constexpr ID First
static constexpr ID Last
if extra detectors added, update this !!!
static constexpr o2h::DataOrigin getDataOrigin(ID id)
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual fair::mq::Device * device()=0
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
size_t loadIRFrames(const std::string &fname)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
constexpr o2::header::DataOrigin gDataOriginFLP
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
bool readFromTree(TTree &tree, const std::string brname, T &dest, int ev=0)
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string asString() const
std::string fileRunTimeSpans
std::vector< int > ctfIDs
std::string metricChannel
o2::detectors::DetID::mask_t detMask
bool checkTFLimitBeforeReading
bool allowMissingDetectors
bool invertIRFramesSelection
uint32_t tfCounter
the orbit the TF begins
static AggregatedRunInfo buildAggregatedRunInfo(o2::ccdb::CCDBManagerInstance &ccdb, int runnumber)
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
o2::InteractionRecord ir0(3, 5)