Project
Loading...
Searching...
No Matches
CTFReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include <random>
15#include <vector>
16#include <algorithm>
17#include <numeric>
18
19#include <TFile.h>
20#include <TTree.h>
21#include <TStopwatch.h>
22
23#include "Framework/Logger.h"
26#include "Framework/InputSpec.h"
37#include "Headers/STFHeader.h"
40#include "DataFormatsTPC/CTF.h"
41#include "DataFormatsTRD/CTF.h"
42#include "DataFormatsFT0/CTF.h"
43#include "DataFormatsFV0/CTF.h"
44#include "DataFormatsFDD/CTF.h"
45#include "DataFormatsTOF/CTF.h"
46#include "DataFormatsMID/CTF.h"
47#include "DataFormatsMCH/CTF.h"
49#include "DataFormatsPHOS/CTF.h"
50#include "DataFormatsCPV/CTF.h"
51#include "DataFormatsZDC/CTF.h"
52#include "DataFormatsHMP/CTF.h"
53#include "DataFormatsCTP/CTF.h"
58#include <fairmq/Device.h>
59
60using namespace o2::framework;
61
62namespace o2
63{
64namespace ctf
65{
66
67template <typename T>
68bool readFromTree(TTree& tree, const std::string brname, T& dest, int ev = 0)
69{
70 auto* br = tree.GetBranch(brname.c_str());
71 if (br && br->GetEntries() > ev) {
72 auto* ptr = &dest;
73 br->SetAddress(&ptr);
74 br->GetEntry(ev);
75 br->ResetAddress();
76 return true;
77 }
78 return false;
79}
80
82
84{
85 public:
86 CTFReaderSpec(const CTFReaderInp& inp);
87 ~CTFReaderSpec() override;
88 void init(o2::framework::InitContext& ic) final;
90
91 private:
92 void runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo);
93 void loadRunTimeSpans(const std::string& flname);
94 void openCTFFile(const std::string& flname);
95 bool processTF(ProcessingContext& pc);
96 void checkTreeEntries();
97 void stopReader();
98 template <typename C>
99 void processDetector(DetID det, const CTFHeader& ctfHeader, ProcessingContext& pc) const;
100 void setMessageHeader(ProcessingContext& pc, const CTFHeader& ctfHeader, const std::string& lbl, unsigned subspec) const; // keep just for the reference
101 void tryToFixCTFHeader(CTFHeader& ctfHeader) const;
102 CTFReaderInp mInput{};
103 o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
104 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
105 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
106 std::unique_ptr<TFile> mCTFFile;
107 std::unique_ptr<TTree> mCTFTree;
108 bool mRunning = false;
109 bool mUseLocalTFCounter = false;
110 bool mIFRamesOut = false;
111 int mConvRunTimeRangesToOrbits = -1; // not defined yet
112 int mCTFCounter = 0;
113 int mCTFCounterAcc = 0;
114 int mNFailedFiles = 0;
115 int mFilesRead = 0;
116 int mTFLength = 32;
117 int mNWaits = 0;
118 int mRunNumberPrev = -1;
119 long mTotalWaitTime = 0;
120 long mLastSendTime = 0L;
121 long mCurrTreeEntry = 0L;
122 long mImposeRunStartMS = 0L;
123 size_t mSelIDEntry = 0; // next CTFID to select from the mInput.ctfIDs (if non-empty)
124 TStopwatch mTimer;
125};
126
129{
130 mTimer.Stop();
131 mTimer.Reset();
132}
133
136{
137 stopReader();
138}
139
141void CTFReaderSpec::stopReader()
142{
143 if (!mFileFetcher) {
144 return;
145 }
146 LOGP(info, "CTFReader stops processing, {} files read, {} files failed", mFilesRead - mNFailedFiles, mNFailedFiles);
147 LOGP(info, "CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs ({} accepted) in {} loops, spent {:.2} s in {} data waiting states",
148 mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mCTFCounterAcc, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
149 mRunning = false;
150 mFileFetcher->stop();
151 mFileFetcher.reset();
152 mCTFTree.reset();
153 if (mCTFFile) {
154 mCTFFile->Close();
155 }
156 mCTFFile.reset();
157}
158
161{
162 mInput.ctfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-ctf-ids"));
163 if ((mInput.reverseCTFIDs = ic.options().get<bool>("reverse-select-ctf-ids"))) {
164 std::reverse(mInput.ctfIDs.begin(), mInput.ctfIDs.end());
165 }
166 mUseLocalTFCounter = ic.options().get<bool>("local-tf-counter");
167 mImposeRunStartMS = ic.options().get<int64_t>("impose-run-start-timstamp");
168 mInput.checkTFLimitBeforeReading = ic.options().get<bool>("limit-tf-before-reading");
169 mInput.maxTFs = ic.options().get<int>("max-tf");
170 mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
171 mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
172 mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
173 mRunning = true;
174 mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd, mInput.copyDir);
175 mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
176 mFileFetcher->setMaxLoops(mInput.maxLoops);
177 mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
178 mFileFetcher->start();
179 if (!mInput.fileIRFrames.empty()) {
180 mIRFrameSelector.loadIRFrames(mInput.fileIRFrames);
181 const auto& hbfu = o2::raw::HBFUtils::Instance();
182 mTFLength = hbfu.nHBFPerTF;
183 LOGP(info, "IRFrames will be selected from {}, assumed TF length: {} HBF", mInput.fileIRFrames, mTFLength);
184 mIFRamesOut = true;
185 }
186 if (!mInput.fileRunTimeSpans.empty()) {
187 loadRunTimeSpans(mInput.fileRunTimeSpans);
188 mIFRamesOut = true;
189 }
190}
191
193template <>
194void CTFReaderSpec::processDetector<o2::itsmft::CTF>(DetID det, const CTFHeader& ctfHeader, ProcessingContext& pc) const
195{
196 if (mInput.detMask[det]) {
197 std::string lbl = det.getName();
198 int nLayers = 1;
199 if (det == DetID::ITS) {
200 nLayers = mInput.doITSStaggering ? o2::itsmft::DPLAlpideParam<DetID::ITS>::getNLayers() : 1;
201 } else if (det == DetID::MFT) {
202 nLayers = mInput.doMFTStaggering ? o2::itsmft::DPLAlpideParam<DetID::MFT>::getNLayers() : 1;
203 } else {
204 LOGP(fatal, "This specialization is define only for ITS and MFT detectors, {} provided", det.getName());
205 }
206 for (int iLayer = 0; iLayer < nLayers; iLayer++) {
207 auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({lbl, mInput.subspec * 100 + iLayer}, ctfHeader.detectors[det] ? sizeof(o2::itsmft::CTF) : 0);
208 if (ctfHeader.detectors[det]) {
209 auto brName = nLayers == 1 ? lbl : fmt::format("{}_{}", lbl, iLayer);
210 o2::itsmft::CTF::readFromTree(bufVec, *(mCTFTree.get()), brName, mCurrTreeEntry);
211 } else if (!mInput.allowMissingDetectors) {
212 throw std::runtime_error(fmt::format("Requested detector {} is missing in the CTF", lbl));
213 }
214 }
215 }
216}
217
219template <typename C>
220void CTFReaderSpec::processDetector(DetID det, const CTFHeader& ctfHeader, ProcessingContext& pc) const
221{
222 if (mInput.detMask[det]) {
223 const auto lbl = det.getName();
224 auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({lbl, mInput.subspec}, ctfHeader.detectors[det] ? sizeof(C) : 0);
225 if (ctfHeader.detectors[det]) {
226 C::readFromTree(bufVec, *(mCTFTree.get()), lbl, mCurrTreeEntry);
227 } else if (!mInput.allowMissingDetectors) {
228 throw std::runtime_error(fmt::format("Requested detector {} is missing in the CTF", lbl));
229 }
230 // setMessageHeader(pc, ctfHeader, lbl);
231 }
232}
233
234void CTFReaderSpec::runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo)
235{
236 // convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
237 mIRFrameSelector.clear();
238 auto ent = mRunTimeRanges.find(timingInfo.runNumber);
239 if (ent == mRunTimeRanges.end()) {
240 LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", timingInfo.runNumber);
241 return;
242 }
246 if (rinfo.runNumber != timingInfo.runNumber || rinfo.orbitsPerTF < 1) {
247 LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", timingInfo.runNumber);
248 }
249 mTFLength = rinfo.orbitsPerTF;
250 std::vector<o2::dataformats::IRFrame> frames;
251 for (const auto& rng : ent->second) {
252 long orbMin = 0, orbMax = 0;
253 if (mConvRunTimeRangesToOrbits > 0) {
254 orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
255 orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
256 } else {
257 orbMin = rng.first;
258 orbMax = rng.second;
259 }
260 if (orbMin < 0) {
261 orbMin = 0;
262 }
263 if (orbMax < 0) {
264 orbMax = 0;
265 }
266 if (timingInfo.runNumber > 523897) {
267 orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
268 orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
269 }
270 LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
271 frames.emplace_back(InteractionRecord{0, uint32_t(orbMin)}, InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
272 }
273 mIRFrameSelector.setOwnList(frames, true);
274}
275
276void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
277{
278 std::ifstream inputFile(flname);
279 if (!inputFile) {
280 LOGP(fatal, "Failed to open selected run/timespans file {}", flname);
281 }
282 std::string line;
283 size_t cntl = 0, cntr = 0;
284 while (std::getline(inputFile, line)) {
285 cntl++;
286 for (char& ch : line) { // Replace semicolons and tabs with spaces for uniform processing
287 if (ch == ';' || ch == '\t' || ch == ',') {
288 ch = ' ';
289 }
290 }
292 if (line.size() < 1 || line[0] == '#') {
293 continue;
294 }
295 auto tokens = o2::utils::Str::tokenize(line, ' ');
296 auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
297 if (tokens.size() >= 3) {
298 int run = 0;
299 long rmin, rmax;
300 try {
301 run = std::stoi(tokens[0]);
302 rmin = std::stol(tokens[1]);
303 rmax = std::stol(tokens[2]);
304 } catch (...) {
305 logError();
306 continue;
307 }
308
309 constexpr long ISTimeStamp = 1514761200000L;
310 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
311 if (rmin > rmax) {
312 LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
313 }
314 if (mConvRunTimeRangesToOrbits == -1) {
315 if (convmn != convmx) {
316 LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
317 }
318 mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
319 LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
320 } else {
321 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
322 LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
323 }
324 }
325
326 mRunTimeRanges[run].emplace_back(rmin, rmax);
327 cntr++;
328 } else {
329 logError();
330 }
331 }
332 LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
333 inputFile.close();
334}
335
337void CTFReaderSpec::openCTFFile(const std::string& flname)
338{
339 try {
340 mFilesRead++;
341 mCTFFile.reset(TFile::Open(flname.c_str()));
342 if (!mCTFFile || !mCTFFile->IsOpen() || mCTFFile->IsZombie()) {
343 throw std::runtime_error(fmt::format("failed to open CTF file {}, skipping", flname));
344 }
345 mCTFTree.reset((TTree*)mCTFFile->Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()));
346 if (!mCTFTree) {
347 throw std::runtime_error(fmt::format("failed to load CTF tree from {}, skipping", flname));
348 }
349 if (mCTFTree->GetEntries() < 1) {
350 throw std::runtime_error(fmt::format("CTF tree in {} has 0 entries, skipping", flname));
351 }
352 if (mInput.shuffle) {
353 if (mInput.ctfIDs.empty()) {
354 int entries = mCTFTree->GetEntries();
355 if (mInput.maxTFs > 0) {
356 entries = std::min(entries, mInput.maxTFs);
357 }
358 if (mInput.maxTFsPerFile > 0) {
359 entries = std::min(entries, mInput.maxTFsPerFile);
360 }
361 mInput.ctfIDs.clear();
362 mInput.ctfIDs.resize(entries);
363 std::iota(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), 0);
364 }
365 std::random_device dev;
366 std::mt19937 gen{dev()};
367 std::shuffle(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), gen);
368 LOGP(info, "will shuffle reading of CTF entries in this order:");
369 for (int i{0}; i < (int)mInput.ctfIDs.size(); ++i) {
370 LOGP(info, "\tTF {:02} -> {:02}", i, mInput.ctfIDs[i]);
371 }
372 }
373 } catch (const std::exception& e) {
374 LOG(error) << "Cannot process " << flname << ", reason: " << e.what();
375 mCTFTree.reset();
376 mCTFFile.reset();
377 mNFailedFiles++;
378 if (mFileFetcher) {
379 mFileFetcher->popFromQueue(mInput.maxLoops < 1);
380 }
381 }
382 mCurrTreeEntry = 0;
383}
384
387{
388 if (mInput.tfRateLimit == -999) {
389 mInput.tfRateLimit = std::stoi(pc.services().get<RawDeviceService>().device()->fConfig->GetValue<std::string>("timeframes-rate-limit"));
390 }
391 std::string tfFileName;
392 bool waitAcknowledged = false;
393 long startWait = 0;
394
395 while (mRunning) {
396 if (mCTFTree) { // there is a tree open with multiple CTF
397 if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter || mInput.shuffle || mInput.reverseCTFIDs) { // no selection requested or matching CTF ID is found
398 LOG(debug) << "TF " << mCTFCounter << " of " << mInput.maxTFs << " loop " << mFileFetcher->getNLoops();
399 if (mInput.shuffle || mInput.reverseCTFIDs) {
400 mCurrTreeEntry = mInput.ctfIDs[mSelIDEntry];
401 }
402 mSelIDEntry++;
403 if (processTF(pc)) {
404 break;
405 }
406 }
407 // explict CTF ID selection list or IRFrame was provided and current entry is not selected
408 LOGP(info, "Skipping CTF#{} ({} of {} in {})", mCTFCounter, mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
409 checkTreeEntries();
410 mCTFCounter++;
411 continue;
412 }
413 //
414 tfFileName = mFileFetcher->getNextFileInQueue();
415 if (tfFileName.empty()) {
416 if (!mFileFetcher->isRunning()) { // nothing expected in the queue
417 mRunning = false;
418 break;
419 }
420 if (!waitAcknowledged) {
421 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
422 waitAcknowledged = true;
423 }
424 pc.services().get<RawDeviceService>().waitFor(5);
425 continue;
426 }
427 if (waitAcknowledged) {
428 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
429 mTotalWaitTime += waitTime;
430 if (++mNWaits > 1) {
431 LOGP(warn, "Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
432 }
433 waitAcknowledged = false;
434 }
435 LOG(info) << "Reading CTF input " << ' ' << tfFileName;
436 openCTFFile(tfFileName);
437 }
438
439 if (mCTFCounter >= mInput.maxTFs || (!mInput.ctfIDs.empty() && mSelIDEntry >= mInput.ctfIDs.size())) { // done
440 LOGP(info, "All CTFs from selected range were injected, stopping");
441 mRunning = false;
442 } else if (mRunning && !mCTFTree && mFileFetcher->getNextFileInQueue().empty() && !mFileFetcher->isRunning()) { // previous tree was done, can we read more?
443 mRunning = false;
444 }
445
446 if (!mRunning) {
448 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
449 stopReader();
450 const std::string dummy{"ctf_read_ntf.txt"};
451 if (mCTFCounterAcc == 0) {
452 LOGP(warn, "No TF passed selection, writing a 0 to file {}", dummy);
453 }
454 try {
455 std::ofstream outfile;
456 outfile.open(dummy, std::ios::out | std::ios::trunc);
457 outfile << mCTFCounterAcc << std::endl;
458 } catch (...) {
459 LOGP(error, "Failed to write {}", dummy);
460 }
461 }
462}
463
465bool CTFReaderSpec::processTF(ProcessingContext& pc)
466{
467 auto cput = mTimer.CpuTime();
468 mTimer.Start(false);
469
470 static RateLimiter limiter;
471 CTFHeader ctfHeader;
472 if (!readFromTree(*(mCTFTree.get()), "CTFHeader", ctfHeader, mCurrTreeEntry)) {
473 throw std::runtime_error("did not find CTFHeader");
474 }
475 if (mImposeRunStartMS > 0) {
476 ctfHeader.creationTime = mImposeRunStartMS + ctfHeader.firstTForbit * o2::constants::lhc::LHCOrbitMUS * 1e-3;
477 }
478 if (ctfHeader.creationTime == 0) { // try to repair header with ad hoc data
479 tryToFixCTFHeader(ctfHeader);
480 }
481
482 if (mUseLocalTFCounter) {
483 ctfHeader.tfCounter = mCTFCounterAcc;
484 }
485
486 LOG(info) << ctfHeader;
487
488 auto& timingInfo = pc.services().get<o2::framework::TimingInfo>();
489 timingInfo.firstTForbit = ctfHeader.firstTForbit;
490 timingInfo.creation = ctfHeader.creationTime;
491 timingInfo.tfCounter = ctfHeader.tfCounter;
492 timingInfo.runNumber = ctfHeader.run;
493
494 if (mRunTimeRanges.size() && timingInfo.runNumber != mRunNumberPrev) {
495 runTimeRangesToIRFrameSelector(timingInfo);
496 }
497 mRunNumberPrev = timingInfo.runNumber;
498 gsl::span<const o2::dataformats::IRFrame> irSpan{};
499 if (mIRFrameSelector.isSet()) {
501 o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, timingInfo.firstTForbit < 0xffffffff - (mTFLength - 1) ? timingInfo.firstTForbit + (mTFLength - 1) : 0xffffffff);
502 irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
503 bool acc = true;
504 if (mInput.skipSkimmedOutTF) {
505 acc = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
506 LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
507 irSpan.size(), ir0.asString(), ir1.asString(), acc ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
508 }
509 if (!acc) {
510 return false;
511 }
512 if (mInput.checkTFLimitBeforeReading) {
513 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
514 }
515 } else {
516 if (mInput.checkTFLimitBeforeReading) {
517 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
518 }
519 }
520 if (mIFRamesOut) {
521 auto outVec = pc.outputs().make<std::vector<o2::dataformats::IRFrame>>(OutputRef{"selIRFrames"}, irSpan.begin(), irSpan.end());
522 }
523 // send CTF Header
524 pc.outputs().snapshot({"header", mInput.subspec}, ctfHeader);
525
528 processDetector<o2::emcal::CTF>(DetID::EMC, ctfHeader, pc);
529 processDetector<o2::hmpid::CTF>(DetID::HMP, ctfHeader, pc);
530 processDetector<o2::phos::CTF>(DetID::PHS, ctfHeader, pc);
531 processDetector<o2::tpc::CTF>(DetID::TPC, ctfHeader, pc);
532 processDetector<o2::trd::CTF>(DetID::TRD, ctfHeader, pc);
533 processDetector<o2::ft0::CTF>(DetID::FT0, ctfHeader, pc);
534 processDetector<o2::fv0::CTF>(DetID::FV0, ctfHeader, pc);
535 processDetector<o2::fdd::CTF>(DetID::FDD, ctfHeader, pc);
536 processDetector<o2::tof::CTF>(DetID::TOF, ctfHeader, pc);
537 processDetector<o2::mid::CTF>(DetID::MID, ctfHeader, pc);
538 processDetector<o2::mch::CTF>(DetID::MCH, ctfHeader, pc);
539 processDetector<o2::cpv::CTF>(DetID::CPV, ctfHeader, pc);
540 processDetector<o2::zdc::CTF>(DetID::ZDC, ctfHeader, pc);
541 processDetector<o2::ctp::CTF>(DetID::CTP, ctfHeader, pc);
542 mCTFCounterAcc++;
543
544 // send sTF acknowledge message
545 if (!mInput.sup0xccdb) {
546 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(OutputRef{"TFDist", 0xccdb});
547 stfDist.id = uint64_t(mCurrTreeEntry);
548 stfDist.firstOrbit = ctfHeader.firstTForbit;
549 stfDist.runNumber = uint32_t(ctfHeader.run);
550 }
551
552 auto entryStr = fmt::format("({} of {} in {})", mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
553 checkTreeEntries();
554 mTimer.Stop();
555
556 // do we need to wait to respect the delay ?
557 long tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
558 if (mCTFCounter) {
559 auto tDiff = tNow - mLastSendTime;
560 if (tDiff < mInput.delay_us) {
561 pc.services().get<RawDeviceService>().waitFor((mInput.delay_us - tDiff) / 1000); // respect requested delay before sending
562 }
563 }
564 if (!mInput.checkTFLimitBeforeReading) {
565 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
566 }
567 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
568 LOGP(info, "Read CTF {} {} in {:.3f} s, {:.4f} s elapsed from previous CTF", mCTFCounter, entryStr, mTimer.CpuTime() - cput, mCTFCounter ? 1e-6 * (tNow - mLastSendTime) : 0.);
569 mLastSendTime = tNow;
570 mCTFCounter++;
571 return true;
572}
573
575void CTFReaderSpec::checkTreeEntries()
576{
577 bool reachedEnd{false};
578 if (mInput.shuffle || mInput.reverseCTFIDs) { // last entry is last id
579 reachedEnd = (mCurrTreeEntry == mInput.ctfIDs.back());
580 } else { // check if the tree has entries left, if needed, close current tree/file
581 reachedEnd = (++mCurrTreeEntry >= mCTFTree->GetEntries());
582 }
583 if (reachedEnd || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files
584 mCTFTree.reset();
585 mCTFFile->Close();
586 mCTFFile.reset();
587 if (mFileFetcher) {
588 mFileFetcher->popFromQueue(mInput.maxLoops < 1);
589 }
590 }
591}
592
594void CTFReaderSpec::setMessageHeader(ProcessingContext& pc, const CTFHeader& ctfHeader, const std::string& lbl, unsigned subspec) const
595{
596 auto* stack = pc.outputs().findMessageHeaderStack(OutputRef{lbl, subspec});
597 if (!stack) {
598 throw std::runtime_error(fmt::format("failed to find output message header stack for {}", lbl));
599 }
600 auto dh = const_cast<o2::header::DataHeader*>(o2::header::get<o2::header::DataHeader*>(stack));
601 dh->firstTForbit = ctfHeader.firstTForbit;
602 dh->tfCounter = ctfHeader.tfCounter;
603 dh->runNumber = uint32_t(ctfHeader.run);
604 auto dph = const_cast<o2::framework::DataProcessingHeader*>(o2::header::get<o2::framework::DataProcessingHeader*>(stack));
605 dph->creation = ctfHeader.creationTime;
606}
607
609void CTFReaderSpec::tryToFixCTFHeader(CTFHeader& ctfHeader) const
610{
611 // HACK: fix CTFHeader for the pilot beam runs, where the TF creation time was not recorded
612 struct RunStartData {
613 uint32_t run = 0;
614 uint32_t firstTForbit = 0;
615 uint64_t tstampMS0 = 0;
616 };
617 const std::vector<RunStartData> tf0Data{
618 {505207, 133875, 1635322620830},
619 {505217, 14225007, 1635328375618},
620 {505278, 1349340, 1635376882079},
621 {505285, 1488862, 1635378517248},
622 {505303, 2615411, 1635392586314},
623 {505397, 5093945, 1635454778123},
624 {505404, 19196217, 1635456032855},
625 {505405, 28537913, 1635456862913},
626 {505406, 41107641, 1635457980628},
627 {505413, 452530, 1635460562613},
628 {505440, 13320708, 1635472436927},
629 {505443, 26546564, 1635473613239},
630 {505446, 177711, 1635477270241},
631 {505548, 88037114, 1635544414050},
632 {505582, 295044346, 1635562822389},
633 {505600, 417241082, 1635573688564},
634 {505623, 10445984, 1635621310460},
635 {505629, 126979, 1635623289756},
636 {505637, 338969, 1635630909893},
637 {505645, 188222, 1635634560881},
638 {505658, 81044, 1635645404694},
639 {505669, 328291, 1635657807147},
640 {505673, 30988, 1635659148972},
641 {505713, 620506, 1635725054798},
642 {505720, 5359903, 1635730673978}};
643 if (ctfHeader.run >= tf0Data.front().run && ctfHeader.run <= tf0Data.back().run) {
644 for (const auto& tf0 : tf0Data) {
645 if (ctfHeader.run == tf0.run) {
646 ctfHeader.creationTime = tf0.tstampMS0;
647 int64_t offset = std::ceil((ctfHeader.firstTForbit - tf0.firstTForbit) * o2::constants::lhc::LHCOrbitMUS * 1e-3);
648 ctfHeader.creationTime += offset > 0 ? offset : 0;
649 break;
650 }
651 }
652 }
653}
654
657{
658 std::vector<InputSpec> inputs;
659 std::vector<OutputSpec> outputs;
660 std::vector<ConfigParamSpec> options;
661
662 outputs.emplace_back(OutputLabel{"header"}, "CTF", "HEADER", inp.subspec, Lifetime::Timeframe);
663 for (auto id = DetID::First; id <= DetID::Last; id++) {
664 if (inp.detMask[id]) {
665 DetID det(id);
666 if (det == DetID::ITS) {
668 for (uint32_t iLayer = 0; iLayer < nLayers; iLayer++) {
669 outputs.emplace_back(OutputLabel{det.getName()}, det.getDataOrigin(), "CTFDATA", inp.subspec * 100 + iLayer, Lifetime::Timeframe);
670 }
671 } else if (det == DetID::MFT) {
673 for (uint32_t iLayer = 0; iLayer < nLayers; iLayer++) {
674 outputs.emplace_back(OutputLabel{det.getName()}, det.getDataOrigin(), "CTFDATA", inp.subspec * 100 + iLayer, Lifetime::Timeframe);
675 }
676 } else {
677 outputs.emplace_back(OutputLabel{det.getName()}, det.getDataOrigin(), "CTFDATA", inp.subspec, Lifetime::Timeframe);
678 }
679 }
680 }
681 if (!inp.fileIRFrames.empty() || !inp.fileRunTimeSpans.empty()) {
682 outputs.emplace_back(OutputLabel{"selIRFrames"}, "CTF", "SELIRFRAMES", 0, Lifetime::Timeframe);
683 }
684 if (!inp.sup0xccdb) {
685 outputs.emplace_back(OutputSpec{{"TFDist"}, o2::header::gDataOriginFLP, o2::header::gDataDescriptionDISTSTF, 0xccdb});
686 }
687 options.emplace_back(ConfigParamSpec{"select-ctf-ids", VariantType::String, "", {"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
688 options.emplace_back(ConfigParamSpec{"reverse-select-ctf-ids", VariantType::Bool, false, {"reverse order of to inject CTF IDs"}});
689 options.emplace_back(ConfigParamSpec{"impose-run-start-timstamp", VariantType::Int64, 0L, {"impose run start time stamp (ms), ignored if 0"}});
690 options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
691 options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
692 options.emplace_back(ConfigParamSpec{"limit-tf-before-reading", VariantType::Bool, false, {"Check TF limiting before reading new TF, otherwhise before injecting it"}});
693 options.emplace_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
694 options.emplace_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});
695
696 if (!inp.metricChannel.empty()) {
697 options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}});
698 }
699
700 return DataProcessorSpec{
701 "ctf-reader",
702 inputs,
703 outputs,
704 AlgorithmSpec{adaptFromTask<CTFReaderSpec>(inp)},
705 options};
706}
707
708} // namespace ctf
709} // namespace o2
Definitions for CPV CTF data.
Header for CTF collection.
Definitions for CTP CTF data.
default_random_engine gen(dev())
random_device dev
Definitions for EMC CTF data.
std::ostringstream debug
Definitions for FDD CTF data.
Definitions for FT0 CTF data.
Definitions for FV0 CTF data.
int32_t i
Definitions for HMPID CTF data.
Class to check if give InteractionRecord or IRFrame is selected by the external IRFrame vector.
Definitions for ITS/MFT CTF data.
Header to collect LHC related constants.
Definitions for MCH CTF data.
Definitions for MID CTF data.
Definition of the Names Generator class.
Definitions for PHOS CTF data.
Helper function to tokenize sequences and ranges of integral numbers.
uint32_t stack
Definition RawData.h:1
Definitions for TOF CTF data.
Definitions for TPC CTF data.
Definitions for TRD CTF data.
TBranch * ptr
Definitions for ZDC CTF data.
static constexpr std::string_view CTFTREENAME
Definition NameConf.h:95
static BasicCCDBManager & instance()
void run(o2::framework::ProcessingContext &pc) final
CTFReaderSpec(const CTFReaderInp &inp)
void init(o2::framework::InitContext &ic) final
void readFromTree(TTree &tree, const std::string &name, int ev=0)
read from tree to non-flat object
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:146
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:95
static constexpr ID MFT
Definition DetID.h:71
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:93
static constexpr ID TPC
Definition DetID.h:64
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:148
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
void snapshot(const Output &spec, T const &object)
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual fair::mq::Device * device()=0
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
Definition Task.h:43
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
size_t loadIRFrames(const std::string &fname)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
GLintptr offset
Definition glcorearb.h:660
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:605
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
bool readFromTree(TTree &tree, const std::string brname, T &dest, int ev=0)
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
void CTFReaderSpec::processDetector< o2::itsmft::CTF >(DetID det, const CTFHeader &ctfHeader, ProcessingContext &pc) const
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
int32_t const char int32_t line
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string asString() const
uint32_t firstTForbit
Definition CTFHeader.h:31
uint32_t tfCounter
Definition CTFHeader.h:32
o2::detectors::DetID::mask_t detectors
Definition CTFHeader.h:33
uint64_t creationTime
Definition CTFHeader.h:30
std::string fileRunTimeSpans
std::vector< int > ctfIDs
std::string metricChannel
o2::detectors::DetID::mask_t detMask
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
the main header struct
Definition DataHeader.h:620
TForbitType firstTForbit
Definition DataHeader.h:676
wrapper for the Entropy-encoded clusters of the TF
Definition CTF.h:71
static constexpr int getNLayers()
static AggregatedRunInfo buildAggregatedRunInfo(int runnumber, long sorMS, long eorMS, long orbitResetMUS, const o2::parameters::GRPECSObject *grpecs, const std::vector< Long64_t > *ctfFirstRunOrbitVec, const o2::parameters::GRPLHCIFData *grplhcif=nullptr)
static void trim(std::string &s)
Definition StringUtils.h:70
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
o2::InteractionRecord ir0(3, 5)