Project
Loading...
Searching...
No Matches
TFReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
21#include "Framework/Task.h"
22#include "Framework/Logger.h"
28#include <TStopwatch.h>
29#include <fairmq/Device.h>
30#include <fairmq/Parts.h>
31#include "TFReaderSpec.h"
36#include "CommonUtils/FIFO.h"
42#include <unistd.h>
43#include <algorithm>
44#include <unordered_map>
45#include <cctype>
46#include <string>
47#include <climits>
48#include <regex>
49#include <deque>
50#include <chrono>
51#include <thread>
52
53using namespace o2::rawdd;
54using namespace std::chrono_literals;
56namespace o2f = o2::framework;
57namespace o2h = o2::header;
58
59class TFReaderSpec : public o2f::Task
60{
61 public:
62 struct SubSpecCount {
63 uint32_t defSubSpec = 0xdeadbeef;
64 int count = -1;
65 };
66
67 using TFMap = std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>>; // map of channel / TFparts
68
69 explicit TFReaderSpec(const TFReaderInp& rinp);
70 void init(o2f::InitContext& ic) final;
71 void run(o2f::ProcessingContext& ctx) final;
73
74 private:
75 void loadRunTimeSpans(const std::string& flname);
76 void runTimeRangesToIRFrameSelector(int runNumber);
77 void stopProcessing(o2f::ProcessingContext& ctx);
78 void TFBuilder();
79
80 private:
81 fair::mq::Device* mDevice = nullptr;
82 std::vector<o2f::OutputRoute> mOutputRoutes;
83 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
84 o2::utils::FIFO<std::unique_ptr<TFMap>> mTFQueue{}; // queued TFs
85 // std::unordered_map<o2h::DataIdentifier, SubSpecCount, std::hash<o2h::DataIdentifier>> mSeenOutputMap;
86 std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap;
87 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
88 o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
89 int mConvRunTimeRangesToOrbits = -1; // not defined yet
90 int mSentTFCounter = 0;
91 int mAccTFCounter = 0;
92 int mTFBuilderCounter = 0;
93 int mNWaits = 0;
94 int mTFLength = 32;
95 long mTotalWaitTime = 0;
96 size_t mSelIDEntry = 0; // next TFID to select from the mInput.tfIDs (if non-empty)
97 bool mRunning = false;
98 bool mWaitSendingLast = false;
99 TFReaderInp mInput; // command line inputs
100 std::thread mTFBuilderThread{};
101};
102
103//___________________________________________________________
104TFReaderSpec::TFReaderSpec(const TFReaderInp& rinp) : mInput(rinp)
105{
106 for (const auto& hd : rinp.hdVec) {
107 mSeenOutputMap[o2h::DataIdentifier{hd.dataDescription.str, hd.dataOrigin.str}].defSubSpec = hd.subSpecification;
108 }
109}
110
111//___________________________________________________________
113{
114 mInput.tfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-tf-ids"));
115 mInput.maxTFs = ic.options().get<int>("max-tf");
116 mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
117 mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
118 mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
119 mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
120 mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
121 if (!mInput.fileRunTimeSpans.empty()) {
122 loadRunTimeSpans(mInput.fileRunTimeSpans);
123 }
124 mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
125 mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
126 mFileFetcher->setMaxLoops(mInput.maxLoops);
127 mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
128 mFileFetcher->start();
129}
130
131//___________________________________________________________
133{
134 if (!mDevice) {
135 mDevice = ctx.services().get<o2f::RawDeviceService>().device();
136 mOutputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs; // copy!!!
137 // start TFBuilder thread
138 mRunning = true;
139 mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder, this);
140 }
141 static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
142 auto device = ctx.services().get<o2f::RawDeviceService>().device();
143 assert(device);
144 if (device != mDevice) {
145 throw std::runtime_error(fmt::format("FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
146 }
147 if (mInput.tfRateLimit == -999) {
148 mInput.tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit"));
149 }
150 auto acknowledgeOutput = [this](fair::mq::Parts& parts, bool verbose = false) {
151 int np = parts.Size();
152 size_t dsize = 0, dsizeTot = 0, nblocks = 0;
153 const o2h::DataHeader* hdPrev = nullptr;
154 for (int ip = 0; ip < np; ip += 2) {
155 const auto& msgh = parts[ip];
156 const auto* hd = o2h::get<o2h::DataHeader*>(msgh.GetData());
157 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(msgh.GetData());
158 if (verbose && mInput.verbosity > 0) {
159 LOGP(info, "Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
160 }
161 if (hd->splitPayloadIndex == 0) { // check the 1st one only
162 auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
163 if (entry.count != this->mSentTFCounter) {
164 if (verbose && hdPrev) { // report previous partition size
165 LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
166 }
167 dsizeTot += dsize;
168 dsize = 0;
169 entry.count = this->mSentTFCounter; // acknowledge identifier seen in the data
170 LOG(debug) << "Found a part " << ip << " of " << np << " | " << hd->dataOrigin.as<std::string>() << "/" << hd->dataDescription.as<std::string>()
171 << "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mSentTFCounter;
172 nblocks++;
173 }
174 }
175 hdPrev = hd;
176 dsize += msgh.GetSize() + parts[ip + 1].GetSize();
177 }
178 // last part
179 dsizeTot += dsize;
180 if (verbose && hdPrev) {
181 LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
182 }
183 return dsizeTot;
184 };
185
186 auto findOutputChannel = [&ctx, this](o2h::DataHeader& h, size_t tslice) {
187 if (!this->mInput.rawChannelConfig.empty()) {
188 return std::string{this->mInput.rawChannelConfig};
189 } else {
190 auto& outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
191 for (auto& oroute : outputRoutes) {
192 LOG(debug) << "comparing with matcher to route " << oroute.matcher << " TSlice:" << oroute.timeslice;
193 if (o2f::DataSpecUtils::match(oroute.matcher, h.dataOrigin, h.dataDescription, h.subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
194 LOG(debug) << "picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) << " channel " << oroute.channel;
195 return std::string{oroute.channel};
196 }
197 }
198 }
199 auto& outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
200 LOGP(error, "Failed to find output channel for {}/{}/{} @ timeslice {}", h.dataOrigin, h.dataDescription, h.subSpecification, h.tfCounter);
201 for (auto& oroute : outputRoutes) {
202 LOGP(info, "Available route route {}", o2f::DataSpecUtils::describe(oroute.matcher));
203 }
204 return std::string{};
205 };
206 auto setTimingInfo = [&ctx](TFMap& msgMap) {
207 auto& timingInfo = ctx.services().get<o2::framework::TimingInfo>();
208 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
209 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
210 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
211 timingInfo.firstTForbit = hd0->firstTForbit;
212 timingInfo.creation = dph->creation;
213 timingInfo.tfCounter = hd0->tfCounter;
214 timingInfo.runNumber = hd0->runNumber;
215 };
216
217 auto addMissingParts = [this, &findOutputChannel](TFMap& msgMap) {
218 // at least the 1st header is guaranteed to be filled by the reader, use it for extra info
219 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
220 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
221 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
222 for (auto& out : this->mSeenOutputMap) {
223 if (out.second.count == this->mSentTFCounter) { // was seen in the data
224 continue;
225 }
226 LOG(debug) << "Adding dummy output for " << out.first.dataOrigin.as<std::string>() << "/" << out.first.dataDescription.as<std::string>()
227 << "/" << out.second.defSubSpec << " for TF " << this->mSentTFCounter;
228 o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
230 outHeader.firstTForbit = hd0->firstTForbit;
231 outHeader.tfCounter = hd0->tfCounter;
232 outHeader.runNumber = hd0->runNumber;
233 const auto fmqChannel = findOutputChannel(outHeader, dph->startTime);
234 if (fmqChannel.empty()) { // no output channel
235 continue;
236 }
237 auto fmqFactory = this->mDevice->GetChannel(fmqChannel, 0).Transport();
238 o2h::Stack headerStack{outHeader, *dph};
239 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
240 auto plMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
241 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
242 fair::mq::Parts* parts = msgMap[fmqChannel].get();
243 if (!parts) {
244 msgMap[fmqChannel] = std::make_unique<fair::mq::Parts>();
245 parts = msgMap[fmqChannel].get();
246 }
247 parts->AddPart(std::move(hdMessage));
248 parts->AddPart(std::move(plMessage));
249 }
250 };
251
252 while (1) {
253 if (mTFQueue.size()) {
254 static o2f::RateLimiter limiter;
255 limiter.check(ctx, mInput.tfRateLimit, mInput.minSHM);
256
257 auto tfPtr = std::move(mTFQueue.front());
258 mTFQueue.pop();
259 if (!tfPtr) {
260 LOG(error) << "Builder provided nullptr TF pointer";
261 continue;
262 }
263 setTimingInfo(*tfPtr.get());
264 size_t nparts = 0, dataSize = 0;
265 if (mInput.sendDummyForMissing) {
266 for (auto& msgIt : *tfPtr.get()) { // complete with empty output for the specs which were requested but were not seen in the data
267 acknowledgeOutput(*msgIt.second.get(), true);
268 }
269 addMissingParts(*tfPtr.get());
270 }
271
272 auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
273 auto tDiff = tNow - tLastTF;
274 if (mSentTFCounter && tDiff < mInput.delay_us) {
275 std::this_thread::sleep_for(std::chrono::microseconds((size_t)(mInput.delay_us - tDiff))); // respect requested delay before sending
276 }
277 for (auto& msgIt : *tfPtr.get()) {
278 size_t szPart = acknowledgeOutput(*msgIt.second.get(), false);
279 dataSize += szPart;
280 const auto* hd = o2h::get<o2h::DataHeader*>((*msgIt.second.get())[0].GetData());
281 nparts += msgIt.second->Size() / 2;
282 device->Send(*msgIt.second.get(), msgIt.first);
283 }
284 // FIXME: this is to pretend we did send some messages via DPL.
285 // we should really migrate everything to use FairMQDeviceProxy,
286 // however this is a small enough hack for now.
287 ctx.services().get<o2f::MessageContext>().fakeDispatch();
288 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
289 LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mSentTFCounter, dataSize, nparts, mSentTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
290 tLastTF = tNow;
291 ++mSentTFCounter;
292
293 while (mTFQueue.size() == 0 && mWaitSendingLast) {
294 usleep(10000);
295 }
296 break;
297 }
298 if (!mRunning) { // no more TFs will be provided
299 stopProcessing(ctx);
300 break;
301 }
302 // usleep(5000); // wait 5ms for new TF to be built
303 }
304 if (mSentTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
305 stopProcessing(ctx);
306 }
307}
308
309//____________________________________________________________
311{
312 if (mFileFetcher) {
313 mFileFetcher->stop();
314 mFileFetcher.reset();
315 }
316 if (mTFBuilderThread.joinable()) {
317 mTFBuilderThread.join();
318 }
319}
320
321//___________________________________________________________
322void TFReaderSpec::stopProcessing(o2f::ProcessingContext& ctx)
323{
324 static bool stopDone = false;
325 if (stopDone) {
326 return;
327 }
328 stopDone = true;
329 LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mSentTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
330 mRunning = false;
331 if (mFileFetcher) {
332 mFileFetcher->stop();
333 mFileFetcher.reset();
334 }
335 if (mTFBuilderThread.joinable()) {
336 mTFBuilderThread.join();
337 }
338 if (!mInput.rawChannelConfig.empty()) {
339 auto device = ctx.services().get<o2f::RawDeviceService>().device();
340 o2f::SourceInfoHeader exitHdr;
341 exitHdr.state = o2f::InputChannelState::Completed;
343 try {
344 dh.runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
345 } catch (...) {
346 }
347 const auto exitStack = o2h::Stack(dh, o2f::DataProcessingHeader(), exitHdr);
348 auto fmqFactory = device->GetChannel(mInput.rawChannelConfig, 0).Transport();
349 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
350 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
351 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
352 fair::mq::Parts eosMsg;
353 eosMsg.AddPart(std::move(hdEOSMessage));
354 eosMsg.AddPart(std::move(plEOSMessage));
355 device->Send(eosMsg, mInput.rawChannelConfig);
356 LOG(info) << "Sent EoS message to " << mInput.rawChannelConfig;
357 } else {
359 }
360 ctx.services().get<o2f::ControlService>().readyToQuit(o2f::QuitRequest::Me);
361}
362
363//____________________________________________________________
364void TFReaderSpec::TFBuilder()
365{
366 // build TFs and add to the queue
367 std::string tfFileName;
368 auto sleepTime = std::chrono::microseconds(mInput.delay_us > 10000 ? mInput.delay_us : 10000);
369 bool waitAcknowledged = false;
370 long startWait = 0;
371 while (mRunning && mDevice) {
372 LOGP(debug, "mTFQueue.size()={} mWaitSendingLast = {}", mTFQueue.size(), mWaitSendingLast);
373 if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
374 mWaitSendingLast = false;
375 std::this_thread::sleep_for(sleepTime);
376 continue;
377 }
378 tfFileName = mFileFetcher ? mFileFetcher->getNextFileInQueue() : "";
379 if (!mRunning ||
380 (tfFileName.empty() && !mFileFetcher->isRunning()) ||
381 mTFBuilderCounter >= mInput.maxTFs ||
382 (!mInput.tfIDs.empty() && mSelIDEntry >= mInput.tfIDs.size())) {
383 // stopped or no more files in the queue is expected or needed
384 LOG(info) << "TFReader stops processing";
385 if (mFileFetcher) {
386 mFileFetcher->stop();
387 }
388 mRunning = false;
389 mWaitSendingLast = false;
390 break;
391 }
392 if (tfFileName.empty()) {
393 if (!waitAcknowledged) {
394 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
395 waitAcknowledged = true;
396 }
397 std::this_thread::sleep_for(10ms); // wait for the files cache to be filled
398 continue;
399 }
400 mWaitSendingLast = false;
401 if (waitAcknowledged) {
402 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
403 mTotalWaitTime += waitTime;
404 if (++mNWaits > 1) {
405 LOGP(warn, "Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
406 }
407 waitAcknowledged = false;
408 startWait = 0;
409 }
410
411 LOG(info) << "Processing file " << tfFileName;
412 SubTimeFrameFileReader reader(tfFileName, mInput.detMask);
413 size_t locID = 0;
414 // try
415 {
416 while (mRunning && mTFBuilderCounter < mInput.maxTFs) {
417 if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
418 if (mTFQueue.size() > 1) {
419 mWaitSendingLast = false;
420 }
421 std::this_thread::sleep_for(sleepTime);
422 continue;
423 }
424 auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.verbosity);
425 bool acceptTF = true;
426 if (tf) {
427 if (mRunTimeRanges.size()) {
428 const auto* dataptr = (*tf->begin()->second.get())[0].GetData();
429 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
430 static int runNumberPrev = -1;
431 if (runNumberPrev != hd0->runNumber) {
432 runNumberPrev = hd0->runNumber;
433 runTimeRangesToIRFrameSelector(runNumberPrev);
434 }
435 if (mIRFrameSelector.isSet()) {
436 o2::InteractionRecord ir0(0, hd0->firstTForbit);
437 o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, hd0->firstTForbit < 0xffffffff - (mTFLength - 1) ? hd0->firstTForbit + (mTFLength - 1) : 0xffffffff);
438 auto irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
439 acceptTF = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
440 LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
441 irSpan.size(), ir0.asString(), ir1.asString(), acceptTF ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
442 }
443 }
444 locID++;
445 if (!mInput.tfIDs.empty() && acceptTF) {
446 acceptTF = false;
447 while ((mInput.tfIDs[mSelIDEntry] < mTFBuilderCounter) && (mSelIDEntry + 1) < mInput.tfIDs.size()) {
448 mSelIDEntry++;
449 }
450 LOGP(info, "chec if mInput.tfIDs[{}]({}) == {}", mSelIDEntry, mInput.tfIDs[mSelIDEntry], mTFBuilderCounter);
451 if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
452 mWaitSendingLast = false;
453 acceptTF = true;
454 LOGP(info, "Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
455 } else {
456 LOGP(info, "Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
457 }
458 }
459 mTFBuilderCounter++;
460 }
461 if (mRunning && tf) {
462 if (acceptTF) {
463 mAccTFCounter++;
464 mWaitSendingLast = true;
465 mTFQueue.push(std::move(tf));
466 }
467 } else {
468 break;
469 }
470 if (mInput.maxTFsPerFile > 0 && locID >= mInput.maxTFsPerFile) { // go to next file
471 break;
472 }
473 }
474 // remove already processed file from the queue, unless they are needed for further looping
475 if (mFileFetcher) {
476 mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.maxLoops);
477 }
478 }
479 }
480}
481
482//_________________________________________________________
483void TFReaderSpec::loadRunTimeSpans(const std::string& flname)
484{
485 std::ifstream inputFile(flname);
486 if (!inputFile) {
487 LOGP(fatal, "Failed to open selected run/timespans file {}", flname);
488 }
489 std::string line;
490 size_t cntl = 0, cntr = 0;
491 while (std::getline(inputFile, line)) {
492 cntl++;
493 for (char& ch : line) { // Replace semicolons and tabs with spaces for uniform processing
494 if (ch == ';' || ch == '\t' || ch == ',') {
495 ch = ' ';
496 }
497 }
499 if (line.size() < 1 || line[0] == '#') {
500 continue;
501 }
502 auto tokens = o2::utils::Str::tokenize(line, ' ');
503 auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
504 if (tokens.size() >= 3) {
505 int run = 0;
506 long rmin, rmax;
507 try {
508 run = std::stoi(tokens[0]);
509 rmin = std::stol(tokens[1]);
510 rmax = std::stol(tokens[2]);
511 } catch (...) {
512 logError();
513 continue;
514 }
515
516 constexpr long ISTimeStamp = 1514761200000L;
517 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
518 if (rmin > rmax) {
519 LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
520 }
521 if (mConvRunTimeRangesToOrbits == -1) {
522 if (convmn != convmx) {
523 LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
524 }
525 mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
526 LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
527 } else {
528 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
529 LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
530 }
531 }
532
533 mRunTimeRanges[run].emplace_back(rmin, rmax);
534 cntr++;
535 } else {
536 logError();
537 }
538 }
539 LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
540 inputFile.close();
541}
542
543//_________________________________________________________
544void TFReaderSpec::runTimeRangesToIRFrameSelector(int runNumber)
545{
546 // convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
547 mIRFrameSelector.clear();
548 auto ent = mRunTimeRanges.find(runNumber);
549 if (ent == mRunTimeRanges.end()) {
550 LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber);
551 return;
552 }
556 if (rinfo.runNumber != runNumber || rinfo.orbitsPerTF < 1) {
557 LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", runNumber);
558 }
559 mTFLength = rinfo.orbitsPerTF;
560 std::vector<o2::dataformats::IRFrame> frames;
561 for (const auto& rng : ent->second) {
562 long orbMin = 0, orbMax = 0;
563 if (mConvRunTimeRangesToOrbits > 0) {
564 orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
565 orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
566 } else {
567 orbMin = rng.first;
568 orbMax = rng.second;
569 }
570 if (orbMin < 0) {
571 orbMin = 0;
572 }
573 if (orbMax < 0) {
574 orbMax = 0;
575 }
576 if (runNumber > 523897) {
577 orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
578 orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
579 }
580 LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
581 frames.emplace_back(o2::InteractionRecord{0, uint32_t(orbMin)}, o2::InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
582 }
583 mIRFrameSelector.setOwnList(frames, true);
584}
585
586//_________________________________________________________
588{
589 // check which inputs are present in files to read
591 spec.name = "tf-reader";
592 const DetID::mask_t DEFMask = DetID::getMask("ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP,FOC");
593 rinp.detMask = DetID::getMask(rinp.detList) & DEFMask;
594 rinp.detMaskRawOnly = DetID::getMask(rinp.detListRawOnly) & DEFMask;
596 if (rinp.rawChannelConfig.empty()) {
597 // we don't know a priori what will be the content of the TF data, so we create all possible outputs
598 for (DetID::ID id = DetID::First; id <= DetID::Last; id++) {
599 if (rinp.detMask[id]) {
600 if (!rinp.detMaskNonRawOnly[id]) {
602 rinp.hdVec.emplace_back(o2h::DataHeader{"RAWDATA", DetID::getDataOrigin(id), 0xDEADBEEF, 0}); // in abcence of real data this will be sent
603 }
604 //
605 if (rinp.detMaskRawOnly[id]) { // used asked to not open non-raw channels
606 continue;
607 }
608 // in case detectors were processed on FLP
609 if (id == DetID::CTP) {
611 rinp.hdVec.emplace_back(o2h::DataHeader{"LUMI", DetID::getDataOrigin(DetID::CTP), 0, 0}); // in abcence of real data this will be sent
612 }
613 if (id == DetID::TOF) {
615 rinp.hdVec.emplace_back(o2h::DataHeader{"CRAWDATA", DetID::getDataOrigin(DetID::TOF), 0xDEADBEEF, 0}); // in abcence of real data this will be sent
616 } else if (id == DetID::FT0 || id == DetID::FV0 || id == DetID::FDD) {
617 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "DIGITSBC", 0});
618 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "DIGITSCH", 0});
619 rinp.hdVec.emplace_back(o2h::DataHeader{"DIGITSBC", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
620 rinp.hdVec.emplace_back(o2h::DataHeader{"DIGITSCH", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
621 } else if (id == DetID::PHS) {
622 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "CELLS", 0});
623 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "CELLTRIGREC", 0});
624 rinp.hdVec.emplace_back(o2h::DataHeader{"CELLS", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
625 rinp.hdVec.emplace_back(o2h::DataHeader{"CELLTRIGREC", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
626 } else if (id == DetID::CPV) {
627 spec.outputs.emplace_back(DetID::getDataOrigin(id), "DIGITS", 0);
628 spec.outputs.emplace_back(DetID::getDataOrigin(id), "DIGITTRIGREC", 0);
629 spec.outputs.emplace_back(DetID::getDataOrigin(id), "RAWHWERRORS", 0);
630 rinp.hdVec.emplace_back("DIGITS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
631 rinp.hdVec.emplace_back("DIGITTRIGREC", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
632 rinp.hdVec.emplace_back("RAWHWERRORS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
633 } else if (id == DetID::EMC) {
635 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "CELLSTRGR"}});
636 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "DECODERERR"}});
637 rinp.hdVec.emplace_back("CELLS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
638 rinp.hdVec.emplace_back("CELLSTRGR", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
639 rinp.hdVec.emplace_back("DECODERERR", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
640 } else if (id == DetID::FOC) {
641 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PADLAYERS"}});
642 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PIXELHITS"}});
643 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PIXELCHIPS"}});
645 rinp.hdVec.emplace_back("PADLAYERS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
646 rinp.hdVec.emplace_back("PIXELHITS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
647 rinp.hdVec.emplace_back("PIXELCHIPS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
648 rinp.hdVec.emplace_back("TRIGGERS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
649 }
650 }
651 }
652 o2f::DataSpecUtils::updateOutputList(spec.outputs, o2f::OutputSpec{{"stfDist"}, o2h::gDataOriginFLP, o2h::gDataDescriptionDISTSTF, 0});
653 if (!rinp.sup0xccdb) {
654 o2f::DataSpecUtils::updateOutputList(spec.outputs, o2f::OutputSpec{{"stfDistCCDB"}, o2h::gDataOriginFLP, o2h::gDataDescriptionDISTSTF, 0xccdb});
655 }
656 if (!rinp.metricChannel.empty()) {
657 spec.options.emplace_back(o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.metricChannel, {"Out-of-band channel config for TF throttling"}});
658 }
659 } else {
660 auto nameStart = rinp.rawChannelConfig.find("name=");
661 if (nameStart == std::string::npos) {
662 throw std::runtime_error("raw channel name is not provided");
663 }
664 nameStart += strlen("name=");
665 auto nameEnd = rinp.rawChannelConfig.find(",", nameStart + 1);
666 if (nameEnd == std::string::npos) {
667 nameEnd = rinp.rawChannelConfig.size();
668 }
669 spec.options = {o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {"Out-of-band channel config"}}};
670 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
671 if (!rinp.metricChannel.empty()) {
672 LOGP(alarm, "Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
673 LOGP(alarm, R"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
674 }
675 }
676 spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
677 spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
678 spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
679 spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
680 spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});
681 spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-files", o2f::VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});
682
683 spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
684
685 return spec;
686}
Class to check if give InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Helper function to tokenize sequences and ranges of integral numbers.
std::ostringstream debug
Class for time synchronization of RawReader instances.
void endOfStream(o2f::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2f::ProcessingContext &ctx) final
void init(o2f::InitContext &ic) final
std::unordered_map< std::string, std::unique_ptr< fair::mq::Parts > > TFMap
TFReaderSpec(const TFReaderInp &rinp)
static BasicCCDBManager & instance()
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID First
Definition DetID.h:94
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr ID FOC
Definition DetID.h:80
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
static constexpr ID EMC
Definition DetID.h:69
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID TOF
Definition DetID.h:66
ConfigParamRegistry const & options()
Definition InitContext.h:33
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
size_t size() const
Definition FIFO.h:31
void pop()
Definition FIFO.h:56
const T & front() const
Definition FIFO.h:64
void push(Args &&... args)
Definition FIFO.h:50
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
GLint GLsizei count
Definition glcorearb.h:399
GLuint entry
Definition glcorearb.h:5735
GLenum GLsizei dataSize
Definition glcorearb.h:3994
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionInfo
Definition DataHeader.h:601
constexpr o2::header::DataOrigin gDataOriginAny
Definition DataHeader.h:560
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:603
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
int32_t const char int32_t line
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
o2::framework::DataProcessorSpec getTFReaderSpec(o2::rawdd::TFReaderInp &rinp)
std::unique_ptr< GPUReconstructionTimeframe > tf
std::string asString() const
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:618
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
RunNumberType runNumber
Definition DataHeader.h:684
Helper struct to encode origin and description of data.
Definition DataHeader.h:757
DataDescription dataDescription
Definition DataHeader.h:759
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
static AggregatedRunInfo buildAggregatedRunInfo(int runnumber, long sorMS, long eorMS, long orbitResetMUS, const o2::parameters::GRPECSObject *grpecs, const std::vector< Long64_t > *ctfFirstRunOrbitVec)
std::vector< o2::header::DataHeader > hdVec
std::string detListNonRawOnly
std::string rawChannelConfig
std::string detListRawOnly
o2::detectors::DetID::mask_t detMaskNonRawOnly
std::string fileRunTimeSpans
std::vector< int > tfIDs
o2::detectors::DetID::mask_t detMask
o2::detectors::DetID::mask_t detMaskRawOnly
static void trim(std::string &s)
Definition StringUtils.h:70
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
const int sleepTime
Definition test_Fifo.cxx:28
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir0(3, 5)