Project
Loading...
Searching...
No Matches
CTFReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include <vector>
15#include <TFile.h>
16#include <TTree.h>
17
18#include "Framework/Logger.h"
21#include "Framework/InputSpec.h"
32#include "Headers/STFHeader.h"
34#include "DataFormatsTPC/CTF.h"
35#include "DataFormatsTRD/CTF.h"
36#include "DataFormatsFT0/CTF.h"
37#include "DataFormatsFV0/CTF.h"
38#include "DataFormatsFDD/CTF.h"
39#include "DataFormatsTOF/CTF.h"
40#include "DataFormatsMID/CTF.h"
41#include "DataFormatsMCH/CTF.h"
43#include "DataFormatsPHOS/CTF.h"
44#include "DataFormatsCPV/CTF.h"
45#include "DataFormatsZDC/CTF.h"
46#include "DataFormatsHMP/CTF.h"
47#include "DataFormatsCTP/CTF.h"
52#include <TStopwatch.h>
53#include <fairmq/Device.h>
54
55using namespace o2::framework;
56
57namespace o2
58{
59namespace ctf
60{
61
62template <typename T>
63bool readFromTree(TTree& tree, const std::string brname, T& dest, int ev = 0)
64{
65 auto* br = tree.GetBranch(brname.c_str());
66 if (br && br->GetEntries() > ev) {
67 auto* ptr = &dest;
68 br->SetAddress(&ptr);
69 br->GetEntry(ev);
70 br->ResetAddress();
71 return true;
72 }
73 return false;
74}
75
77
79{
// NOTE(review): the class head is not part of this span; the override/final specifiers below imply a polymorphic base class.
80 public:
81 CTFReaderSpec(const CTFReaderInp& inp);
82 ~CTFReaderSpec() override;
83 void init(o2::framework::InitContext& ic) final;
85
86 private:
// fill mIRFrameSelector from the mRunTimeRanges entries of the run given in timingInfo
87 void runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo);
// parse a text file with <run> <range_min> <range_max> lines into mRunTimeRanges
88 void loadRunTimeSpans(const std::string& flname);
// open a CTF file and attach its tree to mCTFFile / mCTFTree
89 void openCTFFile(const std::string& flname);
// read and publish the entry mCurrTreeEntry of the open tree, returns false if the TF was rejected by the IR-frame selection
90 bool processTF(ProcessingContext& pc);
// move to the next tree entry, release tree and file when the file is done
91 void checkTreeEntries();
// stop the file fetcher, log the statistics, release tree and file
92 void stopReader();
// publish the CTF buffer of a single detector, C is the CTF type of this detector
93 template <typename C>
94 void processDetector(DetID det, const CTFHeader& ctfHeader, ProcessingContext& pc) const;
95 void setMessageHeader(ProcessingContext& pc, const CTFHeader& ctfHeader, const std::string& lbl, unsigned subspec) const; // keep just for the reference
// set the creation time of headers which have it at 0, using a hard-coded table of run start data
96 void tryToFixCTFHeader(CTFHeader& ctfHeader) const;
97 CTFReaderInp mInput{};
98 o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
// run number -> list of (min, max) ranges filled by loadRunTimeSpans
99 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
100 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
101 std::unique_ptr<TFile> mCTFFile;
102 std::unique_ptr<TTree> mCTFTree;
// reading loop is active; set in init(), cleared when the input is exhausted or the requested TFs were injected
103 bool mRunning = false;
// set from the "local-tf-counter" option: header.tfCounter is overridden by mCTFCounterAcc
104 bool mUseLocalTFCounter = false;
// the "selIRFrames" output is filled (set when an IRFrames or run/time-spans file is provided)
105 bool mIFRamesOut = false;
106 int mConvRunTimeRangesToOrbits = -1; // -1: not defined yet, 1: ranges are timestamps (ms) to be converted to orbits, 0: ranges are orbits
// number of CTFs seen so far (both injected and skipped)
107 int mCTFCounter = 0;
// number of CTFs accepted and injected
108 int mCTFCounterAcc = 0;
109 int mNFailedFiles = 0;
// number of files which were attempted to open (including the failed ones)
110 int mFilesRead = 0;
// TF length used to build the IR range of a TF, logged in init() as "assumed TF length: {} HBF"
111 int mTFLength = 128;
// number of data waiting states and their accumulated duration in microseconds
112 int mNWaits = 0;
113 int mRunNumberPrev = -1;
114 long mTotalWaitTime = 0;
// time of the last injection, microseconds of the system clock
115 long mLastSendTime = 0L;
// entry of the currently open tree to be read next
116 long mCurrTreeEntry = 0L;
// from the "impose-run-start-timstamp" option (ms), ignored if 0
117 long mImposeRunStartMS = 0L;
118 size_t mSelIDEntry = 0; // next CTFID to select from the mInput.ctfIDs (if non-empty)
// accumulates the CPU and real time spent in processTF
119 TStopwatch mTimer;
120};
121
124{
// make sure the stopwatch is stopped and zeroed: processTF() later restarts it without resetting
125 mTimer.Stop();
126 mTimer.Reset();
127}
128
131{
// stopReader() returns immediately if the file fetcher was already released, so this is safe after a regular stop
132 stopReader();
133}
134
136void CTFReaderSpec::stopReader()
137{
138 if (!mFileFetcher) {
139 return;
140 }
141 LOGP(info, "CTFReader stops processing, {} files read, {} files failed", mFilesRead - mNFailedFiles, mNFailedFiles);
142 LOGP(info, "CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs ({} accepted) in {} loops, spent {:.2} s in {} data waiting states",
143 mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mCTFCounterAcc, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
144 mRunning = false;
145 mFileFetcher->stop();
146 mFileFetcher.reset();
147 mCTFTree.reset();
148 if (mCTFFile) {
149 mCTFFile->Close();
150 }
151 mCTFFile.reset();
152}
153
156{
// read the run-time options and store them in mInput
157 mInput.ctfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-ctf-ids"));
158 mUseLocalTFCounter = ic.options().get<bool>("local-tf-counter");
159 mImposeRunStartMS = ic.options().get<int64_t>("impose-run-start-timstamp");
160 mInput.checkTFLimitBeforeReading = ic.options().get<bool>("limit-tf-before-reading");
// non-positive limits mean "no limit": they are replaced by the largest positive int
161 mInput.maxTFs = ic.options().get<int>("max-tf");
162 mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
163 mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
164 mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
165 mRunning = true;
// create, configure and start the fetcher which provides the names of the input files
166 mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
167 mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
168 mFileFetcher->setMaxLoops(mInput.maxLoops);
169 mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
170 mFileFetcher->start();
// optional TF selection by IR frames: the TF length is taken from HBFUtils
171 if (!mInput.fileIRFrames.empty()) {
172 mIRFrameSelector.loadIRFrames(mInput.fileIRFrames);
173 const auto& hbfu = o2::raw::HBFUtils::Instance();
174 mTFLength = hbfu.nHBFPerTF;
175 LOGP(info, "IRFrames will be selected from {}, assumed TF length: {} HBF", mInput.fileIRFrames, mTFLength);
176 mIFRamesOut = true;
177 }
// optional TF selection by run/time-span ranges: the ranges are only loaded here, the selector is filled
// in runTimeRangesToIRFrameSelector() once the run number is known
178 if (!mInput.fileRunTimeSpans.empty()) {
179 loadRunTimeSpans(mInput.fileRunTimeSpans);
180 mIFRamesOut = true;
181 }
182}
183
// Rebuild mIRFrameSelector from the ranges stored in mRunTimeRanges for the run timingInfo.runNumber.
// If the run has no ranges, the selector is left cleared. Aborts (fatal) if the run information is not valid.
184void CTFReaderSpec::runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo)
185{
186 // convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
187 mIRFrameSelector.clear();
188 auto ent = mRunTimeRanges.find(timingInfo.runNumber);
189 if (ent == mRunTimeRanges.end()) {
190 LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", timingInfo.runNumber);
191 return;
192 }
// NOTE(review): rinfo is defined by statements which are not visible in this span; judging from the fatal
// message below it is the AggregatedRunInfo of this run - confirm
196 if (rinfo.runNumber != timingInfo.runNumber || rinfo.orbitsPerTF < 1) {
197 LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", timingInfo.runNumber);
198 }
199 mTFLength = rinfo.orbitsPerTF;
200 std::vector<o2::dataformats::IRFrame> frames;
201 for (const auto& rng : ent->second) {
202 long orbMin = 0, orbMax = 0;
// ranges given as timestamps: orbit = orbit at SOR + (time since SOR) / (orbit duration);
// presumably rinfo.sor is in ms and LHCOrbitMUS in microseconds, hence the factor 0.001 - TODO confirm
203 if (mConvRunTimeRangesToOrbits > 0) {
204 orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
205 orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
206 } else {
207 orbMin = rng.first;
208 orbMax = rng.second;
209 }
// orbits before 0 (e.g. timestamps preceding the start of run) are clamped to 0
210 if (orbMin < 0) {
211 orbMin = 0;
212 }
213 if (orbMax < 0) {
214 orbMax = 0;
215 }
// extend the range to multiples of orbitsPerTF: orbMin is rounded down to a multiple of orbitsPerTF, orbMax up to
// the orbit preceding the next multiple. NOTE(review): the reason for the run number threshold is not visible here
216 if (timingInfo.runNumber > 523897) {
217 orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
218 orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
219 }
220 LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
// NOTE(review): the long orbit limits are truncated to 32 bits here, values above 0xffffffff will wrap
221 frames.emplace_back(InteractionRecord{0, uint32_t(orbMin)}, InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
222 }
223 mIRFrameSelector.setOwnList(frames, true);
224}
225
// Parse the text file flname with the run/range selections and store them in mRunTimeRanges.
// Every non-comment line must start with a triplet <run> <range_min> <range_max>, separated by spaces, tabs,
// commas or semicolons; lines starting with '#' are skipped, tokens after the third one are ignored.
// All ranges of the file must be of the same kind (orbits or timestamps in ms), the kind detected on the first
// valid line is stored in mConvRunTimeRangesToOrbits. Malformed lines are reported and skipped, inconsistent
// or decreasing ranges are fatal.
226void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
227{
228 std::ifstream inputFile(flname);
229 if (!inputFile) {
// NOTE(review): the message prints mInput.fileRunTimeSpans rather than the flname argument
230 LOGP(fatal, "Failed to open selected run/timespans file {}", mInput.fileRunTimeSpans);
231 }
232 std::string line;
// cntl: number of lines read, cntr: number of ranges stored
233 size_t cntl = 0, cntr = 0;
234 while (std::getline(inputFile, line)) {
235 cntl++;
236 for (char& ch : line) { // Replace semicolons, tabs and commas with spaces for uniform processing
237 if (ch == ';' || ch == '\t' || ch == ',') {
238 ch = ' ';
239 }
240 }
242 if (line.size() < 1 || line[0] == '#') {
243 continue;
244 }
245 auto tokens = o2::utils::Str::tokenize(line, ' ');
246 auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
247 if (tokens.size() >= 3) {
248 int run = 0;
249 long rmin, rmax;
// std::stoi / std::stol throw on non-numeric or out-of-range input: such lines are reported and skipped
250 try {
251 run = std::stoi(tokens[0]);
252 rmin = std::stol(tokens[1]);
253 rmax = std::stol(tokens[2]);
254 } catch (...) {
255 logError();
256 continue;
257 }
258
// threshold separating orbit numbers from unix timestamps in ms
259 constexpr long ISTimeStamp = 1514761200000L;
260 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
261 if (rmin > rmax) {
262 LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
263 }
// the first valid entry defines the kind of the input, all following ones must be of the same kind
264 if (mConvRunTimeRangesToOrbits == -1) {
265 if (convmn != convmx) {
266 LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
267 }
268 mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
269 LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
270 } else {
271 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
272 LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
273 }
274 }
275
276 mRunTimeRanges[run].emplace_back(rmin, rmax);
277 cntr++;
278 } else {
279 logError();
280 }
281 }
282 LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.fileRunTimeSpans);
283 inputFile.close();
284}
285
287void CTFReaderSpec::openCTFFile(const std::string& flname)
288{
289 try {
290 mFilesRead++;
291 mCTFFile.reset(TFile::Open(flname.c_str()));
292 if (!mCTFFile || !mCTFFile->IsOpen() || mCTFFile->IsZombie()) {
293 throw std::runtime_error(fmt::format("failed to open CTF file {}, skipping", flname));
294 }
295 mCTFTree.reset((TTree*)mCTFFile->Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()));
296 if (!mCTFTree) {
297 throw std::runtime_error(fmt::format("failed to load CTF tree from {}, skipping", flname));
298 }
299 if (mCTFTree->GetEntries() < 1) {
300 throw std::runtime_error(fmt::format("CTF tree in {} has 0 entries, skipping", flname));
301 }
302 } catch (const std::exception& e) {
303 LOG(error) << "Cannot process " << flname << ", reason: " << e.what();
304 mCTFTree.reset();
305 mCTFFile.reset();
306 mNFailedFiles++;
307 if (mFileFetcher) {
308 mFileFetcher->popFromQueue(mInput.maxLoops < 1);
309 }
310 }
311 mCurrTreeEntry = 0;
312}
313
316{
// The loop below returns after at most one injected CTF: it opens files provided by the fetcher as needed, skips
// the non-selected CTFs and waits while the fetcher is running but has no file ready. When nothing is left to
// read or the requested CTFs were injected, the reader is stopped and the number of accepted TFs is written to a file.
// NOTE(review): -999 is apparently the "not set" value of tfRateLimit, replaced by the device-level option
317 if (mInput.tfRateLimit == -999) {
318 mInput.tfRateLimit = std::stoi(pc.services().get<RawDeviceService>().device()->fConfig->GetValue<std::string>("timeframes-rate-limit"));
319 }
320 std::string tfFileName;
// bookkeeping of a data waiting state: flag that the wait has started and its start time in microseconds
321 bool waitAcknowledged = false;
322 long startWait = 0;
323
324 while (mRunning) {
325 if (mCTFTree) { // there is a tree open with multiple CTF
// NOTE(review): mSelIDEntry is incremented before processTF(); if processTF() rejects the TF which matched the last
// requested ID, the next iteration evaluates ctfIDs[mSelIDEntry] with mSelIDEntry == ctfIDs.size() - looks like
// an out-of-range access, confirm
326 if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter) { // no selection requested or matching CTF ID is found
327 LOG(debug) << "TF " << mCTFCounter << " of " << mInput.maxTFs << " loop " << mFileFetcher->getNLoops();
328 mSelIDEntry++;
329 if (processTF(pc)) {
330 break;
331 }
332 }
333 // explict CTF ID selection list or IRFrame was provided and current entry is not selected
334 LOGP(info, "Skipping CTF#{} ({} of {} in {})", mCTFCounter, mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
// a skipped CTF still advances the tree entry and the global CTF counter
335 checkTreeEntries();
336 mCTFCounter++;
337 continue;
338 }
339 //
// no tree is open: ask the fetcher for the next file
340 tfFileName = mFileFetcher->getNextFileInQueue();
341 if (tfFileName.empty()) {
342 if (!mFileFetcher->isRunning()) { // nothing expected in the queue
343 mRunning = false;
344 break;
345 }
// the fetcher is running but has nothing ready: remember when the wait started and retry after a pause
346 if (!waitAcknowledged) {
347 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
348 waitAcknowledged = true;
349 }
350 pc.services().get<RawDeviceService>().waitFor(5);
351 continue;
352 }
// a file arrived after a waiting state: account for the time lost, the 1st wait is not reported
353 if (waitAcknowledged) {
354 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
355 mTotalWaitTime += waitTime;
356 if (++mNWaits > 1) {
357 LOGP(warn, "Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
358 }
359 waitAcknowledged = false;
360 }
361 LOG(info) << "Reading CTF input " << ' ' << tfFileName;
// on failure openCTFFile() leaves mCTFTree empty, the next iteration then asks for the next file
362 openCTFFile(tfFileName);
363 }
364
365 if (mCTFCounter >= mInput.maxTFs || (!mInput.ctfIDs.empty() && mSelIDEntry >= mInput.ctfIDs.size())) { // done
366 LOGP(info, "All CTFs from selected range were injected, stopping");
367 mRunning = false;
368 } else if (mRunning && !mCTFTree && mFileFetcher->getNextFileInQueue().empty() && !mFileFetcher->isRunning()) { // previous tree was done, can we read more?
369 mRunning = false;
370 }
371
372 if (!mRunning) {
374 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
375 stopReader();
// leave the number of accepted TFs in a text file in the current directory
376 const std::string dummy{"ctf_read_ntf.txt"};
377 if (mCTFCounterAcc == 0) {
378 LOGP(warn, "No TF passed selection, writing a 0 to file {}", dummy);
379 }
// NOTE(review): std::ofstream does not throw unless exceptions() is enabled, so a failed open or write
// is most likely not caught by this handler - confirm
380 try {
381 std::ofstream outfile;
382 outfile.open(dummy, std::ios::out | std::ios::trunc);
383 outfile << mCTFCounterAcc << std::endl;
384 } catch (...) {
385 LOGP(error, "Failed to write {}", dummy);
386 }
387 }
388}
389
// Read the entry mCurrTreeEntry of the open CTF tree and publish its header and the buffers of the requested detectors.
// Returns false, without publishing and without touching the counters, if the TF is rejected by the IR-frame
// selection; returns true after the TF was published and the tree entry and CTF counters were advanced.
// Throws std::runtime_error if the CTFHeader cannot be read from the tree.
391bool CTFReaderSpec::processTF(ProcessingContext& pc)
392{
// CPU time accumulated so far, used to report the time spent on this TF
393 auto cput = mTimer.CpuTime();
394 mTimer.Start(false);
395
// single limiter instance shared by all calls
396 static RateLimiter limiter;
397 CTFHeader ctfHeader;
398 if (!readFromTree(*(mCTFTree.get()), "CTFHeader", ctfHeader, mCurrTreeEntry)) {
399 throw std::runtime_error("did not find CTFHeader");
400 }
// optionally derive the creation time from the imposed run start and the 1st orbit of the TF
401 if (mImposeRunStartMS > 0) {
402 ctfHeader.creationTime = mImposeRunStartMS + ctfHeader.firstTForbit * o2::constants::lhc::LHCOrbitMUS * 1e-3;
403 }
404 if (ctfHeader.creationTime == 0) { // try to repair header with ad hoc data
405 tryToFixCTFHeader(ctfHeader);
406 }
407
408 if (mUseLocalTFCounter) {
409 ctfHeader.tfCounter = mCTFCounterAcc;
410 }
411
412 LOG(info) << ctfHeader;
413
// propagate the timing of this TF to the framework
414 auto& timingInfo = pc.services().get<o2::framework::TimingInfo>();
415 timingInfo.firstTForbit = ctfHeader.firstTForbit;
416 timingInfo.creation = ctfHeader.creationTime;
417 timingInfo.tfCounter = ctfHeader.tfCounter;
418 timingInfo.runNumber = ctfHeader.run;
419
// the run/time-span selection depends on the run: rebuild the selector when the run number changes
420 if (mRunTimeRanges.size() && timingInfo.runNumber != mRunNumberPrev) {
421 runTimeRangesToIRFrameSelector(timingInfo);
422 }
423 mRunNumberPrev = timingInfo.runNumber;
424 gsl::span<const o2::dataformats::IRFrame> irSpan{};
425 if (mIRFrameSelector.isSet()) {
// NOTE(review): the definition of ir0 is not visible in this span, presumably it is the 1st IR of the TF - confirm.
// ir1 is the last bunch of the orbit firstTForbit + mTFLength - 1, with the orbit saturated at 0xffffffff
427 o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, timingInfo.firstTForbit < 0xffffffff - (mTFLength - 1) ? timingInfo.firstTForbit + (mTFLength - 1) : 0xffffffff);
428 irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
// the TF may be rejected only if skipSkimmedOutTF is set: it is then used if it matches some frames, or,
// in the inverted selection mode, if it matches none
429 bool acc = true;
430 if (mInput.skipSkimmedOutTF) {
431 acc = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
432 LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
433 irSpan.size(), ir0.asString(), ir1.asString(), acc ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
434 }
435 if (!acc) {
436 return false;
437 }
438 if (mInput.checkTFLimitBeforeReading) {
439 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
440 }
441 } else {
442 if (mInput.checkTFLimitBeforeReading) {
443 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
444 }
445 }
// publish the matching frames (the list may be empty); the returned reference is not used
446 if (mIFRamesOut) {
447 auto outVec = pc.outputs().make<std::vector<o2::dataformats::IRFrame>>(OutputRef{"selIRFrames"}, irSpan.begin(), irSpan.end());
448 }
449 // send CTF Header
450 pc.outputs().snapshot({"header", mInput.subspec}, ctfHeader);
451
// publish the detector buffers, processDetector() does nothing for detectors which are not in mInput.detMask
452 processDetector<o2::itsmft::CTF>(DetID::ITS, ctfHeader, pc);
453 processDetector<o2::itsmft::CTF>(DetID::MFT, ctfHeader, pc);
454 processDetector<o2::emcal::CTF>(DetID::EMC, ctfHeader, pc);
455 processDetector<o2::hmpid::CTF>(DetID::HMP, ctfHeader, pc);
456 processDetector<o2::phos::CTF>(DetID::PHS, ctfHeader, pc);
457 processDetector<o2::tpc::CTF>(DetID::TPC, ctfHeader, pc);
458 processDetector<o2::trd::CTF>(DetID::TRD, ctfHeader, pc);
459 processDetector<o2::ft0::CTF>(DetID::FT0, ctfHeader, pc);
460 processDetector<o2::fv0::CTF>(DetID::FV0, ctfHeader, pc);
461 processDetector<o2::fdd::CTF>(DetID::FDD, ctfHeader, pc);
462 processDetector<o2::tof::CTF>(DetID::TOF, ctfHeader, pc);
463 processDetector<o2::mid::CTF>(DetID::MID, ctfHeader, pc);
464 processDetector<o2::mch::CTF>(DetID::MCH, ctfHeader, pc);
465 processDetector<o2::cpv::CTF>(DetID::CPV, ctfHeader, pc);
466 processDetector<o2::zdc::CTF>(DetID::ZDC, ctfHeader, pc);
467 processDetector<o2::ctp::CTF>(DetID::CTP, ctfHeader, pc);
468 mCTFCounterAcc++;
469
470 // send sTF acknowledge message
471 if (!mInput.sup0xccdb) {
472 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(OutputRef{"TFDist", 0xccdb});
473 stfDist.id = uint64_t(mCurrTreeEntry);
474 stfDist.firstOrbit = ctfHeader.firstTForbit;
475 stfDist.runNumber = uint32_t(ctfHeader.run);
476 }
477
// the description of the entry must be built before checkTreeEntries(), which may release the tree and the file
478 auto entryStr = fmt::format("({} of {} in {})", mCurrTreeEntry, mCTFTree->GetEntries(), mCTFFile->GetName());
479 checkTreeEntries();
480 mTimer.Stop();
481
482 // do we need to wait to respect the delay ?
// times are in microseconds of the system clock; no delay is applied before the very first CTF
483 long tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
484 if (mCTFCounter) {
485 auto tDiff = tNow - mLastSendTime;
486 if (tDiff < mInput.delay_us) {
487 pc.services().get<RawDeviceService>().waitFor((mInput.delay_us - tDiff) / 1000); // respect requested delay before sending
488 }
489 }
490 if (!mInput.checkTFLimitBeforeReading) {
491 limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
492 }
493 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
494 LOGP(info, "Read CTF {} {} in {:.3f} s, {:.4f} s elapsed from previous CTF", mCTFCounter, entryStr, mTimer.CpuTime() - cput, mCTFCounter ? 1e-6 * (tNow - mLastSendTime) : 0.);
495 mLastSendTime = tNow;
496 mCTFCounter++;
497 return true;
498}
499
501void CTFReaderSpec::checkTreeEntries()
502{
503 // check if the tree has entries left, if needed, close current tree/file
504 if (++mCurrTreeEntry >= mCTFTree->GetEntries() || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files
505 mCTFTree.reset();
506 mCTFFile->Close();
507 mCTFFile.reset();
508 if (mFileFetcher) {
509 mFileFetcher->popFromQueue(mInput.maxLoops < 1);
510 }
511 }
512}
513
515void CTFReaderSpec::setMessageHeader(ProcessingContext& pc, const CTFHeader& ctfHeader, const std::string& lbl, unsigned subspec) const
516{
517 auto* stack = pc.outputs().findMessageHeaderStack(OutputRef{lbl, subspec});
518 if (!stack) {
519 throw std::runtime_error(fmt::format("failed to find output message header stack for {}", lbl));
520 }
521 auto dh = const_cast<o2::header::DataHeader*>(o2::header::get<o2::header::DataHeader*>(stack));
522 dh->firstTForbit = ctfHeader.firstTForbit;
523 dh->tfCounter = ctfHeader.tfCounter;
524 dh->runNumber = uint32_t(ctfHeader.run);
525 auto dph = const_cast<o2::framework::DataProcessingHeader*>(o2::header::get<o2::framework::DataProcessingHeader*>(stack));
526 dph->creation = ctfHeader.creationTime;
527}
528
530template <typename C>
531void CTFReaderSpec::processDetector(DetID det, const CTFHeader& ctfHeader, ProcessingContext& pc) const
532{
533 if (mInput.detMask[det]) {
534 const auto lbl = det.getName();
535 auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({lbl, mInput.subspec}, ctfHeader.detectors[det] ? sizeof(C) : 0);
536 if (ctfHeader.detectors[det]) {
537 C::readFromTree(bufVec, *(mCTFTree.get()), lbl, mCurrTreeEntry);
538 } else if (!mInput.allowMissingDetectors) {
539 throw std::runtime_error(fmt::format("Requested detector {} is missing in the CTF", lbl));
540 }
541 // setMessageHeader(pc, ctfHeader, lbl);
542 }
543}
544
546void CTFReaderSpec::tryToFixCTFHeader(CTFHeader& ctfHeader) const
547{
548 // HACK: fix CTFHeader for the pilot beam runs, where the TF creation time was not recorded
549 struct RunStartData {
550 uint32_t run = 0;
551 uint32_t firstTForbit = 0;
552 uint64_t tstampMS0 = 0;
553 };
554 const std::vector<RunStartData> tf0Data{
555 {505207, 133875, 1635322620830},
556 {505217, 14225007, 1635328375618},
557 {505278, 1349340, 1635376882079},
558 {505285, 1488862, 1635378517248},
559 {505303, 2615411, 1635392586314},
560 {505397, 5093945, 1635454778123},
561 {505404, 19196217, 1635456032855},
562 {505405, 28537913, 1635456862913},
563 {505406, 41107641, 1635457980628},
564 {505413, 452530, 1635460562613},
565 {505440, 13320708, 1635472436927},
566 {505443, 26546564, 1635473613239},
567 {505446, 177711, 1635477270241},
568 {505548, 88037114, 1635544414050},
569 {505582, 295044346, 1635562822389},
570 {505600, 417241082, 1635573688564},
571 {505623, 10445984, 1635621310460},
572 {505629, 126979, 1635623289756},
573 {505637, 338969, 1635630909893},
574 {505645, 188222, 1635634560881},
575 {505658, 81044, 1635645404694},
576 {505669, 328291, 1635657807147},
577 {505673, 30988, 1635659148972},
578 {505713, 620506, 1635725054798},
579 {505720, 5359903, 1635730673978}};
580 if (ctfHeader.run >= tf0Data.front().run && ctfHeader.run <= tf0Data.back().run) {
581 for (const auto& tf0 : tf0Data) {
582 if (ctfHeader.run == tf0.run) {
583 ctfHeader.creationTime = tf0.tstampMS0;
584 int64_t offset = std::ceil((ctfHeader.firstTForbit - tf0.firstTForbit) * o2::constants::lhc::LHCOrbitMUS * 1e-3);
585 ctfHeader.creationTime += offset > 0 ? offset : 0;
586 break;
587 }
588 }
589 }
590}
591
594{
// the reader has no inputs; the outputs and options are assembled below according to the settings in inp
595 std::vector<InputSpec> inputs;
596 std::vector<OutputSpec> outputs;
597 std::vector<ConfigParamSpec> options;
598
// the CTF header is always sent, followed by one CTFDATA output per detector of the requested mask
599 outputs.emplace_back(OutputLabel{"header"}, "CTF", "HEADER", inp.subspec, Lifetime::Timeframe);
600 for (auto id = DetID::First; id <= DetID::Last; id++) {
601 if (inp.detMask[id]) {
602 DetID det(id);
603 outputs.emplace_back(OutputLabel{det.getName()}, det.getDataOrigin(), "CTFDATA", inp.subspec, Lifetime::Timeframe);
604 }
605 }
// the selected IR frames are sent only if a selection file was provided (same condition as used in init())
606 if (!inp.fileIRFrames.empty() || !inp.fileRunTimeSpans.empty()) {
607 outputs.emplace_back(OutputLabel{"selIRFrames"}, "CTF", "SELIRFRAMES", 0, Lifetime::Timeframe);
608 }
// the DISTSTF output with the subspec 0xccdb is declared unless it is suppressed
609 if (!inp.sup0xccdb) {
610 outputs.emplace_back(OutputSpec{{"TFDist"}, o2::header::gDataOriginFLP, o2::header::gDataDescriptionDISTSTF, 0xccdb});
611 }
612
// the names of the options must match those queried in init()
613 options.emplace_back(ConfigParamSpec{"select-ctf-ids", VariantType::String, "", {"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
614 options.emplace_back(ConfigParamSpec{"impose-run-start-timstamp", VariantType::Int64, 0L, {"impose run start time stamp (ms), ignored if 0"}});
615 options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
616 options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
617 options.emplace_back(ConfigParamSpec{"limit-tf-before-reading", VariantType::Bool, false, {"Check TF limiting before reading new TF, otherwhise before injecting it"}});
618 options.emplace_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
619 options.emplace_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});
620
// the channel configuration option is added only if a metric channel is defined
621 if (!inp.metricChannel.empty()) {
622 options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}});
623 }
624
625 return DataProcessorSpec{
626 "ctf-reader",
627 inputs,
628 outputs,
629 AlgorithmSpec{adaptFromTask<CTFReaderSpec>(inp)},
630 options};
631}
632
633} // namespace ctf
634} // namespace o2
Definitions for CPV CTF data.
Header for CTF collection.
Definitions for CTP CTF data.
Definitions for EMC CTF data.
Definitions for FDD CTF data.
Definitions for FT0 CTF data.
Definitions for FV0 CTF data.
Definitions for HMPID CTF data.
Class to check if give InteractionRecord or IRFrame is selected by the external IRFrame vector.
Definitions for ITS/MFT CTF data.
Header to collect LHC related constants.
Definitions for MCH CTF data.
Definitions for MID CTF data.
Definition of the Names Generator class.
Definitions for PHOS CTF data.
Helper function to tokenize sequences and ranges of integral numbers.
uint32_t stack
Definition RawData.h:1
Definitions for TOF CTF data.
Definitions for TPC CTF data.
Definitions for TRD CTF data.
TBranch * ptr
std::ostringstream debug
Definitions for ZDC CTF data.
static constexpr std::string_view CTFTREENAME
Definition NameConf.h:95
static BasicCCDBManager & instance()
void run(o2::framework::ProcessingContext &pc) final
CTFReaderSpec(const CTFReaderInp &inp)
void init(o2::framework::InitContext &ic) final
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:145
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:94
static constexpr ID MFT
Definition DetID.h:71
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr ID TPC
Definition DetID.h:64
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
void snapshot(const Output &spec, T const &object)
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual fair::mq::Device * device()=0
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
Definition Task.h:43
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
size_t loadIRFrames(const std::string &fname)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
GLintptr offset
Definition glcorearb.h:660
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:603
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
bool readFromTree(TTree &tree, const std::string brname, T &dest, int ev=0)
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
@ C
Definition Defs.h:36
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string asString() const
uint32_t firstTForbit
Definition CTFHeader.h:31
uint32_t tfCounter
Definition CTFHeader.h:32
uint64_t creationTime
Definition CTFHeader.h:30
std::string fileRunTimeSpans
std::vector< int > ctfIDs
std::string metricChannel
o2::detectors::DetID::mask_t detMask
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
the main header struct
Definition DataHeader.h:618
TForbitType firstTForbit
Definition DataHeader.h:674
static AggregatedRunInfo buildAggregatedRunInfo(o2::ccdb::CCDBManagerInstance &ccdb, int runnumber)
static void trim(std::string &s)
Definition StringUtils.h:71
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
o2::InteractionRecord ir0(3, 5)