Project
Loading...
Searching...
No Matches
CTFReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include <random>
15#include <vector>
16#include <algorithm>
17#include <numeric>
18
19#include <TFile.h>
20#include <TTree.h>
21#include <TStopwatch.h>
22
23#include "Framework/Logger.h"
26#include "Framework/InputSpec.h"
37#include "Headers/STFHeader.h"
39#include "DataFormatsTPC/CTF.h"
40#include "DataFormatsTRD/CTF.h"
41#include "DataFormatsFT0/CTF.h"
42#include "DataFormatsFV0/CTF.h"
43#include "DataFormatsFDD/CTF.h"
44#include "DataFormatsTOF/CTF.h"
45#include "DataFormatsMID/CTF.h"
46#include "DataFormatsMCH/CTF.h"
48#include "DataFormatsPHOS/CTF.h"
49#include "DataFormatsCPV/CTF.h"
50#include "DataFormatsZDC/CTF.h"
51#include "DataFormatsHMP/CTF.h"
52#include "DataFormatsCTP/CTF.h"
57#include <fairmq/Device.h>
58
59using namespace o2::framework;
60
61namespace o2
62{
63namespace ctf
64{
65
66template <typename T>
67bool readFromTree(TTree& tree, const std::string brname, T& dest, int ev = 0)
68{
69 auto* br = tree.GetBranch(brname.c_str());
70 if (br && br->GetEntries() > ev) {
71 auto* ptr = &dest;
72 br->SetAddress(&ptr);
73 br->GetEntry(ev);
74 br->ResetAddress();
75 return true;
76 }
77 return false;
78}
79
81
83{
84 public:
85 CTFReaderSpec(const CTFReaderInp& inp);
86 ~CTFReaderSpec() override;
87 void init(o2::framework::InitContext& ic) final;
89
90 private:
91 void runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo);
92 void loadRunTimeSpans(const std::string& flname);
93 void openCTFFile(const std::string& flname);
94 bool processTF(ProcessingContext& pc);
95 void checkTreeEntries();
96 void stopReader();
97 template <typename C>
98 void processDetector(DetID det, const CTFHeader& ctfHeader, ProcessingContext& pc) const;
99 void setMessageHeader(ProcessingContext& pc, const CTFHeader& ctfHeader, const std::string& lbl, unsigned subspec) const; // keep just for the reference
100 void tryToFixCTFHeader(CTFHeader& ctfHeader) const;
101 CTFReaderInp mInput{};
102 o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
103 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
104 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
105 std::unique_ptr<TFile> mCTFFile;
106 std::unique_ptr<TTree> mCTFTree;
107 bool mRunning = false;
108 bool mUseLocalTFCounter = false;
109 bool mIFRamesOut = false;
110 int mConvRunTimeRangesToOrbits = -1; // not defined yet
111 int mCTFCounter = 0;
112 int mCTFCounterAcc = 0;
113 int mNFailedFiles = 0;
114 int mFilesRead = 0;
115 int mTFLength = 128;
116 int mNWaits = 0;
117 int mRunNumberPrev = -1;
118 long mTotalWaitTime = 0;
119 long mLastSendTime = 0L;
120 long mCurrTreeEntry = 0L;
121 long mImposeRunStartMS = 0L;
122 size_t mSelIDEntry = 0; // next CTFID to select from the mInput.ctfIDs (if non-empty)
123 TStopwatch mTimer;
124};
125
128{
129 mTimer.Stop();
130 mTimer.Reset();
131}
132
135{
136 stopReader();
137}
138
140void CTFReaderSpec::stopReader()
141{
142 if (!mFileFetcher) {
143 return;
144 }
145 LOGP(info, "CTFReader stops processing, {} files read, {} files failed", mFilesRead - mNFailedFiles, mNFailedFiles);
146 LOGP(info, "CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs ({} accepted) in {} loops, spent {:.2} s in {} data waiting states",
147 mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mCTFCounterAcc, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
148 mRunning = false;
149 mFileFetcher->stop();
150 mFileFetcher.reset();
151 mCTFTree.reset();
152 if (mCTFFile) {
153 mCTFFile->Close();
154 }
155 mCTFFile.reset();
156}
157
160{
161 mInput.ctfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-ctf-ids"));
162 if ((mInput.reverseCTFIDs = ic.options().get<bool>("reverse-select-ctf-ids"))) {
163 std::reverse(mInput.ctfIDs.begin(), mInput.ctfIDs.end());
164 }
165 mUseLocalTFCounter = ic.options().get<bool>("local-tf-counter");
166 mImposeRunStartMS = ic.options().get<int64_t>("impose-run-start-timstamp");
167 mInput.checkTFLimitBeforeReading = ic.options().get<bool>("limit-tf-before-reading");
168 mInput.maxTFs = ic.options().get<int>("max-tf");
169 mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
170 mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
171 mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
172 mRunning = true;
173 mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
174 mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
175 mFileFetcher->setMaxLoops(mInput.maxLoops);
176 mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
177 mFileFetcher->start();
178 if (!mInput.fileIRFrames.empty()) {
179 mIRFrameSelector.loadIRFrames(mInput.fileIRFrames);
180 const auto& hbfu = o2::raw::HBFUtils::Instance();
181 mTFLength = hbfu.nHBFPerTF;
182 LOGP(info, "IRFrames will be selected from {}, assumed TF length: {} HBF", mInput.fileIRFrames, mTFLength);
183 mIFRamesOut = true;
184 }
185 if (!mInput.fileRunTimeSpans.empty()) {
186 loadRunTimeSpans(mInput.fileRunTimeSpans);
187 mIFRamesOut = true;
188 }
189}
190
// Rebuild mIRFrameSelector from the ranges stored in mRunTimeRanges for the run given by timingInfo.runNumber.
// The selector is always cleared first; if the run has no stored ranges it is left cleared and the method returns.
// Each stored range becomes one IRFrame spanning from bunch 0 of its first orbit to LHCMaxBunches of its last orbit.
// mTFLength is overwritten with the orbitsPerTF of the run info.
191void CTFReaderSpec::runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo)
192{
193 // convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
194 mIRFrameSelector.clear();
195 auto ent = mRunTimeRanges.find(timingInfo.runNumber);
196 if (ent == mRunTimeRanges.end()) {
197 LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", timingInfo.runNumber);
198 return;
199 }
// NOTE(review): the statements defining `rinfo` are not visible in this listing (source lines 200-202 are missing);
// presumably it is an AggregatedRunInfo obtained for timingInfo.runNumber - confirm against the original file.
203 if (rinfo.runNumber != timingInfo.runNumber || rinfo.orbitsPerTF < 1) {
204 LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", timingInfo.runNumber);
205 }
206 mTFLength = rinfo.orbitsPerTF;
207 std::vector<o2::dataformats::IRFrame> frames;
208 for (const auto& rng : ent->second) {
209 long orbMin = 0, orbMax = 0;
210 if (mConvRunTimeRangesToOrbits > 0) {
// range limits are times: take the difference to rinfo.sor, divide by LHCOrbitMUS * 0.001 and add rinfo.orbitSOR
// (presumably ms since start of run divided by the orbit duration in ms - TODO confirm units)
211 orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
212 orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
213 } else {
// range limits are already orbits
214 orbMin = rng.first;
215 orbMax = rng.second;
216 }
// negative orbits are clamped to 0 (they are cast to uint32_t below)
217 if (orbMin < 0) {
218 orbMin = 0;
219 }
220 if (orbMax < 0) {
221 orbMax = 0;
222 }
// for runs above 523897 the limits are widened to multiples of orbitsPerTF:
// orbMin is rounded down to a multiple, orbMax is moved to the last orbit before the next multiple
// NOTE(review): the reason for this particular run threshold is not visible here
223 if (timingInfo.runNumber > 523897) {
224 orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
225 orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
226 }
227 LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
228 frames.emplace_back(InteractionRecord{0, uint32_t(orbMin)}, InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
229 }
// hand the frames over to the selector; the second argument is named toBeSorted in the setOwnList signature
230 mIRFrameSelector.setOwnList(frames, true);
231}
232
// Parse the text file flname with one "<run> <range_min> <range_max>" triplet per line and
// append every valid triplet to mRunTimeRanges[run].
// Separators ';', tab and ',' are treated like spaces; empty lines and lines starting with '#' are skipped.
// Lines with fewer than 3 tokens or with non-numeric tokens are reported as errors and skipped.
// A failure to open the file, a range with min > max, or limits mixing the two supported
// conventions (orbits vs timestamps) are reported at fatal level.
233void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
234{
235 std::ifstream inputFile(flname);
236 if (!inputFile) {
237 LOGP(fatal, "Failed to open selected run/timespans file {}", mInput.fileRunTimeSpans);
238 }
239 std::string line;
// cntl: number of lines read so far, cntr: number of accepted ranges
240 size_t cntl = 0, cntr = 0;
241 while (std::getline(inputFile, line)) {
242 cntl++;
243 for (char& ch : line) { // Replace semicolons, tabs and commas with spaces for uniform processing
244 if (ch == ';' || ch == '\t' || ch == ',') {
245 ch = ' ';
246 }
247 }
// NOTE(review): source line 248 is missing from this listing (presumably a trimming of `line`) - confirm against the original file
249 if (line.size() < 1 || line[0] == '#') {
250 continue;
251 }
252 auto tokens = o2::utils::Str::tokenize(line, ' ');
253 auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
254 if (tokens.size() >= 3) {
255 int run = 0;
256 long rmin, rmax;
// rmin/rmax are only used after all three conversions succeeded; any conversion exception skips the line
257 try {
258 run = std::stoi(tokens[0]);
259 rmin = std::stol(tokens[1]);
260 rmax = std::stol(tokens[2]);
261 } catch (...) {
262 logError();
263 continue;
264 }
265
266 constexpr long ISTimeStamp = 1514761200000L;
267 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
268 if (rmin > rmax) {
269 LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
270 }
// the first accepted entry fixes the convention (mConvRunTimeRangesToOrbits: -1 undefined, 0 orbits, 1 timestamps),
// every later entry must agree with it
271 if (mConvRunTimeRangesToOrbits == -1) {
272 if (convmn != convmx) {
273 LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
274 }
275 mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
276 LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
277 } else {
278 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
279 LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
280 }
281 }
282
283 mRunTimeRanges[run].emplace_back(rmin, rmax);
284 cntr++;
285 } else {
286 logError();
287 }
288 }
289 LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.fileRunTimeSpans);
290 inputFile.close();
291}
292
294void CTFReaderSpec::openCTFFile(const std::string& flname)
295{
296 try {
297 mFilesRead++;
298 mCTFFile.reset(TFile::Open(flname.c_str()));
299 if (!mCTFFile || !mCTFFile->IsOpen() || mCTFFile->IsZombie()) {
300 throw std::runtime_error(fmt::format("failed to open CTF file {}, skipping", flname));
301 }
302 mCTFTree.reset((TTree*)mCTFFile->Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()));
303 if (!mCTFTree) {
304 throw std::runtime_error(fmt::format("failed to load CTF tree from {}, skipping", flname));
305 }
306 if (mCTFTree->GetEntries() < 1) {
307 throw std::runtime_error(fmt::format("CTF tree in {} has 0 entries, skipping", flname));
308 }
309 if (mInput.shuffle) {
310 if (mInput.ctfIDs.empty()) {
311 int entries = mCTFTree->GetEntries();
312 if (mInput.maxTFs > 0) {
313 entries = std::min(entries, mInput.maxTFs);
314 }
315 if (mInput.maxTFsPerFile > 0) {
316 entries = std::min(entries, mInput.maxTFsPerFile);
317 }
318 mInput.ctfIDs.clear();
319 mInput.ctfIDs.resize(entries);
320 std::iota(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), 0);
321 }
322 std::random_device dev;
323 std::mt19937 gen{dev()};
324 std::shuffle(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), gen);
325 LOGP(info, "will shuffle reading of CTF entries in this order:");
326 for (int i{0}; i < (int)mInput.ctfIDs.size(); ++i) {
327 LOGP(info, "\tTF {:02} -> {:02}", i, mInput.ctfIDs[i]);
328 }
329 }
330 } catch (const std::exception& e) {
331 LOG(error) << "Cannot process " << flname << ", reason: " << e.what();
332 mCTFTree.reset();
333 mCTFFile.reset();
334 mNFailedFiles++;
335 if (mFileFetcher) {
336 mFileFetcher->popFromQueue(mInput.maxLoops < 1);
337 }
338 }
339 mCurrTreeEntry = 0;
340}
341
344{
345 if (mInput.tfRateLimit == -999) {
346 mInput.tfRateLimit = std::stoi(pc.services().get<RawDeviceService>().device()->fConfig->GetValue<std::string>("timeframes-rate-limit"));
347 }
348 std::string tfFileName;
349 bool waitAcknowledged = false;
350 long startWait = 0;
351
352 while (mRunning) {
353 if (mCTFTree) { // there is a tree open with multiple CTF
354 if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter || mInput.shuffle || mInput.reverseCTFIDs) { // no selection requested or matching CTF ID is found
355 LOG(debug) << "TF " << mCTFCounter << " of " << mInput.maxTFs << " loop " << mFileFetcher->getNLoops();
356 if (mInput.shuffle || mInput.reverseCTFIDs) {
357 mCurrTreeEntry = mInput.ctfIDs[mSelIDEntry];
358 }
359 mSelIDEntry++;
360 if (processTF(pc)) {
361 break;
362 }
363 }
364 // explict CTF ID selection list or IRFrame was provided and current entry is not selected
365 LOGP(info, "Skipping CTF#{} ({} of {} in {})", mCTFCounter, mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
366 checkTreeEntries();
367 mCTFCounter++;
368 continue;
369 }
370 //
371 tfFileName = mFileFetcher->getNextFileInQueue();
372 if (tfFileName.empty()) {
373 if (!mFileFetcher->isRunning()) { // nothing expected in the queue
374 mRunning = false;
375 break;
376 }
377 if (!waitAcknowledged) {
378 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
379 waitAcknowledged = true;
380 }
381 pc.services().get<RawDeviceService>().waitFor(5);
382 continue;
383 }
384 if (waitAcknowledged) {
385 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
386 mTotalWaitTime += waitTime;
387 if (++mNWaits > 1) {
388 LOGP(warn, "Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
389 }
390 waitAcknowledged = false;
391 }
392 LOG(info) << "Reading CTF input " << ' ' << tfFileName;
393 openCTFFile(tfFileName);
394 }
395
396 if (mCTFCounter >= mInput.maxTFs || (!mInput.ctfIDs.empty() && mSelIDEntry >= mInput.ctfIDs.size())) { // done
397 LOGP(info, "All CTFs from selected range were injected, stopping");
398 mRunning = false;
399 } else if (mRunning && !mCTFTree && mFileFetcher->getNextFileInQueue().empty() && !mFileFetcher->isRunning()) { // previous tree was done, can we read more?
400 mRunning = false;
401 }
402
403 if (!mRunning) {
405 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
406 stopReader();
407 const std::string dummy{"ctf_read_ntf.txt"};
408 if (mCTFCounterAcc == 0) {
409 LOGP(warn, "No TF passed selection, writing a 0 to file {}", dummy);
410 }
411 try {
412 std::ofstream outfile;
413 outfile.open(dummy, std::ios::out | std::ios::trunc);
414 outfile << mCTFCounterAcc << std::endl;
415 } catch (...) {
416 LOGP(error, "Failed to write {}", dummy);
417 }
418 }
419}
420
// Read the CTF at mCurrTreeEntry from the currently open tree and inject it into the outputs.
// Returns false, without creating any output, when the TF is rejected by the IR-frame selection
// (only possible with mInput.skipSkimmedOutTF); returns true once the TF was injected.
// Throws std::runtime_error if the CTFHeader cannot be read from the tree.
// On success mCTFCounterAcc and mCTFCounter are incremented and checkTreeEntries() is called,
// which may close the current tree/file.
422bool CTFReaderSpec::processTF(ProcessingContext& pc)
423{
// remember the CPU time accumulated so far, to report the time spent on this CTF at the end
424 auto cput = mTimer.CpuTime();
425 mTimer.Start(false);
426
427 static RateLimiter limiter;
428 CTFHeader ctfHeader;
429 if (!readFromTree(*(mCTFTree.get()), "CTFHeader", ctfHeader, mCurrTreeEntry)) {
430 throw std::runtime_error("did not find CTFHeader");
431 }
// optionally override the creation time: imposed start time plus firstTForbit * LHCOrbitMUS * 1e-3
432 if (mImposeRunStartMS > 0) {
433 ctfHeader.creationTime = mImposeRunStartMS + ctfHeader.firstTForbit * o2::constants::lhc::LHCOrbitMUS * 1e-3;
434 }
435 if (ctfHeader.creationTime == 0) { // try to repair header with ad hoc data
436 tryToFixCTFHeader(ctfHeader);
437 }
438
// optionally replace the stored TF counter by the number of TFs accepted so far
439 if (mUseLocalTFCounter) {
440 ctfHeader.tfCounter = mCTFCounterAcc;
441 }
442
443 LOG(info) << ctfHeader;
444
// propagate the header information to the TimingInfo service
445 auto& timingInfo = pc.services().get<o2::framework::TimingInfo>();
446 timingInfo.firstTForbit = ctfHeader.firstTForbit;
447 timingInfo.creation = ctfHeader.creationTime;
448 timingInfo.tfCounter = ctfHeader.tfCounter;
449 timingInfo.runNumber = ctfHeader.run;
450
// run/time-range selection: rebuild the IR-frame selector whenever the run number changes
451 if (mRunTimeRanges.size() && timingInfo.runNumber != mRunNumberPrev) {
452 runTimeRangesToIRFrameSelector(timingInfo);
453 }
454 mRunNumberPrev = timingInfo.runNumber;
455 gsl::span<const o2::dataformats::IRFrame> irSpan{};
456 if (mIRFrameSelector.isSet()) {
// NOTE(review): the definition of `ir0` (source line 457) is missing from this listing;
// presumably the InteractionRecord of the start of the TF - confirm against the original file.
// ir1: last bunch of the orbit firstTForbit + mTFLength - 1, saturated at 0xffffffff
458 o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, timingInfo.firstTForbit < 0xffffffff - (mTFLength - 1) ? timingInfo.firstTForbit + (mTFLength - 1) : 0xffffffff);
459 irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
460 bool acc = true;
461 if (mInput.skipSkimmedOutTF) {
// TF is accepted if it matches at least one frame, or matches none when the selection is inverted
462 acc = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
463 LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
464 irSpan.size(), ir0.asString(), ir1.asString(), acc ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
465 }
466 if (!acc) {
467 return false;
468 }
469 if (mInput.checkTFLimitBeforeReading) {
470 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
471 }
472 } else {
473 if (mInput.checkTFLimitBeforeReading) {
474 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
475 }
476 }
// the matching IR frames (possibly none) are sent only if a selection input was configured
477 if (mIFRamesOut) {
478 auto outVec = pc.outputs().make<std::vector<o2::dataformats::IRFrame>>(OutputRef{"selIRFrames"}, irSpan.begin(), irSpan.end());
479 }
480 // send CTF Header
481 pc.outputs().snapshot({"header", mInput.subspec}, ctfHeader);
482
// per-detector CTF payloads; processDetector does nothing for detectors outside mInput.detMask
483 processDetector<o2::itsmft::CTF>(DetID::ITS, ctfHeader, pc);
484 processDetector<o2::itsmft::CTF>(DetID::MFT, ctfHeader, pc);
485 processDetector<o2::emcal::CTF>(DetID::EMC, ctfHeader, pc);
486 processDetector<o2::hmpid::CTF>(DetID::HMP, ctfHeader, pc);
487 processDetector<o2::phos::CTF>(DetID::PHS, ctfHeader, pc);
488 processDetector<o2::tpc::CTF>(DetID::TPC, ctfHeader, pc);
489 processDetector<o2::trd::CTF>(DetID::TRD, ctfHeader, pc);
490 processDetector<o2::ft0::CTF>(DetID::FT0, ctfHeader, pc);
491 processDetector<o2::fv0::CTF>(DetID::FV0, ctfHeader, pc);
492 processDetector<o2::fdd::CTF>(DetID::FDD, ctfHeader, pc);
493 processDetector<o2::tof::CTF>(DetID::TOF, ctfHeader, pc);
494 processDetector<o2::mid::CTF>(DetID::MID, ctfHeader, pc);
495 processDetector<o2::mch::CTF>(DetID::MCH, ctfHeader, pc);
496 processDetector<o2::cpv::CTF>(DetID::CPV, ctfHeader, pc);
497 processDetector<o2::zdc::CTF>(DetID::ZDC, ctfHeader, pc);
498 processDetector<o2::ctp::CTF>(DetID::CTP, ctfHeader, pc);
499 mCTFCounterAcc++;
500
501 // send sTF acknowledge message
502 if (!mInput.sup0xccdb) {
503 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(OutputRef{"TFDist", 0xccdb});
504 stfDist.id = uint64_t(mCurrTreeEntry);
505 stfDist.firstOrbit = ctfHeader.firstTForbit;
506 stfDist.runNumber = uint32_t(ctfHeader.run);
507 }
508
// build the log label before checkTreeEntries(), which may modify mCurrTreeEntry and release the tree/file
509 auto entryStr = fmt::format("({} of {} in {})", mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
510 checkTreeEntries();
511 mTimer.Stop();
512
513 // do we need to wait to respect the delay ?
514 long tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
// no delay is applied for the very first CTF (mCTFCounter == 0)
515 if (mCTFCounter) {
516 auto tDiff = tNow - mLastSendTime;
517 if (tDiff < mInput.delay_us) {
518 pc.services().get<RawDeviceService>().waitFor((mInput.delay_us - tDiff) / 1000); // respect requested delay before sending
519 }
520 }
// rate limiting is checked here unless it was already done before reading
521 if (!mInput.checkTFLimitBeforeReading) {
522 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
523 }
524 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
525 LOGP(info, "Read CTF {} {} in {:.3f} s, {:.4f} s elapsed from previous CTF", mCTFCounter, entryStr, mTimer.CpuTime() - cput, mCTFCounter ? 1e-6 * (tNow - mLastSendTime) : 0.);
526 mLastSendTime = tNow;
527 mCTFCounter++;
528 return true;
529}
530
532void CTFReaderSpec::checkTreeEntries()
533{
534 bool reachedEnd{false};
535 if (mInput.shuffle || mInput.reverseCTFIDs) { // last entry is last id
536 reachedEnd = (mCurrTreeEntry == mInput.ctfIDs.back());
537 } else { // check if the tree has entries left, if needed, close current tree/file
538 reachedEnd = (++mCurrTreeEntry >= mCTFTree->GetEntries());
539 }
540 if (reachedEnd || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files
541 mCTFTree.reset();
542 mCTFFile->Close();
543 mCTFFile.reset();
544 if (mFileFetcher) {
545 mFileFetcher->popFromQueue(mInput.maxLoops < 1);
546 }
547 }
548}
549
551void CTFReaderSpec::setMessageHeader(ProcessingContext& pc, const CTFHeader& ctfHeader, const std::string& lbl, unsigned subspec) const
552{
553 auto* stack = pc.outputs().findMessageHeaderStack(OutputRef{lbl, subspec});
554 if (!stack) {
555 throw std::runtime_error(fmt::format("failed to find output message header stack for {}", lbl));
556 }
557 auto dh = const_cast<o2::header::DataHeader*>(o2::header::get<o2::header::DataHeader*>(stack));
558 dh->firstTForbit = ctfHeader.firstTForbit;
559 dh->tfCounter = ctfHeader.tfCounter;
560 dh->runNumber = uint32_t(ctfHeader.run);
561 auto dph = const_cast<o2::framework::DataProcessingHeader*>(o2::header::get<o2::framework::DataProcessingHeader*>(stack));
562 dph->creation = ctfHeader.creationTime;
563}
564
566template <typename C>
567void CTFReaderSpec::processDetector(DetID det, const CTFHeader& ctfHeader, ProcessingContext& pc) const
568{
569 if (mInput.detMask[det]) {
570 const auto lbl = det.getName();
571 auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({lbl, mInput.subspec}, ctfHeader.detectors[det] ? sizeof(C) : 0);
572 if (ctfHeader.detectors[det]) {
573 C::readFromTree(bufVec, *(mCTFTree.get()), lbl, mCurrTreeEntry);
574 } else if (!mInput.allowMissingDetectors) {
575 throw std::runtime_error(fmt::format("Requested detector {} is missing in the CTF", lbl));
576 }
577 // setMessageHeader(pc, ctfHeader, lbl);
578 }
579}
580
582void CTFReaderSpec::tryToFixCTFHeader(CTFHeader& ctfHeader) const
583{
584 // HACK: fix CTFHeader for the pilot beam runs, where the TF creation time was not recorded
585 struct RunStartData {
586 uint32_t run = 0;
587 uint32_t firstTForbit = 0;
588 uint64_t tstampMS0 = 0;
589 };
590 const std::vector<RunStartData> tf0Data{
591 {505207, 133875, 1635322620830},
592 {505217, 14225007, 1635328375618},
593 {505278, 1349340, 1635376882079},
594 {505285, 1488862, 1635378517248},
595 {505303, 2615411, 1635392586314},
596 {505397, 5093945, 1635454778123},
597 {505404, 19196217, 1635456032855},
598 {505405, 28537913, 1635456862913},
599 {505406, 41107641, 1635457980628},
600 {505413, 452530, 1635460562613},
601 {505440, 13320708, 1635472436927},
602 {505443, 26546564, 1635473613239},
603 {505446, 177711, 1635477270241},
604 {505548, 88037114, 1635544414050},
605 {505582, 295044346, 1635562822389},
606 {505600, 417241082, 1635573688564},
607 {505623, 10445984, 1635621310460},
608 {505629, 126979, 1635623289756},
609 {505637, 338969, 1635630909893},
610 {505645, 188222, 1635634560881},
611 {505658, 81044, 1635645404694},
612 {505669, 328291, 1635657807147},
613 {505673, 30988, 1635659148972},
614 {505713, 620506, 1635725054798},
615 {505720, 5359903, 1635730673978}};
616 if (ctfHeader.run >= tf0Data.front().run && ctfHeader.run <= tf0Data.back().run) {
617 for (const auto& tf0 : tf0Data) {
618 if (ctfHeader.run == tf0.run) {
619 ctfHeader.creationTime = tf0.tstampMS0;
620 int64_t offset = std::ceil((ctfHeader.firstTForbit - tf0.firstTForbit) * o2::constants::lhc::LHCOrbitMUS * 1e-3);
621 ctfHeader.creationTime += offset > 0 ? offset : 0;
622 break;
623 }
624 }
625 }
626}
627
630{
631 std::vector<InputSpec> inputs;
632 std::vector<OutputSpec> outputs;
633 std::vector<ConfigParamSpec> options;
634
635 outputs.emplace_back(OutputLabel{"header"}, "CTF", "HEADER", inp.subspec, Lifetime::Timeframe);
636 for (auto id = DetID::First; id <= DetID::Last; id++) {
637 if (inp.detMask[id]) {
638 DetID det(id);
639 outputs.emplace_back(OutputLabel{det.getName()}, det.getDataOrigin(), "CTFDATA", inp.subspec, Lifetime::Timeframe);
640 }
641 }
642 if (!inp.fileIRFrames.empty() || !inp.fileRunTimeSpans.empty()) {
643 outputs.emplace_back(OutputLabel{"selIRFrames"}, "CTF", "SELIRFRAMES", 0, Lifetime::Timeframe);
644 }
645 if (!inp.sup0xccdb) {
646 outputs.emplace_back(OutputSpec{{"TFDist"}, o2::header::gDataOriginFLP, o2::header::gDataDescriptionDISTSTF, 0xccdb});
647 }
648
649 options.emplace_back(ConfigParamSpec{"select-ctf-ids", VariantType::String, "", {"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
650 options.emplace_back(ConfigParamSpec{"reverse-select-ctf-ids", VariantType::Bool, false, {"reverse order of to inject CTF IDs"}});
651 options.emplace_back(ConfigParamSpec{"impose-run-start-timstamp", VariantType::Int64, 0L, {"impose run start time stamp (ms), ignored if 0"}});
652 options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
653 options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
654 options.emplace_back(ConfigParamSpec{"limit-tf-before-reading", VariantType::Bool, false, {"Check TF limiting before reading new TF, otherwhise before injecting it"}});
655 options.emplace_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
656 options.emplace_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});
657
658 if (!inp.metricChannel.empty()) {
659 options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}});
660 }
661
662 return DataProcessorSpec{
663 "ctf-reader",
664 inputs,
665 outputs,
666 AlgorithmSpec{adaptFromTask<CTFReaderSpec>(inp)},
667 options};
668}
669
670} // namespace ctf
671} // namespace o2
Definitions for CPV CTF data.
Header for CTF collection.
Definitions for CTP CTF data.
default_random_engine gen(dev())
random_device dev
Definitions for EMC CTF data.
Definitions for FDD CTF data.
Definitions for FT0 CTF data.
Definitions for FV0 CTF data.
int32_t i
Definitions for HMPID CTF data.
Class to check if give InteractionRecord or IRFrame is selected by the external IRFrame vector.
Definitions for ITS/MFT CTF data.
Header to collect LHC related constants.
Definitions for MCH CTF data.
Definitions for MID CTF data.
Definition of the Names Generator class.
Definitions for PHOS CTF data.
Helper function to tokenize sequences and ranges of integral numbers.
uint32_t stack
Definition RawData.h:1
Definitions for TOF CTF data.
Definitions for TPC CTF data.
Definitions for TRD CTF data.
TBranch * ptr
std::ostringstream debug
Definitions for ZDC CTF data.
static constexpr std::string_view CTFTREENAME
Definition NameConf.h:95
static BasicCCDBManager & instance()
void run(o2::framework::ProcessingContext &pc) final
CTFReaderSpec(const CTFReaderInp &inp)
void init(o2::framework::InitContext &ic) final
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:145
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:94
static constexpr ID MFT
Definition DetID.h:71
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr ID TPC
Definition DetID.h:64
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
void snapshot(const Output &spec, T const &object)
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual fair::mq::Device * device()=0
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
Definition Task.h:43
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
size_t loadIRFrames(const std::string &fname)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
GLintptr offset
Definition glcorearb.h:660
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:603
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
bool readFromTree(TTree &tree, const std::string brname, T &dest, int ev=0)
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
int32_t const char int32_t line
@ C
Definition Defs.h:36
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string asString() const
uint32_t firstTForbit
Definition CTFHeader.h:31
uint32_t tfCounter
Definition CTFHeader.h:32
uint64_t creationTime
Definition CTFHeader.h:30
std::string fileRunTimeSpans
std::vector< int > ctfIDs
std::string metricChannel
o2::detectors::DetID::mask_t detMask
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
the main header struct
Definition DataHeader.h:618
TForbitType firstTForbit
Definition DataHeader.h:674
static AggregatedRunInfo buildAggregatedRunInfo(o2::ccdb::CCDBManagerInstance &ccdb, int runnumber)
static void trim(std::string &s)
Definition StringUtils.h:71
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
o2::InteractionRecord ir0(3, 5)