Project
Loading...
Searching...
No Matches
TFReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
21#include "Framework/Task.h"
22#include "Framework/Logger.h"
28#include <TStopwatch.h>
29#include <fairmq/Device.h>
30#include <fairmq/Parts.h>
31#include "TFReaderSpec.h"
36#include "CommonUtils/FIFO.h"
42#include <unistd.h>
43#include <algorithm>
44#include <unordered_map>
45#include <cctype>
46#include <string>
47#include <climits>
48#include <regex>
49#include <deque>
50#include <chrono>
51#include <thread>
52
53using namespace o2::rawdd;
54using namespace std::chrono_literals;
56namespace o2f = o2::framework;
57namespace o2h = o2::header;
58
59class TFReaderSpec : public o2f::Task
60{
61 public:
62 struct SubSpecCount {
63 uint32_t defSubSpec = 0xdeadbeef;
64 int count = -1;
65 };
66
67 using TFMap = std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>>; // map of channel / TFparts
68
69 explicit TFReaderSpec(const TFReaderInp& rinp);
70 void init(o2f::InitContext& ic) final;
71 void run(o2f::ProcessingContext& ctx) final;
73
74 private:
75 void loadRunTimeSpans(const std::string& flname);
76 void runTimeRangesToIRFrameSelector(int runNumber);
77 void stopProcessing(o2f::ProcessingContext& ctx);
78 void TFBuilder();
79
80 private:
81 fair::mq::Device* mDevice = nullptr;
82 std::vector<o2f::OutputRoute> mOutputRoutes;
83 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
84 o2::utils::FIFO<std::unique_ptr<TFMap>> mTFQueue{}; // queued TFs
85 // std::unordered_map<o2h::DataIdentifier, SubSpecCount, std::hash<o2h::DataIdentifier>> mSeenOutputMap;
86 std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap;
87 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
88 o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
89 int mConvRunTimeRangesToOrbits = -1; // not defined yet
90 int mTFCounter = 0;
91 int mTFBuilderCounter = 0;
92 int mNWaits = 0;
93 int mTFLength = 32;
94 long mTotalWaitTime = 0;
95 size_t mSelIDEntry = 0; // next TFID to select from the mInput.tfIDs (if non-empty)
96 bool mRunning = false;
97 bool mWaitSendingLast = false;
98 TFReaderInp mInput; // command line inputs
99 std::thread mTFBuilderThread{};
100};
101
102//___________________________________________________________
103TFReaderSpec::TFReaderSpec(const TFReaderInp& rinp) : mInput(rinp)
104{
105 for (const auto& hd : rinp.hdVec) {
106 mSeenOutputMap[o2h::DataIdentifier{hd.dataDescription.str, hd.dataOrigin.str}].defSubSpec = hd.subSpecification;
107 }
108}
109
110//___________________________________________________________
112{
113 mInput.tfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-tf-ids"));
114 mInput.maxTFs = ic.options().get<int>("max-tf");
115 mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
116 mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
117 mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
118 mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
119 mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
120 if (!mInput.fileRunTimeSpans.empty()) {
121 loadRunTimeSpans(mInput.fileRunTimeSpans);
122 }
123 mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
124 mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
125 mFileFetcher->setMaxLoops(mInput.maxLoops);
126 mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
127 mFileFetcher->start();
128}
129
130//___________________________________________________________
132{
133 if (!mDevice) {
134 mDevice = ctx.services().get<o2f::RawDeviceService>().device();
135 mOutputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs; // copy!!!
136 // start TFBuilder thread
137 mRunning = true;
138 mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder, this);
139 }
140 static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
141 auto device = ctx.services().get<o2f::RawDeviceService>().device();
142 assert(device);
143 if (device != mDevice) {
144 throw std::runtime_error(fmt::format("FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
145 }
146 if (mInput.tfRateLimit == -999) {
147 mInput.tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit"));
148 }
149 auto acknowledgeOutput = [this](fair::mq::Parts& parts, bool verbose = false) {
150 int np = parts.Size();
151 size_t dsize = 0, dsizeTot = 0, nblocks = 0;
152 const o2h::DataHeader* hdPrev = nullptr;
153 for (int ip = 0; ip < np; ip += 2) {
154 const auto& msgh = parts[ip];
155 const auto* hd = o2h::get<o2h::DataHeader*>(msgh.GetData());
156 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(msgh.GetData());
157 if (verbose && mInput.verbosity > 0) {
158 LOGP(info, "Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
159 }
160 if (hd->splitPayloadIndex == 0) { // check the 1st one only
161 auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
162 if (entry.count != this->mTFCounter) {
163 if (verbose && hdPrev) { // report previous partition size
164 LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
165 }
166 dsizeTot += dsize;
167 dsize = 0;
168 entry.count = this->mTFCounter; // acknowledge identifier seen in the data
169 LOG(debug) << "Found a part " << ip << " of " << np << " | " << hd->dataOrigin.as<std::string>() << "/" << hd->dataDescription.as<std::string>()
170 << "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mTFCounter;
171 nblocks++;
172 }
173 }
174 hdPrev = hd;
175 dsize += msgh.GetSize() + parts[ip + 1].GetSize();
176 }
177 // last part
178 dsizeTot += dsize;
179 if (verbose && hdPrev) {
180 LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
181 }
182 return dsizeTot;
183 };
184
185 auto findOutputChannel = [&ctx, this](o2h::DataHeader& h, size_t tslice) {
186 if (!this->mInput.rawChannelConfig.empty()) {
187 return std::string{this->mInput.rawChannelConfig};
188 } else {
189 auto& outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
190 for (auto& oroute : outputRoutes) {
191 LOG(debug) << "comparing with matcher to route " << oroute.matcher << " TSlice:" << oroute.timeslice;
192 if (o2f::DataSpecUtils::match(oroute.matcher, h.dataOrigin, h.dataDescription, h.subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
193 LOG(debug) << "picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) << " channel " << oroute.channel;
194 return std::string{oroute.channel};
195 }
196 }
197 }
198 auto& outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
199 LOGP(error, "Failed to find output channel for {}/{}/{} @ timeslice {}", h.dataOrigin, h.dataDescription, h.subSpecification, h.tfCounter);
200 for (auto& oroute : outputRoutes) {
201 LOGP(info, "Available route route {}", o2f::DataSpecUtils::describe(oroute.matcher));
202 }
203 return std::string{};
204 };
205 auto setTimingInfo = [&ctx](TFMap& msgMap) {
206 auto& timingInfo = ctx.services().get<o2::framework::TimingInfo>();
207 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
208 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
209 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
210 timingInfo.firstTForbit = hd0->firstTForbit;
211 timingInfo.creation = dph->creation;
212 timingInfo.tfCounter = hd0->tfCounter;
213 timingInfo.runNumber = hd0->runNumber;
214 };
215
216 auto addMissingParts = [this, &findOutputChannel](TFMap& msgMap) {
217 // at least the 1st header is guaranteed to be filled by the reader, use it for extra info
218 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
219 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
220 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
221 for (auto& out : this->mSeenOutputMap) {
222 if (out.second.count == this->mTFCounter) { // was seen in the data
223 continue;
224 }
225 LOG(debug) << "Adding dummy output for " << out.first.dataOrigin.as<std::string>() << "/" << out.first.dataDescription.as<std::string>()
226 << "/" << out.second.defSubSpec << " for TF " << this->mTFCounter;
227 o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
229 outHeader.firstTForbit = hd0->firstTForbit;
230 outHeader.tfCounter = hd0->tfCounter;
231 outHeader.runNumber = hd0->runNumber;
232 const auto fmqChannel = findOutputChannel(outHeader, dph->startTime);
233 if (fmqChannel.empty()) { // no output channel
234 continue;
235 }
236 auto fmqFactory = this->mDevice->GetChannel(fmqChannel, 0).Transport();
237 o2h::Stack headerStack{outHeader, *dph};
238 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
239 auto plMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
240 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
241 fair::mq::Parts* parts = msgMap[fmqChannel].get();
242 if (!parts) {
243 msgMap[fmqChannel] = std::make_unique<fair::mq::Parts>();
244 parts = msgMap[fmqChannel].get();
245 }
246 parts->AddPart(std::move(hdMessage));
247 parts->AddPart(std::move(plMessage));
248 }
249 };
250
251 while (1) {
252 if (mTFQueue.size()) {
253 static o2f::RateLimiter limiter;
254 limiter.check(ctx, mInput.tfRateLimit, mInput.minSHM);
255
256 auto tfPtr = std::move(mTFQueue.front());
257 mTFQueue.pop();
258 if (!tfPtr) {
259 LOG(error) << "Builder provided nullptr TF pointer";
260 continue;
261 }
262 setTimingInfo(*tfPtr.get());
263 size_t nparts = 0, dataSize = 0;
264 if (mInput.sendDummyForMissing) {
265 for (auto& msgIt : *tfPtr.get()) { // complete with empty output for the specs which were requested but were not seen in the data
266 acknowledgeOutput(*msgIt.second.get(), true);
267 }
268 addMissingParts(*tfPtr.get());
269 }
270
271 auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
272 auto tDiff = tNow - tLastTF;
273 if (mTFCounter && tDiff < mInput.delay_us) {
274 std::this_thread::sleep_for(std::chrono::microseconds((size_t)(mInput.delay_us - tDiff))); // respect requested delay before sending
275 }
276 for (auto& msgIt : *tfPtr.get()) {
277 size_t szPart = acknowledgeOutput(*msgIt.second.get(), false);
278 dataSize += szPart;
279 const auto* hd = o2h::get<o2h::DataHeader*>((*msgIt.second.get())[0].GetData());
280 nparts += msgIt.second->Size() / 2;
281 device->Send(*msgIt.second.get(), msgIt.first);
282 }
283 // FIXME: this is to pretend we did send some messages via DPL.
284 // we should really migrate everything to use FairMQDeviceProxy,
285 // however this is a small enough hack for now.
286 ctx.services().get<o2f::MessageContext>().fakeDispatch();
287 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
288 LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mTFCounter, dataSize, nparts, mTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
289 tLastTF = tNow;
290 ++mTFCounter;
291
292 while (mTFQueue.size() == 0 && mWaitSendingLast) {
293 usleep(10000);
294 }
295 break;
296 }
297 if (!mRunning) { // no more TFs will be provided
298 stopProcessing(ctx);
299 break;
300 }
301 // usleep(5000); // wait 5ms for new TF to be built
302 }
303 if (mTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
304 stopProcessing(ctx);
305 }
306}
307
308//____________________________________________________________
310{
311 if (mFileFetcher) {
312 mFileFetcher->stop();
313 mFileFetcher.reset();
314 }
315 if (mTFBuilderThread.joinable()) {
316 mTFBuilderThread.join();
317 }
318}
319
320//___________________________________________________________
321void TFReaderSpec::stopProcessing(o2f::ProcessingContext& ctx)
322{
323 static bool stopDone = false;
324 if (stopDone) {
325 return;
326 }
327 stopDone = true;
328 LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
329 mRunning = false;
330 if (mFileFetcher) {
331 mFileFetcher->stop();
332 mFileFetcher.reset();
333 }
334 if (mTFBuilderThread.joinable()) {
335 mTFBuilderThread.join();
336 }
337 if (!mInput.rawChannelConfig.empty()) {
338 auto device = ctx.services().get<o2f::RawDeviceService>().device();
339 o2f::SourceInfoHeader exitHdr;
340 exitHdr.state = o2f::InputChannelState::Completed;
342 try {
343 dh.runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
344 } catch (...) {
345 }
346 const auto exitStack = o2h::Stack(dh, o2f::DataProcessingHeader(), exitHdr);
347 auto fmqFactory = device->GetChannel(mInput.rawChannelConfig, 0).Transport();
348 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
349 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
350 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
351 fair::mq::Parts eosMsg;
352 eosMsg.AddPart(std::move(hdEOSMessage));
353 eosMsg.AddPart(std::move(plEOSMessage));
354 device->Send(eosMsg, mInput.rawChannelConfig);
355 LOG(info) << "Sent EoS message to " << mInput.rawChannelConfig;
356 } else {
358 }
359 ctx.services().get<o2f::ControlService>().readyToQuit(o2f::QuitRequest::Me);
360}
361
362//____________________________________________________________
363void TFReaderSpec::TFBuilder()
364{
365 // build TFs and add to the queue
366 std::string tfFileName;
367 auto sleepTime = std::chrono::microseconds(mInput.delay_us > 10000 ? mInput.delay_us : 10000);
368 bool waitAcknowledged = false;
369 long startWait = 0;
370 while (mRunning && mDevice) {
371 LOGP(debug, "mTFQueue.size()={} mWaitSendingLast = {}", mTFQueue.size(), mWaitSendingLast);
372 if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
373 mWaitSendingLast = false;
374 std::this_thread::sleep_for(sleepTime);
375 continue;
376 }
377 tfFileName = mFileFetcher ? mFileFetcher->getNextFileInQueue() : "";
378 if (!mRunning ||
379 (tfFileName.empty() && !mFileFetcher->isRunning()) ||
380 mTFBuilderCounter >= mInput.maxTFs ||
381 (!mInput.tfIDs.empty() && mSelIDEntry >= mInput.tfIDs.size())) {
382 // stopped or no more files in the queue is expected or needed
383 LOG(info) << "TFReader stops processing";
384 if (mFileFetcher) {
385 mFileFetcher->stop();
386 }
387 mRunning = false;
388 mWaitSendingLast = false;
389 break;
390 }
391 if (tfFileName.empty()) {
392 if (!waitAcknowledged) {
393 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
394 waitAcknowledged = true;
395 }
396 std::this_thread::sleep_for(10ms); // wait for the files cache to be filled
397 continue;
398 }
399 mWaitSendingLast = false;
400 if (waitAcknowledged) {
401 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
402 mTotalWaitTime += waitTime;
403 if (++mNWaits > 1) {
404 LOGP(warn, "Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
405 }
406 waitAcknowledged = false;
407 startWait = 0;
408 }
409
410 LOG(info) << "Processing file " << tfFileName;
411 SubTimeFrameFileReader reader(tfFileName, mInput.detMask);
412 size_t locID = 0;
413 // try
414 {
415 while (mRunning && mTFBuilderCounter < mInput.maxTFs) {
416 if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
417 if (mTFQueue.size() > 1) {
418 mWaitSendingLast = false;
419 }
420 std::this_thread::sleep_for(sleepTime);
421 continue;
422 }
423 auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mSelIDEntry, mInput.sup0xccdb, mInput.verbosity);
424 bool acceptTF = true;
425 if (tf) {
426 if (mRunTimeRanges.size()) {
427 const auto* dataptr = (*tf->begin()->second.get())[0].GetData();
428 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
429 static int runNumberPrev = -1;
430 if (runNumberPrev != hd0->runNumber) {
431 runNumberPrev = hd0->runNumber;
432 runTimeRangesToIRFrameSelector(runNumberPrev);
433 }
434 if (mIRFrameSelector.isSet()) {
435 o2::InteractionRecord ir0(0, hd0->firstTForbit);
436 o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, hd0->firstTForbit < 0xffffffff - (mTFLength - 1) ? hd0->firstTForbit + (mTFLength - 1) : 0xffffffff);
437 auto irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
438 acceptTF = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
439 LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
440 irSpan.size(), ir0.asString(), ir1.asString(), acceptTF ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
441 }
442 }
443 locID++;
444 if (!mInput.tfIDs.empty() && acceptTF) {
445 acceptTF = false;
446 if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
447 mWaitSendingLast = false;
448 acceptTF = true;
449 LOGP(info, "Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
450 mSelIDEntry++;
451 } else {
452 LOGP(info, "Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
453 }
454 } else {
455 mSelIDEntry++;
456 }
457 mTFBuilderCounter++;
458 }
459 if (mRunning && tf) {
460 if (acceptTF) {
461 mWaitSendingLast = true;
462 mTFQueue.push(std::move(tf));
463 }
464 } else {
465 break;
466 }
467 if (mInput.maxTFsPerFile > 0 && locID >= mInput.maxTFsPerFile) { // go to next file
468 break;
469 }
470 }
471 // remove already processed file from the queue, unless they are needed for further looping
472 if (mFileFetcher) {
473 mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.maxLoops);
474 }
475 }
476 }
477}
478
479//_________________________________________________________
480void TFReaderSpec::loadRunTimeSpans(const std::string& flname)
481{
482 std::ifstream inputFile(flname);
483 if (!inputFile) {
484 LOGP(fatal, "Failed to open selected run/timespans file {}", flname);
485 }
486 std::string line;
487 size_t cntl = 0, cntr = 0;
488 while (std::getline(inputFile, line)) {
489 cntl++;
490 for (char& ch : line) { // Replace semicolons and tabs with spaces for uniform processing
491 if (ch == ';' || ch == '\t' || ch == ',') {
492 ch = ' ';
493 }
494 }
496 if (line.size() < 1 || line[0] == '#') {
497 continue;
498 }
499 auto tokens = o2::utils::Str::tokenize(line, ' ');
500 auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
501 if (tokens.size() >= 3) {
502 int run = 0;
503 long rmin, rmax;
504 try {
505 run = std::stoi(tokens[0]);
506 rmin = std::stol(tokens[1]);
507 rmax = std::stol(tokens[2]);
508 } catch (...) {
509 logError();
510 continue;
511 }
512
513 constexpr long ISTimeStamp = 1514761200000L;
514 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
515 if (rmin > rmax) {
516 LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
517 }
518 if (mConvRunTimeRangesToOrbits == -1) {
519 if (convmn != convmx) {
520 LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
521 }
522 mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
523 LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
524 } else {
525 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
526 LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
527 }
528 }
529
530 mRunTimeRanges[run].emplace_back(rmin, rmax);
531 cntr++;
532 } else {
533 logError();
534 }
535 }
536 LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
537 inputFile.close();
538}
539
540//_________________________________________________________
541void TFReaderSpec::runTimeRangesToIRFrameSelector(int runNumber)
542{
543 // convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
544 mIRFrameSelector.clear();
545 auto ent = mRunTimeRanges.find(runNumber);
546 if (ent == mRunTimeRanges.end()) {
547 LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber);
548 return;
549 }
553 if (rinfo.runNumber != runNumber || rinfo.orbitsPerTF < 1) {
554 LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", runNumber);
555 }
556 mTFLength = rinfo.orbitsPerTF;
557 std::vector<o2::dataformats::IRFrame> frames;
558 for (const auto& rng : ent->second) {
559 long orbMin = 0, orbMax = 0;
560 if (mConvRunTimeRangesToOrbits > 0) {
561 orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
562 orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
563 } else {
564 orbMin = rng.first;
565 orbMax = rng.second;
566 }
567 if (orbMin < 0) {
568 orbMin = 0;
569 }
570 if (orbMax < 0) {
571 orbMax = 0;
572 }
573 if (runNumber > 523897) {
574 orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
575 orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
576 }
577 LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
578 frames.emplace_back(o2::InteractionRecord{0, uint32_t(orbMin)}, o2::InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
579 }
580 mIRFrameSelector.setOwnList(frames, true);
581}
582
583//_________________________________________________________
585{
586 // check which inputs are present in files to read
588 spec.name = "tf-reader";
589 const DetID::mask_t DEFMask = DetID::getMask("ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP,FOC");
590 rinp.detMask = DetID::getMask(rinp.detList) & DEFMask;
591 rinp.detMaskRawOnly = DetID::getMask(rinp.detListRawOnly) & DEFMask;
593 if (rinp.rawChannelConfig.empty()) {
594 // we don't know a priori what will be the content of the TF data, so we create all possible outputs
595 for (DetID::ID id = DetID::First; id <= DetID::Last; id++) {
596 if (rinp.detMask[id]) {
597 if (!rinp.detMaskNonRawOnly[id]) {
599 rinp.hdVec.emplace_back(o2h::DataHeader{"RAWDATA", DetID::getDataOrigin(id), 0xDEADBEEF, 0}); // in abcence of real data this will be sent
600 }
601 //
602 if (rinp.detMaskRawOnly[id]) { // used asked to not open non-raw channels
603 continue;
604 }
605 // in case detectors were processed on FLP
606 if (id == DetID::CTP) {
608 rinp.hdVec.emplace_back(o2h::DataHeader{"LUMI", DetID::getDataOrigin(DetID::CTP), 0, 0}); // in abcence of real data this will be sent
609 }
610 if (id == DetID::TOF) {
612 rinp.hdVec.emplace_back(o2h::DataHeader{"CRAWDATA", DetID::getDataOrigin(DetID::TOF), 0xDEADBEEF, 0}); // in abcence of real data this will be sent
613 } else if (id == DetID::FT0 || id == DetID::FV0 || id == DetID::FDD) {
614 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "DIGITSBC", 0});
615 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "DIGITSCH", 0});
616 rinp.hdVec.emplace_back(o2h::DataHeader{"DIGITSBC", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
617 rinp.hdVec.emplace_back(o2h::DataHeader{"DIGITSCH", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
618 } else if (id == DetID::PHS) {
619 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "CELLS", 0});
620 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "CELLTRIGREC", 0});
621 rinp.hdVec.emplace_back(o2h::DataHeader{"CELLS", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
622 rinp.hdVec.emplace_back(o2h::DataHeader{"CELLTRIGREC", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
623 } else if (id == DetID::CPV) {
624 spec.outputs.emplace_back(DetID::getDataOrigin(id), "DIGITS", 0);
625 spec.outputs.emplace_back(DetID::getDataOrigin(id), "DIGITTRIGREC", 0);
626 spec.outputs.emplace_back(DetID::getDataOrigin(id), "RAWHWERRORS", 0);
627 rinp.hdVec.emplace_back("DIGITS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
628 rinp.hdVec.emplace_back("DIGITTRIGREC", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
629 rinp.hdVec.emplace_back("RAWHWERRORS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
630 } else if (id == DetID::EMC) {
632 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "CELLSTRGR"}});
633 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "DECODERERR"}});
634 rinp.hdVec.emplace_back("CELLS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
635 rinp.hdVec.emplace_back("CELLSTRGR", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
636 rinp.hdVec.emplace_back("DECODERERR", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
637 } else if (id == DetID::FOC) {
638 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PADLAYERS"}});
639 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PIXELHITS"}});
640 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PIXELCHIPS"}});
642 rinp.hdVec.emplace_back("PADLAYERS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
643 rinp.hdVec.emplace_back("PIXELHITS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
644 rinp.hdVec.emplace_back("PIXELCHIPS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
645 rinp.hdVec.emplace_back("TRIGGERS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
646 }
647 }
648 }
649 o2f::DataSpecUtils::updateOutputList(spec.outputs, o2f::OutputSpec{{"stfDist"}, o2h::gDataOriginFLP, o2h::gDataDescriptionDISTSTF, 0});
650 if (!rinp.sup0xccdb) {
651 o2f::DataSpecUtils::updateOutputList(spec.outputs, o2f::OutputSpec{{"stfDistCCDB"}, o2h::gDataOriginFLP, o2h::gDataDescriptionDISTSTF, 0xccdb});
652 }
653 if (!rinp.metricChannel.empty()) {
654 spec.options.emplace_back(o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.metricChannel, {"Out-of-band channel config for TF throttling"}});
655 }
656 } else {
657 auto nameStart = rinp.rawChannelConfig.find("name=");
658 if (nameStart == std::string::npos) {
659 throw std::runtime_error("raw channel name is not provided");
660 }
661 nameStart += strlen("name=");
662 auto nameEnd = rinp.rawChannelConfig.find(",", nameStart + 1);
663 if (nameEnd == std::string::npos) {
664 nameEnd = rinp.rawChannelConfig.size();
665 }
666 spec.options = {o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {"Out-of-band channel config"}}};
667 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
668 if (!rinp.metricChannel.empty()) {
669 LOGP(alarm, "Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
670 LOGP(alarm, R"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
671 }
672 }
673 spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
674 spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
675 spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
676 spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
677 spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});
678 spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-files", o2f::VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});
679
680 spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
681
682 return spec;
683}
Class to check if give InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Helper function to tokenize sequences and ranges of integral numbers.
std::ostringstream debug
Class for time synchronization of RawReader instances.
void endOfStream(o2f::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2f::ProcessingContext &ctx) final
void init(o2f::InitContext &ic) final
std::unordered_map< std::string, std::unique_ptr< fair::mq::Parts > > TFMap
TFReaderSpec(const TFReaderInp &rinp)
static BasicCCDBManager & instance()
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID First
Definition DetID.h:94
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr ID FOC
Definition DetID.h:80
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
static constexpr ID EMC
Definition DetID.h:69
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID TOF
Definition DetID.h:66
ConfigParamRegistry const & options()
Definition InitContext.h:33
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
size_t size() const
Definition FIFO.h:31
void pop()
Definition FIFO.h:56
const T & front() const
Definition FIFO.h:64
void push(Args &&... args)
Definition FIFO.h:50
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
GLint GLsizei count
Definition glcorearb.h:399
GLuint entry
Definition glcorearb.h:5735
GLenum GLsizei dataSize
Definition glcorearb.h:3994
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionInfo
Definition DataHeader.h:601
constexpr o2::header::DataOrigin gDataOriginAny
Definition DataHeader.h:560
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:603
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
int32_t const char int32_t line
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
o2::framework::DataProcessorSpec getTFReaderSpec(o2::rawdd::TFReaderInp &rinp)
std::unique_ptr< GPUReconstructionTimeframe > tf
std::string asString() const
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:618
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
RunNumberType runNumber
Definition DataHeader.h:684
Helper struct to encode origin and description of data.
Definition DataHeader.h:757
DataDescription dataDescription
Definition DataHeader.h:759
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:34
static AggregatedRunInfo buildAggregatedRunInfo(int runnumber, long sorMS, long eorMS, long orbitResetMUS, const o2::parameters::GRPECSObject *grpecs, const std::vector< Long64_t > *ctfFirstRunOrbitVec)
std::vector< o2::header::DataHeader > hdVec
std::string detListNonRawOnly
std::string rawChannelConfig
std::string detListRawOnly
o2::detectors::DetID::mask_t detMaskNonRawOnly
std::string fileRunTimeSpans
std::vector< int > tfIDs
o2::detectors::DetID::mask_t detMask
o2::detectors::DetID::mask_t detMaskRawOnly
static void trim(std::string &s)
Definition StringUtils.h:70
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
const int sleepTime
Definition test_Fifo.cxx:28
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir0(3, 5)