21#include <TStopwatch.h>
57#include <fairmq/Device.h>
69 auto* br =
tree.GetBranch(brname.c_str());
70 if (br && br->GetEntries() > ev) {
92 void loadRunTimeSpans(
const std::string& flname);
93 void openCTFFile(
const std::string& flname);
95 void checkTreeEntries();
100 void tryToFixCTFHeader(
CTFHeader& ctfHeader)
const;
// Selected ranges per run: run number -> list of (range_min, range_max) pairs,
// appended by loadRunTimeSpans() for every accepted input line.
103 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
// Supplies the names of the input files (getNextFileInQueue / popFromQueue);
// started in init and stopped + released in stopReader.
104 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
// Currently opened CTF file (result of TFile::Open in openCTFFile).
105 std::unique_ptr<TFile> mCTFFile;
// CTF tree of the currently opened file; entries are read from it by index.
106 std::unique_ptr<TTree> mCTFTree;
107 bool mRunning =
false;
// Filled from the "local-tf-counter" option.
108 bool mUseLocalTFCounter =
false;
109 bool mIFRamesOut =
false;
// Interpretation of the limits stored in mRunTimeRanges:
// -1 = not determined yet, 1 = unix timestamps in ms, 0 = orbit numbers.
110 int mConvRunTimeRangesToOrbits = -1;
// Number of accepted TFs: reported as "accepted" in the final timing log and
// written to the ctf_read_ntf.txt file.
112 int mCTFCounterAcc = 0;
// Number of input files reported as failed in the final log.
113 int mNFailedFiles = 0;
// Run number compared with timingInfo.runNumber to decide whether
// runTimeRangesToIRFrameSelector() has to be called.
117 int mRunNumberPrev = -1;
// Accumulated time spent waiting for input data, in microseconds
// (scaled by 1e-6 when logged in seconds).
118 long mTotalWaitTime = 0;
// system_clock time of the last processed CTF, in microseconds since epoch.
119 long mLastSendTime = 0L;
// Index of the tree entry being read from mCTFTree.
120 long mCurrTreeEntry = 0L;
// Filled from the "impose-run-start-timstamp" option (ms); only acted upon when > 0.
121 long mImposeRunStartMS = 0L;
// Position in mInput.ctfIDs of the entry that defines mCurrTreeEntry.
122 size_t mSelIDEntry = 0;
140void CTFReaderSpec::stopReader()
145 LOGP(info,
"CTFReader stops processing, {} files read, {} files failed", mFilesRead - mNFailedFiles, mNFailedFiles);
146 LOGP(info,
"CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs ({} accepted) in {} loops, spent {:.2} s in {} data waiting states",
147 mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mCTFCounterAcc, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
149 mFileFetcher->stop();
150 mFileFetcher.reset();
161 mInput.
ctfIDs = o2::RangeTokenizer::tokenize<int>(ic.
options().
get<std::string>(
"select-ctf-ids"));
163 std::reverse(mInput.
ctfIDs.begin(), mInput.
ctfIDs.end());
165 mUseLocalTFCounter = ic.
options().
get<
bool>(
"local-tf-counter");
166 mImposeRunStartMS = ic.
options().
get<int64_t>(
"impose-run-start-timstamp");
175 mFileFetcher->setMaxLoops(mInput.
maxLoops);
176 mFileFetcher->setFailThreshold(ic.
options().
get<
float>(
"fetch-failure-threshold"));
177 mFileFetcher->start();
181 mTFLength = hbfu.nHBFPerTF;
182 LOGP(info,
"IRFrames will be selected from {}, assumed TF length: {} HBF", mInput.
fileIRFrames, mTFLength);
194 mIRFrameSelector.
clear();
195 auto ent = mRunTimeRanges.find(timingInfo.
runNumber);
196 if (ent == mRunTimeRanges.end()) {
197 LOGP(info,
"RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", timingInfo.
runNumber);
204 LOGP(fatal,
"failed to extract AggregatedRunInfo for run {}", timingInfo.
runNumber);
207 std::vector<o2::dataformats::IRFrame> frames;
208 for (
const auto& rng : ent->second) {
209 long orbMin = 0, orbMax = 0;
210 if (mConvRunTimeRangesToOrbits > 0) {
227 LOGP(info,
"TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.
invertIRFramesSelection ?
"rejected" :
"selected");
233void CTFReaderSpec::loadRunTimeSpans(
const std::string& flname)
235 std::ifstream inputFile(flname);
237 LOGP(fatal,
"Failed to open selected run/timespans file {}", mInput.
fileRunTimeSpans);
240 size_t cntl = 0, cntr = 0;
241 while (std::getline(inputFile, line)) {
243 for (
char& ch :
line) {
244 if (ch ==
';' || ch ==
'\t' || ch ==
',') {
249 if (
line.size() < 1 || line[0] ==
'#') {
253 auto logError = [&cntl, &
line]() { LOGP(error,
"Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
254 if (tokens.size() >= 3) {
258 run = std::stoi(tokens[0]);
259 rmin = std::stol(tokens[1]);
260 rmax = std::stol(tokens[2]);
// Threshold used to guess the units of a range limit: a value above it is
// flagged 1 (interpreted as a unix timestamp in ms), otherwise 0 (interpreted
// as an orbit number). 1514761200000 ms corresponds to 2017-12-31 23:00:00 UTC.
266 constexpr long ISTimeStamp = 1514761200000L;
// convmn / convmx: the flag for the lower / upper limit respectively; the two
// are required to agree with each other and with mConvRunTimeRangesToOrbits.
267 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0;
269 LOGP(fatal,
"Provided range limits are not in increasing order, entry is {}", line);
271 if (mConvRunTimeRangesToOrbits == -1) {
272 if (convmn != convmx) {
273 LOGP(fatal,
"Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
275 mConvRunTimeRangesToOrbits = convmn;
276 LOGP(info,
"Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ?
"timstamps(ms)" :
"orbits");
278 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
279 LOGP(fatal,
"Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ?
"timestamps" :
"orbits",
line);
283 mRunTimeRanges[
run].emplace_back(rmin, rmax);
289 LOGP(info,
"Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.
fileRunTimeSpans);
294void CTFReaderSpec::openCTFFile(
const std::string& flname)
298 mCTFFile.reset(TFile::Open(flname.c_str()));
299 if (!mCTFFile || !mCTFFile->IsOpen() || mCTFFile->IsZombie()) {
300 throw std::runtime_error(fmt::format(
"failed to open CTF file {}, skipping", flname));
304 throw std::runtime_error(fmt::format(
"failed to load CTF tree from {}, skipping", flname));
306 if (mCTFTree->GetEntries() < 1) {
307 throw std::runtime_error(fmt::format(
"CTF tree in {} has 0 entries, skipping", flname));
310 if (mInput.
ctfIDs.empty()) {
311 int entries = mCTFTree->GetEntries();
313 entries = std::min(entries, mInput.
maxTFs);
319 mInput.
ctfIDs.resize(entries);
320 std::iota(mInput.
ctfIDs.begin(), mInput.
ctfIDs.end(), 0);
322 std::random_device
dev;
325 LOGP(info,
"will shuffle reading of CTF entries in this order:");
327 LOGP(info,
"\tTF {:02} -> {:02}",
i, mInput.
ctfIDs[
i]);
330 }
catch (
const std::exception& e) {
331 LOG(error) <<
"Cannot process " << flname <<
", reason: " << e.what();
336 mFileFetcher->popFromQueue(mInput.
maxLoops < 1);
348 std::string tfFileName;
349 bool waitAcknowledged =
false;
355 LOG(
debug) <<
"TF " << mCTFCounter <<
" of " << mInput.
maxTFs <<
" loop " << mFileFetcher->getNLoops();
357 mCurrTreeEntry = mInput.
ctfIDs[mSelIDEntry];
365 LOGP(info,
"Skipping CTF#{} ({} of {} in {})", mCTFCounter, mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
371 tfFileName = mFileFetcher->getNextFileInQueue();
372 if (tfFileName.empty()) {
373 if (!mFileFetcher->isRunning()) {
377 if (!waitAcknowledged) {
378 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
379 waitAcknowledged =
true;
384 if (waitAcknowledged) {
385 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
386 mTotalWaitTime += waitTime;
388 LOGP(warn,
"Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
390 waitAcknowledged =
false;
392 LOG(info) <<
"Reading CTF input " <<
' ' << tfFileName;
393 openCTFFile(tfFileName);
396 if (mCTFCounter >= mInput.
maxTFs || (!mInput.
ctfIDs.empty() && mSelIDEntry >= mInput.
ctfIDs.size())) {
397 LOGP(info,
"All CTFs from selected range were injected, stopping");
399 }
else if (mRunning && !mCTFTree && mFileFetcher->getNextFileInQueue().empty() && !mFileFetcher->isRunning()) {
407 const std::string dummy{
"ctf_read_ntf.txt"};
408 if (mCTFCounterAcc == 0) {
409 LOGP(warn,
"No TF passed selection, writing a 0 to file {}", dummy);
412 std::ofstream outfile;
413 outfile.open(dummy, std::ios::out | std::ios::trunc);
414 outfile << mCTFCounterAcc << std::endl;
416 LOGP(error,
"Failed to write {}", dummy);
424 auto cput = mTimer.CpuTime();
429 if (!
readFromTree(*(mCTFTree.get()),
"CTFHeader", ctfHeader, mCurrTreeEntry)) {
430 throw std::runtime_error(
"did not find CTFHeader");
432 if (mImposeRunStartMS > 0) {
436 tryToFixCTFHeader(ctfHeader);
439 if (mUseLocalTFCounter) {
443 LOG(info) << ctfHeader;
451 if (mRunTimeRanges.size() && timingInfo.
runNumber != mRunNumberPrev) {
452 runTimeRangesToIRFrameSelector(timingInfo);
455 gsl::span<const o2::dataformats::IRFrame> irSpan{};
456 if (mIRFrameSelector.
isSet()) {
463 LOGP(info,
"IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
464 irSpan.size(),
ir0.
asString(), ir1.asString(), acc ?
"" :
"do not ", mInput.invertIRFramesSelection ?
"ON" :
"OFF");
478 auto outVec = pc.
outputs().
make<std::vector<o2::dataformats::IRFrame>>(
OutputRef{
"selIRFrames"}, irSpan.begin(), irSpan.end());
483 processDetector<o2::itsmft::CTF>(
DetID::ITS, ctfHeader, pc);
484 processDetector<o2::itsmft::CTF>(
DetID::MFT, ctfHeader, pc);
485 processDetector<o2::emcal::CTF>(
DetID::EMC, ctfHeader, pc);
486 processDetector<o2::hmpid::CTF>(
DetID::HMP, ctfHeader, pc);
487 processDetector<o2::phos::CTF>(
DetID::PHS, ctfHeader, pc);
488 processDetector<o2::tpc::CTF>(
DetID::TPC, ctfHeader, pc);
489 processDetector<o2::trd::CTF>(
DetID::TRD, ctfHeader, pc);
490 processDetector<o2::ft0::CTF>(
DetID::FT0, ctfHeader, pc);
491 processDetector<o2::fv0::CTF>(
DetID::FV0, ctfHeader, pc);
492 processDetector<o2::fdd::CTF>(
DetID::FDD, ctfHeader, pc);
493 processDetector<o2::tof::CTF>(
DetID::TOF, ctfHeader, pc);
494 processDetector<o2::mid::CTF>(
DetID::MID, ctfHeader, pc);
495 processDetector<o2::mch::CTF>(
DetID::MCH, ctfHeader, pc);
496 processDetector<o2::cpv::CTF>(
DetID::CPV, ctfHeader, pc);
497 processDetector<o2::zdc::CTF>(
DetID::ZDC, ctfHeader, pc);
498 processDetector<o2::ctp::CTF>(
DetID::CTP, ctfHeader, pc);
504 stfDist.
id = uint64_t(mCurrTreeEntry);
506 stfDist.runNumber = uint32_t(ctfHeader.
run);
509 auto entryStr = fmt::format(
"({} of {} in {})", mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
514 long tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
516 auto tDiff = tNow - mLastSendTime;
524 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
525 LOGP(info,
"Read CTF {} {} in {:.3f} s, {:.4f} s elapsed from previous CTF", mCTFCounter, entryStr, mTimer.CpuTime() - cput, mCTFCounter ? 1e-6 * (tNow - mLastSendTime) : 0.);
526 mLastSendTime = tNow;
532void CTFReaderSpec::checkTreeEntries()
534 bool reachedEnd{
false};
536 reachedEnd = (mCurrTreeEntry == mInput.
ctfIDs.back());
538 reachedEnd = (++mCurrTreeEntry >= mCTFTree->GetEntries());
545 mFileFetcher->popFromQueue(mInput.
maxLoops < 1);
551void CTFReaderSpec::setMessageHeader(
ProcessingContext& pc,
const CTFHeader& ctfHeader,
const std::string& lbl,
unsigned subspec)
const
555 throw std::runtime_error(fmt::format(
"failed to find output message header stack for {}", lbl));
559 dh->tfCounter = ctfHeader.tfCounter;
560 dh->runNumber = uint32_t(ctfHeader.run);
562 dph->
creation = ctfHeader.creationTime;
570 const auto lbl = det.
getName();
571 auto& bufVec = pc.
outputs().
make<std::vector<o2::ctf::BufferType>>({lbl, mInput.
subspec}, ctfHeader.detectors[det] ?
sizeof(
C) : 0);
572 if (ctfHeader.detectors[det]) {
573 C::readFromTree(bufVec, *(mCTFTree.get()), lbl, mCurrTreeEntry);
575 throw std::runtime_error(fmt::format(
"Requested detector {} is missing in the CTF", lbl));
582void CTFReaderSpec::tryToFixCTFHeader(CTFHeader& ctfHeader)
const
585 struct RunStartData {
588 uint64_t tstampMS0 = 0;
// Hard-coded correction table, one RunStartData row per run.
// Judging by the values, the first column is the run number (matched against
// ctfHeader.run) and the last one is tstampMS0, which replaces
// ctfHeader.creationTime when the run matches. The declaration of the middle
// column is not visible in this excerpt - TODO confirm its meaning.
// Rows are listed in increasing run order; the range pre-check that follows
// relies on front()/back() being the smallest/largest run.
590 const std::vector<RunStartData> tf0Data{
591 {505207, 133875, 1635322620830},
592 {505217, 14225007, 1635328375618},
593 {505278, 1349340, 1635376882079},
594 {505285, 1488862, 1635378517248},
595 {505303, 2615411, 1635392586314},
596 {505397, 5093945, 1635454778123},
597 {505404, 19196217, 1635456032855},
598 {505405, 28537913, 1635456862913},
599 {505406, 41107641, 1635457980628},
600 {505413, 452530, 1635460562613},
601 {505440, 13320708, 1635472436927},
602 {505443, 26546564, 1635473613239},
603 {505446, 177711, 1635477270241},
604 {505548, 88037114, 1635544414050},
605 {505582, 295044346, 1635562822389},
606 {505600, 417241082, 1635573688564},
607 {505623, 10445984, 1635621310460},
608 {505629, 126979, 1635623289756},
609 {505637, 338969, 1635630909893},
610 {505645, 188222, 1635634560881},
611 {505658, 81044, 1635645404694},
612 {505669, 328291, 1635657807147},
613 {505673, 30988, 1635659148972},
614 {505713, 620506, 1635725054798},
615 {505720, 5359903, 1635730673978}};
616 if (ctfHeader.run >= tf0Data.front().run && ctfHeader.run <= tf0Data.back().run) {
617 for (
const auto& tf0 : tf0Data) {
618 if (ctfHeader.run == tf0.run) {
619 ctfHeader.creationTime = tf0.tstampMS0;
631 std::vector<InputSpec> inputs;
632 std::vector<OutputSpec> outputs;
633 std::vector<ConfigParamSpec> options;
635 outputs.emplace_back(
OutputLabel{
"header"},
"CTF",
"HEADER", inp.
subspec, Lifetime::Timeframe);
643 outputs.emplace_back(
OutputLabel{
"selIRFrames"},
"CTF",
"SELIRFRAMES", 0, Lifetime::Timeframe);
649 options.emplace_back(
ConfigParamSpec{
"select-ctf-ids", VariantType::String,
"", {
"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
650 options.emplace_back(
ConfigParamSpec{
"reverse-select-ctf-ids", VariantType::Bool,
false, {
"reverse order of to inject CTF IDs"}});
651 options.emplace_back(
ConfigParamSpec{
"impose-run-start-timstamp", VariantType::Int64, 0L, {
"impose run start time stamp (ms), ignored if 0"}});
652 options.emplace_back(
ConfigParamSpec{
"local-tf-counter", VariantType::Bool,
false, {
"reassign header.tfCounter from local TF counter"}});
653 options.emplace_back(
ConfigParamSpec{
"fetch-failure-threshold", VariantType::Float, 0.f, {
"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
654 options.emplace_back(
ConfigParamSpec{
"limit-tf-before-reading", VariantType::Bool,
false, {
"Check TF limiting before reading new TF, otherwhise before injecting it"}});
655 options.emplace_back(
ConfigParamSpec{
"max-tf", VariantType::Int, -1, {
"max CTFs to process (<= 0 : infinite)"}});
656 options.emplace_back(
ConfigParamSpec{
"max-tf-per-file", VariantType::Int, -1, {
"max TFs to process per ctf file (<= 0 : infinite)"}});
659 options.emplace_back(
ConfigParamSpec{
"channel-config", VariantType::String, inp.
metricChannel, {
"Out-of-band channel config for TF throttling"}});
default_random_engine gen(dev())
Class to check if a given InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
static constexpr std::string_view CTFTREENAME
static BasicCCDBManager & instance()
static const HBFUtils & Instance()
void run(o2::framework::ProcessingContext &pc) final
CTFReaderSpec(const CTFReaderInp &inp)
~CTFReaderSpec() override
void init(o2::framework::InitContext &ic) final
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr const char * getName(ID id)
names of defined detectors
static constexpr ID First
static constexpr ID Last
if extra detectors added, update this !!!
static constexpr o2h::DataOrigin getDataOrigin(ID id)
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual fair::mq::Device * device()=0
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
size_t loadIRFrames(const std::string &fname)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
constexpr o2::header::DataOrigin gDataOriginFLP
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
bool readFromTree(TTree &tree, const std::string brname, T &dest, int ev=0)
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
int32_t const char int32_t line
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string asString() const
std::string fileRunTimeSpans
std::vector< int > ctfIDs
std::string metricChannel
o2::detectors::DetID::mask_t detMask
bool checkTFLimitBeforeReading
bool allowMissingDetectors
bool invertIRFramesSelection
uint32_t tfCounter
the orbit the TF begins
static AggregatedRunInfo buildAggregatedRunInfo(o2::ccdb::CCDBManagerInstance &ccdb, int runnumber)
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
o2::InteractionRecord ir0(3, 5)