21#include <TStopwatch.h>
58#include <fairmq/Device.h>
70 auto* br =
tree.GetBranch(brname.c_str());
71 if (br && br->GetEntries() > ev) {
93 void loadRunTimeSpans(
const std::string& flname);
94 void openCTFFile(
const std::string& flname);
96 void checkTreeEntries();
101 void tryToFixCTFHeader(
CTFHeader& ctfHeader)
const;
104 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
105 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
106 std::unique_ptr<TFile> mCTFFile;
107 std::unique_ptr<TTree> mCTFTree;
108 bool mRunning =
false;
109 bool mUseLocalTFCounter =
false;
110 bool mIFRamesOut =
false;
111 int mConvRunTimeRangesToOrbits = -1;
113 int mCTFCounterAcc = 0;
114 int mNFailedFiles = 0;
118 int mRunNumberPrev = -1;
119 long mTotalWaitTime = 0;
120 long mLastSendTime = 0L;
121 long mCurrTreeEntry = 0L;
122 long mImposeRunStartMS = 0L;
123 size_t mSelIDEntry = 0;
141void CTFReaderSpec::stopReader()
146 LOGP(info,
"CTFReader stops processing, {} files read, {} files failed", mFilesRead - mNFailedFiles, mNFailedFiles);
147 LOGP(info,
"CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs ({} accepted) in {} loops, spent {:.2} s in {} data waiting states",
148 mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mCTFCounterAcc, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
150 mFileFetcher->stop();
151 mFileFetcher.reset();
162 mInput.
ctfIDs = o2::RangeTokenizer::tokenize<int>(ic.
options().
get<std::string>(
"select-ctf-ids"));
164 std::reverse(mInput.
ctfIDs.begin(), mInput.
ctfIDs.end());
166 mUseLocalTFCounter = ic.
options().
get<
bool>(
"local-tf-counter");
167 mImposeRunStartMS = ic.
options().
get<int64_t>(
"impose-run-start-timstamp");
176 mFileFetcher->setMaxLoops(mInput.
maxLoops);
177 mFileFetcher->setFailThreshold(ic.
options().
get<
float>(
"fetch-failure-threshold"));
178 mFileFetcher->start();
182 mTFLength = hbfu.nHBFPerTF;
183 LOGP(info,
"IRFrames will be selected from {}, assumed TF length: {} HBF", mInput.
fileIRFrames, mTFLength);
196 if (mInput.detMask[det]) {
197 std::string lbl = det.
getName();
204 LOGP(fatal,
"This specialization is define only for ITS and MFT detectors, {} provided", det.
getName());
206 for (
int iLayer = 0; iLayer < nLayers; iLayer++) {
209 auto brName = nLayers == 1 ? lbl : fmt::format(
"{}_{}", lbl, iLayer);
211 }
else if (!mInput.allowMissingDetectors) {
212 throw std::runtime_error(fmt::format(
"Requested detector {} is missing in the CTF", lbl));
223 const auto lbl = det.
getName();
224 auto& bufVec = pc.
outputs().
make<std::vector<o2::ctf::BufferType>>({lbl, mInput.
subspec}, ctfHeader.detectors[det] ?
sizeof(C) : 0);
225 if (ctfHeader.detectors[det]) {
226 C::readFromTree(bufVec, *(mCTFTree.get()), lbl, mCurrTreeEntry);
228 throw std::runtime_error(fmt::format(
"Requested detector {} is missing in the CTF", lbl));
237 mIRFrameSelector.
clear();
238 auto ent = mRunTimeRanges.find(timingInfo.
runNumber);
239 if (ent == mRunTimeRanges.end()) {
240 LOGP(info,
"RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", timingInfo.
runNumber);
247 LOGP(fatal,
"failed to extract AggregatedRunInfo for run {}", timingInfo.
runNumber);
250 std::vector<o2::dataformats::IRFrame> frames;
251 for (
const auto& rng : ent->second) {
252 long orbMin = 0, orbMax = 0;
253 if (mConvRunTimeRangesToOrbits > 0) {
270 LOGP(info,
"TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.
invertIRFramesSelection ?
"rejected" :
"selected");
276void CTFReaderSpec::loadRunTimeSpans(
const std::string& flname)
278 std::ifstream inputFile(flname);
280 LOGP(fatal,
"Failed to open selected run/timespans file {}", flname);
283 size_t cntl = 0, cntr = 0;
284 while (std::getline(inputFile, line)) {
286 for (
char& ch :
line) {
287 if (ch ==
';' || ch ==
'\t' || ch ==
',') {
292 if (
line.size() < 1 || line[0] ==
'#') {
296 auto logError = [&cntl, &
line]() { LOGP(error,
"Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
297 if (tokens.size() >= 3) {
301 run = std::stoi(tokens[0]);
302 rmin = std::stol(tokens[1]);
303 rmax = std::stol(tokens[2]);
309 constexpr long ISTimeStamp = 1514761200000L;
310 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0;
312 LOGP(fatal,
"Provided range limits are not in increasing order, entry is {}", line);
314 if (mConvRunTimeRangesToOrbits == -1) {
315 if (convmn != convmx) {
316 LOGP(fatal,
"Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
318 mConvRunTimeRangesToOrbits = convmn;
319 LOGP(info,
"Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ?
"timstamps(ms)" :
"orbits");
321 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
322 LOGP(fatal,
"Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ?
"timestamps" :
"orbits",
line);
326 mRunTimeRanges[
run].emplace_back(rmin, rmax);
332 LOGP(info,
"Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
337void CTFReaderSpec::openCTFFile(
const std::string& flname)
341 mCTFFile.reset(TFile::Open(flname.c_str()));
342 if (!mCTFFile || !mCTFFile->IsOpen() || mCTFFile->IsZombie()) {
343 throw std::runtime_error(fmt::format(
"failed to open CTF file {}, skipping", flname));
347 throw std::runtime_error(fmt::format(
"failed to load CTF tree from {}, skipping", flname));
349 if (mCTFTree->GetEntries() < 1) {
350 throw std::runtime_error(fmt::format(
"CTF tree in {} has 0 entries, skipping", flname));
353 if (mInput.
ctfIDs.empty()) {
354 int entries = mCTFTree->GetEntries();
356 entries = std::min(entries, mInput.
maxTFs);
362 mInput.
ctfIDs.resize(entries);
363 std::iota(mInput.
ctfIDs.begin(), mInput.
ctfIDs.end(), 0);
365 std::random_device
dev;
368 LOGP(info,
"will shuffle reading of CTF entries in this order:");
370 LOGP(info,
"\tTF {:02} -> {:02}",
i, mInput.
ctfIDs[
i]);
373 }
catch (
const std::exception& e) {
374 LOG(error) <<
"Cannot process " << flname <<
", reason: " << e.what();
379 mFileFetcher->popFromQueue(mInput.
maxLoops < 1);
391 std::string tfFileName;
392 bool waitAcknowledged =
false;
398 LOG(
debug) <<
"TF " << mCTFCounter <<
" of " << mInput.
maxTFs <<
" loop " << mFileFetcher->getNLoops();
400 mCurrTreeEntry = mInput.
ctfIDs[mSelIDEntry];
408 LOGP(info,
"Skipping CTF#{} ({} of {} in {})", mCTFCounter, mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
414 tfFileName = mFileFetcher->getNextFileInQueue();
415 if (tfFileName.empty()) {
416 if (!mFileFetcher->isRunning()) {
420 if (!waitAcknowledged) {
421 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
422 waitAcknowledged =
true;
427 if (waitAcknowledged) {
428 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
429 mTotalWaitTime += waitTime;
431 LOGP(warn,
"Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
433 waitAcknowledged =
false;
435 LOG(info) <<
"Reading CTF input " <<
' ' << tfFileName;
436 openCTFFile(tfFileName);
439 if (mCTFCounter >= mInput.
maxTFs || (!mInput.
ctfIDs.empty() && mSelIDEntry >= mInput.
ctfIDs.size())) {
440 LOGP(info,
"All CTFs from selected range were injected, stopping");
442 }
else if (mRunning && !mCTFTree && mFileFetcher->getNextFileInQueue().empty() && !mFileFetcher->isRunning()) {
450 const std::string dummy{
"ctf_read_ntf.txt"};
451 if (mCTFCounterAcc == 0) {
452 LOGP(warn,
"No TF passed selection, writing a 0 to file {}", dummy);
455 std::ofstream outfile;
456 outfile.open(dummy, std::ios::out | std::ios::trunc);
457 outfile << mCTFCounterAcc << std::endl;
459 LOGP(error,
"Failed to write {}", dummy);
467 auto cput = mTimer.CpuTime();
472 if (!
readFromTree(*(mCTFTree.get()),
"CTFHeader", ctfHeader, mCurrTreeEntry)) {
473 throw std::runtime_error(
"did not find CTFHeader");
475 if (mImposeRunStartMS > 0) {
479 tryToFixCTFHeader(ctfHeader);
482 if (mUseLocalTFCounter) {
486 LOG(info) << ctfHeader;
494 if (mRunTimeRanges.size() && timingInfo.
runNumber != mRunNumberPrev) {
495 runTimeRangesToIRFrameSelector(timingInfo);
498 gsl::span<const o2::dataformats::IRFrame> irSpan{};
499 if (mIRFrameSelector.
isSet()) {
506 LOGP(info,
"IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
507 irSpan.size(),
ir0.
asString(), ir1.asString(), acc ?
"" :
"do not ", mInput.invertIRFramesSelection ?
"ON" :
"OFF");
521 auto outVec = pc.
outputs().
make<std::vector<o2::dataformats::IRFrame>>(
OutputRef{
"selIRFrames"}, irSpan.begin(), irSpan.end());
528 processDetector<o2::emcal::CTF>(
DetID::EMC, ctfHeader, pc);
529 processDetector<o2::hmpid::CTF>(
DetID::HMP, ctfHeader, pc);
530 processDetector<o2::phos::CTF>(
DetID::PHS, ctfHeader, pc);
531 processDetector<o2::tpc::CTF>(
DetID::TPC, ctfHeader, pc);
532 processDetector<o2::trd::CTF>(
DetID::TRD, ctfHeader, pc);
533 processDetector<o2::ft0::CTF>(
DetID::FT0, ctfHeader, pc);
534 processDetector<o2::fv0::CTF>(
DetID::FV0, ctfHeader, pc);
535 processDetector<o2::fdd::CTF>(
DetID::FDD, ctfHeader, pc);
536 processDetector<o2::tof::CTF>(
DetID::TOF, ctfHeader, pc);
537 processDetector<o2::mid::CTF>(
DetID::MID, ctfHeader, pc);
538 processDetector<o2::mch::CTF>(
DetID::MCH, ctfHeader, pc);
539 processDetector<o2::cpv::CTF>(
DetID::CPV, ctfHeader, pc);
540 processDetector<o2::zdc::CTF>(
DetID::ZDC, ctfHeader, pc);
541 processDetector<o2::ctp::CTF>(
DetID::CTP, ctfHeader, pc);
547 stfDist.
id = uint64_t(mCurrTreeEntry);
549 stfDist.runNumber = uint32_t(ctfHeader.
run);
552 auto entryStr = fmt::format(
"({} of {} in {})", mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
557 long tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
559 auto tDiff = tNow - mLastSendTime;
567 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
568 LOGP(info,
"Read CTF {} {} in {:.3f} s, {:.4f} s elapsed from previous CTF", mCTFCounter, entryStr, mTimer.CpuTime() - cput, mCTFCounter ? 1e-6 * (tNow - mLastSendTime) : 0.);
569 mLastSendTime = tNow;
575void CTFReaderSpec::checkTreeEntries()
577 bool reachedEnd{
false};
579 reachedEnd = (mCurrTreeEntry == mInput.
ctfIDs.back());
581 reachedEnd = (++mCurrTreeEntry >= mCTFTree->GetEntries());
588 mFileFetcher->popFromQueue(mInput.
maxLoops < 1);
594void CTFReaderSpec::setMessageHeader(
ProcessingContext& pc,
const CTFHeader& ctfHeader,
const std::string& lbl,
unsigned subspec)
const
598 throw std::runtime_error(fmt::format(
"failed to find output message header stack for {}", lbl));
602 dh->tfCounter = ctfHeader.tfCounter;
603 dh->runNumber = uint32_t(ctfHeader.run);
605 dph->
creation = ctfHeader.creationTime;
609void CTFReaderSpec::tryToFixCTFHeader(CTFHeader& ctfHeader)
const
612 struct RunStartData {
615 uint64_t tstampMS0 = 0;
617 const std::vector<RunStartData> tf0Data{
618 {505207, 133875, 1635322620830},
619 {505217, 14225007, 1635328375618},
620 {505278, 1349340, 1635376882079},
621 {505285, 1488862, 1635378517248},
622 {505303, 2615411, 1635392586314},
623 {505397, 5093945, 1635454778123},
624 {505404, 19196217, 1635456032855},
625 {505405, 28537913, 1635456862913},
626 {505406, 41107641, 1635457980628},
627 {505413, 452530, 1635460562613},
628 {505440, 13320708, 1635472436927},
629 {505443, 26546564, 1635473613239},
630 {505446, 177711, 1635477270241},
631 {505548, 88037114, 1635544414050},
632 {505582, 295044346, 1635562822389},
633 {505600, 417241082, 1635573688564},
634 {505623, 10445984, 1635621310460},
635 {505629, 126979, 1635623289756},
636 {505637, 338969, 1635630909893},
637 {505645, 188222, 1635634560881},
638 {505658, 81044, 1635645404694},
639 {505669, 328291, 1635657807147},
640 {505673, 30988, 1635659148972},
641 {505713, 620506, 1635725054798},
642 {505720, 5359903, 1635730673978}};
643 if (ctfHeader.run >= tf0Data.front().run && ctfHeader.run <= tf0Data.back().run) {
644 for (
const auto& tf0 : tf0Data) {
645 if (ctfHeader.run == tf0.run) {
646 ctfHeader.creationTime = tf0.tstampMS0;
658 std::vector<InputSpec> inputs;
659 std::vector<OutputSpec> outputs;
660 std::vector<ConfigParamSpec> options;
662 outputs.emplace_back(
OutputLabel{
"header"},
"CTF",
"HEADER", inp.
subspec, Lifetime::Timeframe);
668 for (uint32_t iLayer = 0; iLayer < nLayers; iLayer++) {
673 for (uint32_t iLayer = 0; iLayer < nLayers; iLayer++) {
682 outputs.emplace_back(
OutputLabel{
"selIRFrames"},
"CTF",
"SELIRFRAMES", 0, Lifetime::Timeframe);
687 options.emplace_back(
ConfigParamSpec{
"select-ctf-ids", VariantType::String,
"", {
"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
688 options.emplace_back(
ConfigParamSpec{
"reverse-select-ctf-ids", VariantType::Bool,
false, {
"reverse order of to inject CTF IDs"}});
689 options.emplace_back(
ConfigParamSpec{
"impose-run-start-timstamp", VariantType::Int64, 0L, {
"impose run start time stamp (ms), ignored if 0"}});
690 options.emplace_back(
ConfigParamSpec{
"local-tf-counter", VariantType::Bool,
false, {
"reassign header.tfCounter from local TF counter"}});
691 options.emplace_back(
ConfigParamSpec{
"fetch-failure-threshold", VariantType::Float, 0.f, {
"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
692 options.emplace_back(
ConfigParamSpec{
"limit-tf-before-reading", VariantType::Bool,
false, {
"Check TF limiting before reading new TF, otherwhise before injecting it"}});
693 options.emplace_back(
ConfigParamSpec{
"max-tf", VariantType::Int, -1, {
"max CTFs to process (<= 0 : infinite)"}});
694 options.emplace_back(
ConfigParamSpec{
"max-tf-per-file", VariantType::Int, -1, {
"max TFs to process per ctf file (<= 0 : infinite)"}});
697 options.emplace_back(
ConfigParamSpec{
"channel-config", VariantType::String, inp.
metricChannel, {
"Out-of-band channel config for TF throttling"}});
default_random_engine gen(dev())
Class to check if give InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
static constexpr std::string_view CTFTREENAME
static BasicCCDBManager & instance()
static const HBFUtils & Instance()
void run(o2::framework::ProcessingContext &pc) final
CTFReaderSpec(const CTFReaderInp &inp)
~CTFReaderSpec() override
void init(o2::framework::InitContext &ic) final
void readFromTree(TTree &tree, const std::string &name, int ev=0)
read from tree to non-flat object
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr const char * getName(ID id)
names of defined detectors
static constexpr ID First
static constexpr ID Last
if extra detectors added, update this !!!
static constexpr o2h::DataOrigin getDataOrigin(ID id)
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual fair::mq::Device * device()=0
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
size_t loadIRFrames(const std::string &fname)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
constexpr o2::header::DataOrigin gDataOriginFLP
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
bool readFromTree(TTree &tree, const std::string brname, T &dest, int ev=0)
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
void CTFReaderSpec::processDetector< o2::itsmft::CTF >(DetID det, const CTFHeader &ctfHeader, ProcessingContext &pc) const
Defining ITS Vertex explicitly as messageable.
int32_t const char int32_t line
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string asString() const
std::string fileRunTimeSpans
std::vector< int > ctfIDs
std::string metricChannel
o2::detectors::DetID::mask_t detMask
bool checkTFLimitBeforeReading
bool allowMissingDetectors
bool invertIRFramesSelection
uint32_t tfCounter
the orbit the TF begins
wrapper for the Entropy-encoded clusters of the TF
static constexpr int getNLayers()
static AggregatedRunInfo buildAggregatedRunInfo(int runnumber, long sorMS, long eorMS, long orbitResetMUS, const o2::parameters::GRPECSObject *grpecs, const std::vector< Long64_t > *ctfFirstRunOrbitVec, const o2::parameters::GRPLHCIFData *grplhcif=nullptr)
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
o2::InteractionRecord ir0(3, 5)