Project
Loading...
Searching...
No Matches
TFReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
21#include "Framework/Task.h"
22#include "Framework/Logger.h"
28#include <TStopwatch.h>
29#include <fairmq/Device.h>
30#include <fairmq/Parts.h>
31#include "TFReaderSpec.h"
36#include "CommonUtils/FIFO.h"
42#include <unistd.h>
43#include <algorithm>
44#include <unordered_map>
45#include <cctype>
46#include <string>
47#include <climits>
48#include <regex>
49#include <deque>
50#include <chrono>
51#include <thread>
52
53using namespace o2::rawdd;
54using namespace std::chrono_literals;
56namespace o2f = o2::framework;
57namespace o2h = o2::header;
58
// DPL task which reads time frames (TFs) from raw-TF files in a dedicated builder thread
// (TFBuilder) and sends the queued TFs to the output FairMQ channels from run().
// NOTE(review): original line 72 is absent from this listing; presumably it declares the
// endOfStream() override — confirm against the repository.
59class TFReaderSpec : public o2f::Task
60{
61 public:
// per-output bookkeeping used to detect outputs missing in a given TF
62 struct SubSpecCount {
63 uint32_t defSubSpec = 0xdeadbeef; // subspecification used for the dummy output created for this identifier
64 int count = -1; // value of mSentTFCounter at which this identifier was last seen in the data
65 };
66
67 using TFMap = std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>>; // map of channel / TFparts
68
69 explicit TFReaderSpec(const TFReaderInp& rinp);
70 void init(o2f::InitContext& ic) final;
71 void run(o2f::ProcessingContext& ctx) final;
73
74 private:
75 void loadRunTimeSpans(const std::string& flname);
76 void runTimeRangesToIRFrameSelector(int runNumber);
77 void stopProcessing(o2f::ProcessingContext& ctx);
78 void TFBuilder();
79
80 private:
81 fair::mq::Device* mDevice = nullptr; // device obtained from the RawDeviceService at the 1st run() call
82 std::vector<o2f::OutputRoute> mOutputRoutes; // copy of the device output routes, passed to the TF reader
83 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher; // created and started in init()
84 o2::utils::FIFO<std::unique_ptr<TFMap>> mTFQueue{}; // queued TFs
85 // std::unordered_map<o2h::DataIdentifier, SubSpecCount, std::hash<o2h::DataIdentifier>> mSeenOutputMap;
86 std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap; // requested outputs, filled in the constructor from hdVec
87 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges; // run -> list of [min,max] ranges filled by loadRunTimeSpans()
88 o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
89 int mConvRunTimeRangesToOrbits = -1; // not defined yet; set by loadRunTimeSpans(): 1 = ranges are timestamps (ms), 0 = orbits
90 int mSentTFCounter = 0; // incremented in run() after every sent TF
91 int mAccTFCounter = 0; // incremented in TFBuilder() for every TF pushed to the queue
92 int mTFBuilderCounter = 0; // incremented in TFBuilder() for every TF read, accepted or not
93 int mNWaits = 0; // number of times TFBuilder() had to wait for a file
94 int mTFLength = 32; // TF length in orbits, overridden in runTimeRangesToIRFrameSelector()
95 long mTotalWaitTime = 0; // accumulated waiting time of TFBuilder(), in microseconds
96 size_t mSelIDEntry = 0; // next TFID to select from the mInput.tfIDs (if non-empty)
// NOTE(review): mRunning and mWaitSendingLast are plain bools which are read and written both by
// run()/stopProcessing() and by TFBuilder() running in mTFBuilderThread, without synchronization.
97 bool mRunning = false;
98 bool mWaitSendingLast = false;
99 TFReaderInp mInput; // command line inputs
100 std::thread mTFBuilderThread{}; // runs TFBuilder(), started at the 1st run() call
101};
102
103//___________________________________________________________
104TFReaderSpec::TFReaderSpec(const TFReaderInp& rinp) : mInput(rinp)
105{
106 for (const auto& hd : rinp.hdVec) {
107 mSeenOutputMap[o2h::DataIdentifier{hd.dataDescription.str, hd.dataOrigin.str}].defSubSpec = hd.subSpecification;
108 }
109}
110
111//___________________________________________________________
113{
// Reads the run-time options into mInput, optionally loads the run/time-span selection and
// creates + starts the file fetcher.
// NOTE(review): the signature line is absent from this listing; `ic` is presumably the
// o2f::InitContext& argument of init() declared in the class — confirm.
114 mInput.tfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-tf-ids"));
115 mInput.maxTFs = ic.options().get<int>("max-tf");
// non-positive limits are replaced by the largest positive int, i.e. effectively no limit
116 mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
117 mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
118 mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
// cache sizes are forced to be at least 1
119 mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
120 mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
// both flags are the negation of the corresponding option
121 mInput.repairHeaders = !ic.options().get<bool>("ignore-repair-headers");
122 mInput.rejectDistSTF = !ic.options().get<bool>("read-dist-stf");
123
124 if (!mInput.fileRunTimeSpans.empty()) {
125 loadRunTimeSpans(mInput.fileRunTimeSpans);
126 }
127 mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd, mInput.copyDir);
128 mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
129 mFileFetcher->setMaxLoops(mInput.maxLoops);
130 mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
131 mFileFetcher->start();
132}
133
134//___________________________________________________________
136{
// Sends at most one queued TF per call: on the 1st call caches the device and starts the
// TFBuilder thread, then takes the next TF from mTFQueue, optionally completes it with dummy
// messages for the requested but unseen outputs, and sends every channel's parts via the device.
// NOTE(review): the signature line is absent from this listing; `ctx` is presumably the
// o2f::ProcessingContext& argument of run() declared in the class — confirm.
137 if (!mDevice) {
138 mDevice = ctx.services().get<o2f::RawDeviceService>().device();
139 mOutputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs; // copy!!!
140 // start TFBuilder thread
141 mRunning = true;
142 mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder, this);
143 }
// time of the previous TF sending in microseconds since epoch; static, i.e. initialized at the 1st call only
144 static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
145 auto device = ctx.services().get<o2f::RawDeviceService>().device();
146 assert(device);
147 if (device != mDevice) {
148 throw std::runtime_error(fmt::format("FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
149 }
// -999 is treated as "rate limit not set yet": take it from the device configuration
150 if (mInput.tfRateLimit == -999) {
151 mInput.tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit"));
152 }
// Loops over the (header, payload) message pairs of `parts`, flags in mSeenOutputMap the identifiers
// seen for the current mSentTFCounter and returns the summed size of all headers and payloads.
// NOTE(review): `dph` is extracted below but not used inside this lambda.
153 auto acknowledgeOutput = [this](fair::mq::Parts& parts, bool verbose = false) {
154 int np = parts.Size();
155 size_t dsize = 0, dsizeTot = 0, nblocks = 0;
156 const o2h::DataHeader* hdPrev = nullptr;
157 for (int ip = 0; ip < np; ip += 2) {
158 const auto& msgh = parts[ip];
159 const auto* hd = o2h::get<o2h::DataHeader*>(msgh.GetData());
160 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(msgh.GetData());
161 if (verbose && mInput.verbosity > 0) {
162 LOGP(info, "Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
163 }
164 if (hd->splitPayloadIndex == 0) { // check the 1st one only
165 auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
166 if (entry.count != this->mSentTFCounter) {
167 if (verbose && hdPrev) { // report previous partition size
168 LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
169 }
170 dsizeTot += dsize;
171 dsize = 0;
172 entry.count = this->mSentTFCounter; // acknowledge identifier seen in the data
173 LOG(debug) << "Found a part " << ip << " of " << np << " | " << hd->dataOrigin.as<std::string>() << "/" << hd->dataDescription.as<std::string>()
174 << "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mSentTFCounter;
175 nblocks++;
176 }
177 }
178 hdPrev = hd;
179 dsize += msgh.GetSize() + parts[ip + 1].GetSize();
180 }
181 // last part
182 dsizeTot += dsize;
183 if (verbose && hdPrev) {
184 LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
185 }
186 return dsizeTot;
187 };
188
// Returns the name of the channel to send the data described by `h` for time slice `tslice`:
// the raw channel if configured, otherwise the channel of the 1st matching output route;
// an empty string (after logging an error and the available routes) if nothing matches.
189 auto findOutputChannel = [&ctx, this](o2h::DataHeader& h, size_t tslice) {
190 if (!this->mInput.rawChannelConfig.empty()) {
191 return std::string{this->mInput.rawChannelConfig};
192 } else {
193 auto& outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
194 for (auto& oroute : outputRoutes) {
195 LOG(debug) << "comparing with matcher to route " << oroute.matcher << " TSlice:" << oroute.timeslice;
196 if (o2f::DataSpecUtils::match(oroute.matcher, h.dataOrigin, h.dataDescription, h.subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
197 LOG(debug) << "picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) << " channel " << oroute.channel;
198 return std::string{oroute.channel};
199 }
200 }
201 }
202 auto& outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
203 LOGP(error, "Failed to find output channel for {}/{}/{} @ timeslice {}", h.dataOrigin, h.dataDescription, h.subSpecification, h.tfCounter);
204 for (auto& oroute : outputRoutes) {
205 LOGP(info, "Available route route {}", o2f::DataSpecUtils::describe(oroute.matcher));
206 }
207 return std::string{};
208 };
// Copies orbit, creation time, TF counter and run number from the headers of the 1st message
// of the 1st map entry to the TimingInfo service.
209 auto setTimingInfo = [&ctx](TFMap& msgMap) {
210 auto& timingInfo = ctx.services().get<o2::framework::TimingInfo>();
211 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
212 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
213 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
214 timingInfo.firstTForbit = hd0->firstTForbit;
215 timingInfo.creation = dph->creation;
216 timingInfo.tfCounter = hd0->tfCounter;
217 timingInfo.runNumber = hd0->runNumber;
218 };
219
// For every identifier of mSeenOutputMap which was not flagged for the current mSentTFCounter,
// appends a header + empty payload pair to the parts of the matching channel.
// NOTE(review): original line 232 (between the outHeader construction and the firstTForbit
// assignment) is absent from this listing.
220 auto addMissingParts = [this, &findOutputChannel](TFMap& msgMap) {
221 // at least the 1st header is guaranteed to be filled by the reader, use it for extra info
222 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
223 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
224 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
225 for (auto& out : this->mSeenOutputMap) {
226 if (out.second.count == this->mSentTFCounter) { // was seen in the data
227 continue;
228 }
229 LOG(debug) << "Adding dummy output for " << out.first.dataOrigin.as<std::string>() << "/" << out.first.dataDescription.as<std::string>()
230 << "/" << out.second.defSubSpec << " for TF " << this->mSentTFCounter;
231 o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
233 outHeader.firstTForbit = hd0->firstTForbit;
234 outHeader.tfCounter = hd0->tfCounter;
235 outHeader.runNumber = hd0->runNumber;
236 const auto fmqChannel = findOutputChannel(outHeader, dph->startTime);
237 if (fmqChannel.empty()) { // no output channel
238 continue;
239 }
240 auto fmqFactory = this->mDevice->GetChannel(fmqChannel, 0).Transport();
241 o2h::Stack headerStack{outHeader, *dph};
242 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
243 auto plMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
244 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
245 fair::mq::Parts* parts = msgMap[fmqChannel].get();
246 if (!parts) {
247 msgMap[fmqChannel] = std::make_unique<fair::mq::Parts>();
248 parts = msgMap[fmqChannel].get();
249 }
250 parts->AddPart(std::move(hdMessage));
251 parts->AddPart(std::move(plMessage));
252 }
253 };
254
// The loop is left after one TF was sent or after stopProcessing() was called.
// NOTE(review): while the queue is empty and mRunning is set the loop spins without sleeping
// (the usleep at its end is commented out).
255 while (1) {
256 if (mTFQueue.size()) {
257 static o2f::RateLimiter limiter;
258 limiter.check(ctx, mInput.tfRateLimit, mInput.minSHM);
259
260 auto tfPtr = std::move(mTFQueue.front());
261 mTFQueue.pop();
262 if (!tfPtr) {
263 LOG(error) << "Builder provided nullptr TF pointer";
264 continue;
265 }
266 setTimingInfo(*tfPtr.get());
267 size_t nparts = 0, dataSize = 0;
268 if (mInput.sendDummyForMissing) {
269 int cntAck = 0;
270 for (auto& msgIt : *tfPtr.get()) { // complete with empty output for the specs which were requested but were not seen in the data
271 if (mInput.verbosity > 0) {
272 LOGP(info, "acknowledgeOutput {}", cntAck++);
273 }
274 acknowledgeOutput(*msgIt.second.get(), true);
275 }
276 addMissingParts(*tfPtr.get());
277 }
278
279 auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
280 auto tDiff = tNow - tLastTF;
281 if (mSentTFCounter && tDiff < mInput.delay_us) {
282 std::this_thread::sleep_for(std::chrono::microseconds((size_t)(mInput.delay_us - tDiff))); // respect requested delay before sending
283 }
// NOTE(review): `hd` extracted in the loop below is not used
284 for (auto& msgIt : *tfPtr.get()) {
285 size_t szPart = acknowledgeOutput(*msgIt.second.get(), false);
286 dataSize += szPart;
287 const auto* hd = o2h::get<o2h::DataHeader*>((*msgIt.second.get())[0].GetData());
288 nparts += msgIt.second->Size() / 2;
289 device->Send(*msgIt.second.get(), msgIt.first);
290 }
291 // FIXME: this is to pretend we did send some messages via DPL.
292 // we should really migrate everything to use FairMQDeviceProxy,
293 // however this is a small enough hack for now.
294 ctx.services().get<o2f::MessageContext>().fakeDispatch();
295 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
296 LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mSentTFCounter, dataSize, nparts, mSentTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
297 tLastTF = tNow;
298 ++mSentTFCounter;
299
// block in 10 ms steps while the queue is empty and the builder flags that a TF was just pushed
300 while (mTFQueue.size() == 0 && mWaitSendingLast) {
301 usleep(10000);
302 }
303 break;
304 }
305 if (!mRunning) { // no more TFs will be provided
306 stopProcessing(ctx);
307 break;
308 }
309 // usleep(5000); // wait 5ms for new TF to be built
310 }
311 if (mSentTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
312 stopProcessing(ctx);
313 }
314}
315
316//____________________________________________________________
318{
// Stops and releases the file fetcher, then joins the TFBuilder thread.
// NOTE(review): the signature line is absent from this listing; presumably this is the body of
// the endOfStream() override — confirm.
// NOTE(review): the fetcher is reset before the builder thread is joined although TFBuilder()
// dereferences mFileFetcher, and mRunning is not reset here — verify the shutdown order.
319 if (mFileFetcher) {
320 mFileFetcher->stop();
321 mFileFetcher.reset();
322 }
323 if (mTFBuilderThread.joinable()) {
324 mTFBuilderThread.join();
325 }
326}
327
328//___________________________________________________________
// One-time shutdown: logs the sending statistics, resets mRunning, stops the file fetcher, joins
// the TFBuilder thread, sends an end-of-stream message to the raw channel (if one is configured)
// and asks the ControlService to quit. Repeated calls return immediately.
// NOTE(review): original lines 349 and 365 are absent from this listing; the declaration of `dh`
// and the content of the else branch are therefore not visible here.
329void TFReaderSpec::stopProcessing(o2f::ProcessingContext& ctx)
330{
// static guard: shared by all calls (and all instances) of this method
331 static bool stopDone = false;
332 if (stopDone) {
333 return;
334 }
335 stopDone = true;
// NOTE(review): mFileFetcher is dereferenced here without the null check applied a few lines below
336 LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mSentTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
337 mRunning = false;
// NOTE(review): the fetcher is destroyed before the builder thread is joined, while TFBuilder()
// may still be using mFileFetcher — verify the order
338 if (mFileFetcher) {
339 mFileFetcher->stop();
340 mFileFetcher.reset();
341 }
342 if (mTFBuilderThread.joinable()) {
343 mTFBuilderThread.join();
344 }
345 if (!mInput.rawChannelConfig.empty()) {
// build and send a header-only message whose SourceInfoHeader state is Completed
346 auto device = ctx.services().get<o2f::RawDeviceService>().device();
347 o2f::SourceInfoHeader exitHdr;
348 exitHdr.state = o2f::InputChannelState::Completed;
// run number is taken from the device configuration; failures to obtain it are ignored
350 try {
351 dh.runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
352 } catch (...) {
353 }
354 const auto exitStack = o2h::Stack(dh, o2f::DataProcessingHeader(), exitHdr);
355 auto fmqFactory = device->GetChannel(mInput.rawChannelConfig, 0).Transport();
356 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
357 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
358 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
359 fair::mq::Parts eosMsg;
360 eosMsg.AddPart(std::move(hdEOSMessage));
361 eosMsg.AddPart(std::move(plEOSMessage));
362 device->Send(eosMsg, mInput.rawChannelConfig);
363 LOG(info) << "Sent EoS message to " << mInput.rawChannelConfig;
364 } else {
366 }
367 ctx.services().get<o2f::ControlService>().readyToQuit(o2f::QuitRequest::Me);
368}
369
370//____________________________________________________________
371void TFReaderSpec::TFBuilder()
372{
373 // build TFs and add to the queue
374 std::string tfFileName;
375 auto sleepTime = std::chrono::microseconds(mInput.delay_us > 10000 ? mInput.delay_us : 10000);
376 bool waitAcknowledged = false;
377 long startWait = 0;
378 while (mRunning && mDevice) {
379 LOGP(debug, "mTFQueue.size()={} mWaitSendingLast = {}", mTFQueue.size(), mWaitSendingLast);
380 if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
381 mWaitSendingLast = false;
382 std::this_thread::sleep_for(sleepTime);
383 continue;
384 }
385 tfFileName = mFileFetcher ? mFileFetcher->getNextFileInQueue() : "";
386 if (!mRunning ||
387 (tfFileName.empty() && !mFileFetcher->isRunning()) ||
388 mTFBuilderCounter >= mInput.maxTFs ||
389 (!mInput.tfIDs.empty() && mSelIDEntry >= mInput.tfIDs.size())) {
390 // stopped or no more files in the queue is expected or needed
391 LOG(info) << "TFReader stops processing";
392 if (mFileFetcher) {
393 mFileFetcher->stop();
394 }
395 mRunning = false;
396 mWaitSendingLast = false;
397 break;
398 }
399 if (tfFileName.empty()) {
400 if (!waitAcknowledged) {
401 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
402 waitAcknowledged = true;
403 }
404 std::this_thread::sleep_for(10ms); // wait for the files cache to be filled
405 continue;
406 }
407 mWaitSendingLast = false;
408 if (waitAcknowledged) {
409 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
410 mTotalWaitTime += waitTime;
411 if (++mNWaits > 1) {
412 LOGP(warn, "Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
413 }
414 waitAcknowledged = false;
415 startWait = 0;
416 }
417
418 LOG(info) << "Processing file " << tfFileName;
419 SubTimeFrameFileReader reader(tfFileName, mInput.detMask, mInput.verbosity, mInput.sup0xccdb, mInput.repairHeaders, mInput.rejectDistSTF);
420 size_t locID = 0;
421 // try
422 {
423 while (mRunning && mTFBuilderCounter < mInput.maxTFs) {
424 if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
425 if (mTFQueue.size() > 1) {
426 mWaitSendingLast = false;
427 }
428 std::this_thread::sleep_for(sleepTime);
429 continue;
430 }
431 auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter);
432 bool acceptTF = true;
433 if (tf) {
434 if (mRunTimeRanges.size()) {
435 const auto* dataptr = (*tf->begin()->second.get())[0].GetData();
436 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
437 static int runNumberPrev = -1;
438 if (runNumberPrev != hd0->runNumber) {
439 runNumberPrev = hd0->runNumber;
440 runTimeRangesToIRFrameSelector(runNumberPrev);
441 }
442 if (mIRFrameSelector.isSet()) {
443 o2::InteractionRecord ir0(0, hd0->firstTForbit);
444 o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, hd0->firstTForbit < 0xffffffff - (mTFLength - 1) ? hd0->firstTForbit + (mTFLength - 1) : 0xffffffff);
445 auto irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
446 acceptTF = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
447 LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
448 irSpan.size(), ir0.asString(), ir1.asString(), acceptTF ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
449 }
450 }
451 locID++;
452 if (!mInput.tfIDs.empty() && acceptTF) {
453 acceptTF = false;
454 while ((mInput.tfIDs[mSelIDEntry] < mTFBuilderCounter) && (mSelIDEntry + 1) < mInput.tfIDs.size()) {
455 mSelIDEntry++;
456 }
457 LOGP(info, "chec if mInput.tfIDs[{}]({}) == {}", mSelIDEntry, mInput.tfIDs[mSelIDEntry], mTFBuilderCounter);
458 if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
459 mWaitSendingLast = false;
460 acceptTF = true;
461 LOGP(info, "Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
462 } else {
463 LOGP(info, "Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
464 }
465 }
466 mTFBuilderCounter++;
467 }
468 if (mRunning && tf) {
469 if (acceptTF) {
470 mAccTFCounter++;
471 mWaitSendingLast = true;
472 mTFQueue.push(std::move(tf));
473 }
474 } else {
475 break;
476 }
477 if (mInput.maxTFsPerFile > 0 && locID >= mInput.maxTFsPerFile) { // go to next file
478 break;
479 }
480 }
481 // remove already processed file from the queue, unless they are needed for further looping
482 if (mFileFetcher) {
483 mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.maxLoops);
484 }
485 }
486 }
487}
488
489//_________________________________________________________
// Parses the text file `flname` with one "<run> <range_min> <range_max>" triplet per line and
// appends every range to mRunTimeRanges[run]. Lines starting with '#' and empty lines are skipped;
// malformed lines are reported with an error message and skipped.
// The 1st accepted entry defines whether ranges are timestamps or orbits
// (mConvRunTimeRangesToOrbits); inconsistent or non-increasing limits are reported as fatal.
// NOTE(review): original line 505 (between the delimiter replacement and the empty-line check)
// is absent from this listing.
490void TFReaderSpec::loadRunTimeSpans(const std::string& flname)
491{
492 std::ifstream inputFile(flname);
493 if (!inputFile) {
494 LOGP(fatal, "Failed to open selected run/timespans file {}", flname);
495 }
496 std::string line;
497 size_t cntl = 0, cntr = 0; // counters of the lines read and of the ranges accepted
498 while (std::getline(inputFile, line)) {
499 cntl++;
500 for (char& ch : line) { // Replace semicolons, tabs and commas with spaces for uniform processing
501 if (ch == ';' || ch == '\t' || ch == ',') {
502 ch = ' ';
503 }
504 }
506 if (line.size() < 1 || line[0] == '#') {
507 continue;
508 }
509 auto tokens = o2::utils::Str::tokenize(line, ' ');
510 auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
// tokens beyond the 3rd one are ignored
511 if (tokens.size() >= 3) {
512 int run = 0;
513 long rmin, rmax;
514 try {
515 run = std::stoi(tokens[0]);
516 rmin = std::stol(tokens[1]);
517 rmax = std::stol(tokens[2]);
518 } catch (...) {
519 logError();
520 continue;
521 }
522
523 constexpr long ISTimeStamp = 1514761200000L;
524 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
525 if (rmin > rmax) {
526 LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
527 }
528 if (mConvRunTimeRangesToOrbits == -1) {
// 1st entry: both limits must be of the same kind, which then fixes the interpretation of all entries
529 if (convmn != convmx) {
530 LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
531 }
532 mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
533 LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
534 } else {
// following entries must be of the kind determined by the 1st one
535 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
536 LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
537 }
538 }
539
540 mRunTimeRanges[run].emplace_back(rmin, rmax);
541 cntr++;
542 } else {
543 logError();
544 }
545 }
546 LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
547 inputFile.close();
548}
549
550//_________________________________________________________
// Rebuilds mIRFrameSelector from the ranges stored in mRunTimeRanges for `runNumber` and sets
// mTFLength from the run info. If the run has no entries the selector is left cleared.
// NOTE(review): original lines 560-562 are absent from this listing; the definition of `rinfo`
// is therefore not visible here.
551void TFReaderSpec::runTimeRangesToIRFrameSelector(int runNumber)
552{
553 // convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
554 mIRFrameSelector.clear();
555 auto ent = mRunTimeRanges.find(runNumber);
556 if (ent == mRunTimeRanges.end()) {
557 LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber);
558 return;
559 }
563 if (rinfo.runNumber != runNumber || rinfo.orbitsPerTF < 1) {
564 LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", runNumber);
565 }
566 mTFLength = rinfo.orbitsPerTF;
567 std::vector<o2::dataformats::IRFrame> frames;
568 for (const auto& rng : ent->second) {
569 long orbMin = 0, orbMax = 0;
570 if (mConvRunTimeRangesToOrbits > 0) {
// time difference to rinfo.sor divided by the orbit duration scaled by 0.001
// (presumably ms timestamps and an orbit length in microseconds — confirm), counted from rinfo.orbitSOR
571 orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
572 orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
573 } else {
574 orbMin = rng.first;
575 orbMax = rng.second;
576 }
// negative orbits are clamped to 0
577 if (orbMin < 0) {
578 orbMin = 0;
579 }
580 if (orbMax < 0) {
581 orbMax = 0;
582 }
// for runs above 523897 the range is extended to multiples of orbitsPerTF:
// orbMin is rounded down, orbMax is moved to the last orbit of its orbitsPerTF-long block
// NOTE(review): the meaning of the run number threshold is not visible in this file
583 if (runNumber > 523897) {
584 orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
585 orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
586 }
587 LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
// NOTE(review): the upper record uses LHCMaxBunches while TFBuilder() uses LHCMaxBunches - 1
// for the last bunch of a TF — confirm this is intended
588 frames.emplace_back(o2::InteractionRecord{0, uint32_t(orbMin)}, o2::InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
589 }
590 mIRFrameSelector.setOwnList(frames, true);
591}
592
593//_________________________________________________________
595{
// Builds the "tf-reader" processor spec: fills the detector masks and the list of default output
// headers of `rinp`, defines the outputs (or the raw channel option), registers the run-time
// options and attaches the TFReaderSpec task.
// NOTE(review): the signature line and several other lines (e.g. the declaration of `spec` and
// part of the spec.outputs entries) are absent from this listing; presumably this is the body of
// getTFReaderSpec(TFReaderInp& rinp) — confirm.
596 // check which inputs are present in files to read
598 spec.name = "tf-reader";
// only detectors listed in DEFMask can be selected
599 const DetID::mask_t DEFMask = DetID::getMask("ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP,FOC");
600 rinp.detMask = DetID::getMask(rinp.detList) & DEFMask;
601 rinp.detMaskRawOnly = DetID::getMask(rinp.detListRawOnly) & DEFMask;
603 if (rinp.rawChannelConfig.empty()) {
604 // we don't know a priori what will be the content of the TF data, so we create all possible outputs
605 for (DetID::ID id = DetID::First; id <= DetID::Last; id++) {
606 if (rinp.detMask[id]) {
607 if (!rinp.detMaskNonRawOnly[id]) {
609 rinp.hdVec.emplace_back(o2h::DataHeader{"RAWDATA", DetID::getDataOrigin(id), 0xDEADBEEF, 0}); // in absence of real data this will be sent
610 }
611 //
612 if (rinp.detMaskRawOnly[id]) { // user asked to not open non-raw channels
613 continue;
614 }
615 // in case detectors were processed on FLP
616 if (id == DetID::CTP) {
618 rinp.hdVec.emplace_back(o2h::DataHeader{"LUMI", DetID::getDataOrigin(DetID::CTP), 0, 0}); // in absence of real data this will be sent
619 }
620 if (id == DetID::TOF) {
622 rinp.hdVec.emplace_back(o2h::DataHeader{"CRAWDATA", DetID::getDataOrigin(DetID::TOF), 0xDEADBEEF, 0}); // in absence of real data this will be sent
623 } else if (id == DetID::FT0 || id == DetID::FV0 || id == DetID::FDD) {
624 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "DIGITSBC", 0});
625 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "DIGITSCH", 0});
626 rinp.hdVec.emplace_back(o2h::DataHeader{"DIGITSBC", DetID::getDataOrigin(id), 0, 0}); // in absence of real data this will be sent
627 rinp.hdVec.emplace_back(o2h::DataHeader{"DIGITSCH", DetID::getDataOrigin(id), 0, 0}); // in absence of real data this will be sent
628 } else if (id == DetID::PHS) {
629 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "CELLS", 0});
630 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "CELLTRIGREC", 0});
631 rinp.hdVec.emplace_back(o2h::DataHeader{"CELLS", DetID::getDataOrigin(id), 0, 0}); // in absence of real data this will be sent
632 rinp.hdVec.emplace_back(o2h::DataHeader{"CELLTRIGREC", DetID::getDataOrigin(id), 0, 0}); // in absence of real data this will be sent
633 } else if (id == DetID::CPV) {
634 spec.outputs.emplace_back(DetID::getDataOrigin(id), "DIGITS", 0);
635 spec.outputs.emplace_back(DetID::getDataOrigin(id), "DIGITTRIGREC", 0);
636 spec.outputs.emplace_back(DetID::getDataOrigin(id), "RAWHWERRORS", 0);
637 rinp.hdVec.emplace_back("DIGITS", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
638 rinp.hdVec.emplace_back("DIGITTRIGREC", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
639 rinp.hdVec.emplace_back("RAWHWERRORS", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
640 } else if (id == DetID::EMC) {
642 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "CELLSTRGR"}});
643 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "DECODERERR"}});
644 rinp.hdVec.emplace_back("CELLS", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
645 rinp.hdVec.emplace_back("CELLSTRGR", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
646 rinp.hdVec.emplace_back("DECODERERR", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
647 } else if (id == DetID::FOC) {
648 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PADLAYERS"}});
649 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PIXELHITS"}});
650 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PIXELCHIPS"}});
652 rinp.hdVec.emplace_back("PADLAYERS", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
653 rinp.hdVec.emplace_back("PIXELHITS", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
654 rinp.hdVec.emplace_back("PIXELCHIPS", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
655 rinp.hdVec.emplace_back("TRIGGERS", DetID::getDataOrigin(id), 0, 0); // in absence of real data this will be sent
656 }
657 }
658 }
// FLP/DISTSUBTIMEFRAME outputs: subspec 0 always, subspec 0xccdb unless suppressed by rinp.sup0xccdb
659 o2f::DataSpecUtils::updateOutputList(spec.outputs, o2f::OutputSpec{{"stfDist"}, o2h::gDataOriginFLP, o2h::gDataDescriptionDISTSTF, 0});
660 if (!rinp.sup0xccdb) {
661 o2f::DataSpecUtils::updateOutputList(spec.outputs, o2f::OutputSpec{{"stfDistCCDB"}, o2h::gDataOriginFLP, o2h::gDataDescriptionDISTSTF, 0xccdb});
662 }
663 if (!rinp.metricChannel.empty()) {
664 spec.options.emplace_back(o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.metricChannel, {"Out-of-band channel config for TF throttling"}});
665 }
666 } else {
// raw channel mode: the full channel configuration becomes the "channel-config" option, while
// rinp.rawChannelConfig is reduced to the channel name, i.e. the text between "name=" and the next comma
667 auto nameStart = rinp.rawChannelConfig.find("name=");
668 if (nameStart == std::string::npos) {
669 throw std::runtime_error("raw channel name is not provided");
670 }
671 nameStart += strlen("name=");
672 auto nameEnd = rinp.rawChannelConfig.find(",", nameStart + 1);
673 if (nameEnd == std::string::npos) {
674 nameEnd = rinp.rawChannelConfig.size();
675 }
676 spec.options = {o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {"Out-of-band channel config"}}};
677 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
678 if (!rinp.metricChannel.empty()) {
679 LOGP(alarm, "Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
680 LOGP(alarm, R"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
681 }
682 }
// options read back by the task at initialization
683 spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
684 spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
685 spec.options.emplace_back(o2f::ConfigParamSpec{"ignore-repair-headers", o2f::VariantType::Bool, false, {"do not check/repair headers"}});
686 spec.options.emplace_back(o2f::ConfigParamSpec{"read-dist-stf", o2f::VariantType::Bool, false, {"do not ignore stored FLP/DISTSUBTIMEFRAME (will clash with injected one)"}});
687 spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
688 spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
689 spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});
690 spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-files", o2f::VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});
691
692 spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
693
694 return spec;
695}
std::ostringstream debug
Class to check if give InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Helper function to tokenize sequences and ranges of integral numbers.
Class for time synchronization of RawReader instances.
void endOfStream(o2f::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2f::ProcessingContext &ctx) final
void init(o2f::InitContext &ic) final
std::unordered_map< std::string, std::unique_ptr< fair::mq::Parts > > TFMap
TFReaderSpec(const TFReaderInp &rinp)
static BasicCCDBManager & instance()
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID First
Definition DetID.h:95
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:93
static constexpr ID FOC
Definition DetID.h:80
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:148
static constexpr ID EMC
Definition DetID.h:69
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID TOF
Definition DetID.h:66
ConfigParamRegistry const & options()
Definition InitContext.h:33
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
size_t size() const
Definition FIFO.h:31
void pop()
Definition FIFO.h:56
const T & front() const
Definition FIFO.h:64
void push(Args &&... args)
Definition FIFO.h:50
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
GLint GLsizei count
Definition glcorearb.h:399
GLuint entry
Definition glcorearb.h:5735
GLenum GLsizei dataSize
Definition glcorearb.h:3994
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionInfo
Definition DataHeader.h:602
constexpr o2::header::DataOrigin gDataOriginAny
Definition DataHeader.h:560
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:605
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
int32_t const char int32_t line
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
o2::framework::DataProcessorSpec getTFReaderSpec(o2::rawdd::TFReaderInp &rinp)
std::unique_ptr< GPUReconstructionTimeframe > tf
std::string asString() const
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:620
TFCounterType tfCounter
Definition DataHeader.h:681
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:653
TForbitType firstTForbit
Definition DataHeader.h:676
DataDescription dataDescription
Definition DataHeader.h:638
RunNumberType runNumber
Definition DataHeader.h:686
Helper struct to encode origin and description of data.
Definition DataHeader.h:759
DataDescription dataDescription
Definition DataHeader.h:761
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
static AggregatedRunInfo buildAggregatedRunInfo(int runnumber, long sorMS, long eorMS, long orbitResetMUS, const o2::parameters::GRPECSObject *grpecs, const std::vector< Long64_t > *ctfFirstRunOrbitVec, const o2::parameters::GRPLHCIFData *grplhcif=nullptr)
std::vector< o2::header::DataHeader > hdVec
std::string detListNonRawOnly
std::string rawChannelConfig
std::string detListRawOnly
o2::detectors::DetID::mask_t detMaskNonRawOnly
std::string fileRunTimeSpans
std::vector< int > tfIDs
o2::detectors::DetID::mask_t detMask
o2::detectors::DetID::mask_t detMaskRawOnly
static void trim(std::string &s)
Definition StringUtils.h:70
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
const int sleepTime
Definition test_Fifo.cxx:28
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir0(3, 5)