Project
Loading...
Searching...
No Matches
TFReaderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
21#include "Framework/Task.h"
22#include "Framework/Logger.h"
28#include <TStopwatch.h>
29#include <fairmq/Device.h>
30#include <fairmq/Parts.h>
31#include "TFReaderSpec.h"
35#include "CommonUtils/FIFO.h"
36#include <unistd.h>
37#include <algorithm>
38#include <unordered_map>
39#include <cctype>
40#include <string>
41#include <climits>
42#include <regex>
43#include <deque>
44#include <chrono>
45#include <thread>
46
47using namespace o2::rawdd;
48using namespace std::chrono_literals;
50namespace o2f = o2::framework;
51namespace o2h = o2::header;
52
53class TFReaderSpec : public o2f::Task
54{
55 public:
56 struct SubSpecCount {
57 uint32_t defSubSpec = 0xdeadbeef;
58 int count = -1;
59 };
60
61 using TFMap = std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>>; // map of channel / TFparts
62
63 explicit TFReaderSpec(const TFReaderInp& rinp);
64 void init(o2f::InitContext& ic) final;
65 void run(o2f::ProcessingContext& ctx) final;
67
68 private:
69 void stopProcessing(o2f::ProcessingContext& ctx);
70 void TFBuilder();
71
72 private:
73 fair::mq::Device* mDevice = nullptr;
74 std::vector<o2f::OutputRoute> mOutputRoutes;
75 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
76 o2::utils::FIFO<std::unique_ptr<TFMap>> mTFQueue{}; // queued TFs
77 // std::unordered_map<o2h::DataIdentifier, SubSpecCount, std::hash<o2h::DataIdentifier>> mSeenOutputMap;
78 std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap;
79 int mTFCounter = 0;
80 int mTFBuilderCounter = 0;
81 int mNWaits = 0;
82 long mTotalWaitTime = 0;
83 size_t mSelIDEntry = 0; // next TFID to select from the mInput.tfIDs (if non-empty)
84 bool mRunning = false;
85 bool mWaitSendingLast = false;
86 TFReaderInp mInput; // command line inputs
87 std::thread mTFBuilderThread{};
88};
89
90//___________________________________________________________
91TFReaderSpec::TFReaderSpec(const TFReaderInp& rinp) : mInput(rinp)
92{
93 for (const auto& hd : rinp.hdVec) {
94 mSeenOutputMap[o2h::DataIdentifier{hd.dataDescription.str, hd.dataOrigin.str}].defSubSpec = hd.subSpecification;
95 }
96}
97
98//___________________________________________________________
100{
101 mInput.tfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-tf-ids"));
102 mInput.maxTFs = ic.options().get<int>("max-tf");
103 mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
104 mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
105 mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
106 mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
107 mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
108 mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
109 mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
110 mFileFetcher->setMaxLoops(mInput.maxLoops);
111 mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
112 mFileFetcher->start();
113}
114
115//___________________________________________________________
117{
118 if (!mDevice) {
119 mDevice = ctx.services().get<o2f::RawDeviceService>().device();
120 mOutputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs; // copy!!!
121 // start TFBuilder thread
122 mRunning = true;
123 mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder, this);
124 }
125 static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
126 auto device = ctx.services().get<o2f::RawDeviceService>().device();
127 assert(device);
128 if (device != mDevice) {
129 throw std::runtime_error(fmt::format("FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
130 }
131 if (mInput.tfRateLimit == -999) {
132 mInput.tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit"));
133 }
134 auto acknowledgeOutput = [this](fair::mq::Parts& parts, bool verbose = false) {
135 int np = parts.Size();
136 size_t dsize = 0, dsizeTot = 0, nblocks = 0;
137 const o2h::DataHeader* hdPrev = nullptr;
138 for (int ip = 0; ip < np; ip += 2) {
139 const auto& msgh = parts[ip];
140 const auto* hd = o2h::get<o2h::DataHeader*>(msgh.GetData());
141 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(msgh.GetData());
142 if (verbose && mInput.verbosity > 0) {
143 LOGP(info, "Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
144 }
145 if (dph->startTime != this->mTFCounter) {
146 LOGP(fatal, "Local tf counter {} != TF timeslice {} for {}", this->mTFCounter, dph->startTime,
147 o2::framework::DataSpecUtils::describe(o2::framework::OutputSpec{hd->dataOrigin, hd->dataDescription, hd->subSpecification}));
148 }
149 if (hd->splitPayloadIndex == 0) { // check the 1st one only
150 auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
151 if (entry.count != this->mTFCounter) {
152 if (verbose && hdPrev) { // report previous partition size
153 LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
154 }
155 dsizeTot += dsize;
156 dsize = 0;
157 entry.count = this->mTFCounter; // acknowledge identifier seen in the data
158 LOG(debug) << "Found a part " << ip << " of " << np << " | " << hd->dataOrigin.as<std::string>() << "/" << hd->dataDescription.as<std::string>()
159 << "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mTFCounter;
160 nblocks++;
161 }
162 }
163 hdPrev = hd;
164 dsize += msgh.GetSize() + parts[ip + 1].GetSize();
165 }
166 // last part
167 dsizeTot += dsize;
168 if (verbose && hdPrev) {
169 LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
170 }
171 return dsizeTot;
172 };
173
174 auto findOutputChannel = [&ctx, this](o2h::DataHeader& h, size_t tslice) {
175 if (!this->mInput.rawChannelConfig.empty()) {
176 return std::string{this->mInput.rawChannelConfig};
177 } else {
178 auto& outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
179 for (auto& oroute : outputRoutes) {
180 LOG(debug) << "comparing with matcher to route " << oroute.matcher << " TSlice:" << oroute.timeslice;
181 if (o2f::DataSpecUtils::match(oroute.matcher, h.dataOrigin, h.dataDescription, h.subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
182 LOG(debug) << "picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) << " channel " << oroute.channel;
183 return std::string{oroute.channel};
184 }
185 }
186 }
187 auto& outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
188 LOGP(error, "Failed to find output channel for {}/{}/{} @ timeslice {}", h.dataOrigin, h.dataDescription, h.subSpecification, h.tfCounter);
189 for (auto& oroute : outputRoutes) {
190 LOGP(info, "Available route route {}", o2f::DataSpecUtils::describe(oroute.matcher));
191 }
192 return std::string{};
193 };
194 auto setTimingInfo = [&ctx](TFMap& msgMap) {
195 auto& timingInfo = ctx.services().get<o2::framework::TimingInfo>();
196 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
197 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
198 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
199 timingInfo.firstTForbit = hd0->firstTForbit;
200 timingInfo.creation = dph->creation;
201 timingInfo.tfCounter = hd0->tfCounter;
202 timingInfo.runNumber = hd0->runNumber;
203 };
204
205 auto addMissingParts = [this, &findOutputChannel](TFMap& msgMap) {
206 // at least the 1st header is guaranteed to be filled by the reader, use it for extra info
207 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
208 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
209 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
210 for (auto& out : this->mSeenOutputMap) {
211 if (out.second.count == this->mTFCounter) { // was seen in the data
212 continue;
213 }
214 LOG(debug) << "Adding dummy output for " << out.first.dataOrigin.as<std::string>() << "/" << out.first.dataDescription.as<std::string>()
215 << "/" << out.second.defSubSpec << " for TF " << this->mTFCounter;
216 o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
218 outHeader.firstTForbit = hd0->firstTForbit;
219 outHeader.tfCounter = hd0->tfCounter;
220 outHeader.runNumber = hd0->runNumber;
221 const auto fmqChannel = findOutputChannel(outHeader, dph->startTime);
222 if (fmqChannel.empty()) { // no output channel
223 continue;
224 }
225 auto fmqFactory = this->mDevice->GetChannel(fmqChannel, 0).Transport();
226 o2h::Stack headerStack{outHeader, *dph};
227 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
228 auto plMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
229 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
230 fair::mq::Parts* parts = msgMap[fmqChannel].get();
231 if (!parts) {
232 msgMap[fmqChannel] = std::make_unique<fair::mq::Parts>();
233 parts = msgMap[fmqChannel].get();
234 }
235 parts->AddPart(std::move(hdMessage));
236 parts->AddPart(std::move(plMessage));
237 }
238 };
239
240 while (1) {
241 if (mTFQueue.size()) {
242 static o2f::RateLimiter limiter;
243 limiter.check(ctx, mInput.tfRateLimit, mInput.minSHM);
244
245 auto tfPtr = std::move(mTFQueue.front());
246 mTFQueue.pop();
247 if (!tfPtr) {
248 LOG(error) << "Builder provided nullptr TF pointer";
249 continue;
250 }
251 setTimingInfo(*tfPtr.get());
252 size_t nparts = 0, dataSize = 0;
253 if (mInput.sendDummyForMissing) {
254 for (auto& msgIt : *tfPtr.get()) { // complete with empty output for the specs which were requested but were not seen in the data
255 acknowledgeOutput(*msgIt.second.get(), true);
256 }
257 addMissingParts(*tfPtr.get());
258 }
259
260 auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
261 auto tDiff = tNow - tLastTF;
262 if (mTFCounter && tDiff < mInput.delay_us) {
263 std::this_thread::sleep_for(std::chrono::microseconds((size_t)(mInput.delay_us - tDiff))); // respect requested delay before sending
264 }
265 for (auto& msgIt : *tfPtr.get()) {
266 size_t szPart = acknowledgeOutput(*msgIt.second.get(), false);
267 dataSize += szPart;
268 const auto* hd = o2h::get<o2h::DataHeader*>((*msgIt.second.get())[0].GetData());
269 nparts += msgIt.second->Size() / 2;
270 device->Send(*msgIt.second.get(), msgIt.first);
271 }
272 // FIXME: this is to pretend we did send some messages via DPL.
273 // we should really migrate everything to use FairMQDeviceProxy,
274 // however this is a small enough hack for now.
275 ctx.services().get<o2f::MessageContext>().fakeDispatch();
276 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
277 LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mTFCounter, dataSize, nparts, mTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
278 tLastTF = tNow;
279 ++mTFCounter;
280
281 while (mTFQueue.size() == 0 && mWaitSendingLast) {
282 usleep(10000);
283 }
284 break;
285 }
286 if (!mRunning) { // no more TFs will be provided
287 stopProcessing(ctx);
288 break;
289 }
290 // usleep(5000); // wait 5ms for new TF to be built
291 }
292 if (mTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
293 stopProcessing(ctx);
294 }
295}
296
297//____________________________________________________________
299{
300 if (mFileFetcher) {
301 mFileFetcher->stop();
302 mFileFetcher.reset();
303 }
304 if (mTFBuilderThread.joinable()) {
305 mTFBuilderThread.join();
306 }
307}
308
309//___________________________________________________________
310void TFReaderSpec::stopProcessing(o2f::ProcessingContext& ctx)
311{
312 static bool stopDone = false;
313 if (stopDone) {
314 return;
315 }
316 stopDone = true;
317 LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
318 mRunning = false;
319 if (mFileFetcher) {
320 mFileFetcher->stop();
321 mFileFetcher.reset();
322 }
323 if (mTFBuilderThread.joinable()) {
324 mTFBuilderThread.join();
325 }
326 if (!mInput.rawChannelConfig.empty()) {
327 auto device = ctx.services().get<o2f::RawDeviceService>().device();
328 o2f::SourceInfoHeader exitHdr;
329 exitHdr.state = o2f::InputChannelState::Completed;
331 auto fmqFactory = device->GetChannel(mInput.rawChannelConfig, 0).Transport();
332 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
333 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
334 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
335 fair::mq::Parts eosMsg;
336 eosMsg.AddPart(std::move(hdEOSMessage));
337 eosMsg.AddPart(std::move(plEOSMessage));
338 device->Send(eosMsg, mInput.rawChannelConfig);
339 LOG(info) << "Sent EoS message to " << mInput.rawChannelConfig;
340 } else {
342 }
343 ctx.services().get<o2f::ControlService>().readyToQuit(o2f::QuitRequest::Me);
344}
345
346//____________________________________________________________
347void TFReaderSpec::TFBuilder()
348{
349 // build TFs and add to the queue
350 std::string tfFileName;
351 auto sleepTime = std::chrono::microseconds(mInput.delay_us > 10000 ? mInput.delay_us : 10000);
352 bool waitAcknowledged = false;
353 long startWait = 0;
354 while (mRunning && mDevice) {
355 LOGP(debug, "mTFQueue.size()={} mWaitSendingLast = {}", mTFQueue.size(), mWaitSendingLast);
356 if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
357 mWaitSendingLast = false;
358 std::this_thread::sleep_for(sleepTime);
359 continue;
360 }
361 tfFileName = mFileFetcher ? mFileFetcher->getNextFileInQueue() : "";
362 if (!mRunning ||
363 (tfFileName.empty() && !mFileFetcher->isRunning()) ||
364 mTFBuilderCounter >= mInput.maxTFs ||
365 (!mInput.tfIDs.empty() && mSelIDEntry >= mInput.tfIDs.size())) {
366 // stopped or no more files in the queue is expected or needed
367 LOG(info) << "TFReader stops processing";
368 if (mFileFetcher) {
369 mFileFetcher->stop();
370 }
371 mRunning = false;
372 mWaitSendingLast = false;
373 break;
374 }
375 if (tfFileName.empty()) {
376 if (!waitAcknowledged) {
377 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
378 waitAcknowledged = true;
379 }
380 std::this_thread::sleep_for(10ms); // wait for the files cache to be filled
381 continue;
382 }
383 mWaitSendingLast = false;
384 if (waitAcknowledged) {
385 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
386 mTotalWaitTime += waitTime;
387 if (++mNWaits > 1) {
388 LOGP(warn, "Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
389 }
390 waitAcknowledged = false;
391 startWait = 0;
392 }
393
394 LOG(info) << "Processing file " << tfFileName;
395 SubTimeFrameFileReader reader(tfFileName, mInput.detMask);
396 size_t locID = 0;
397 // try
398 {
399 while (mRunning && mTFBuilderCounter < mInput.maxTFs) {
400 if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
401 if (mTFQueue.size() > 1) {
402 mWaitSendingLast = false;
403 }
404 std::this_thread::sleep_for(sleepTime);
405 continue;
406 }
407 auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mSelIDEntry, mInput.sup0xccdb, mInput.verbosity);
408 bool acceptTF = true;
409 if (tf) {
410 locID++;
411 if (!mInput.tfIDs.empty()) {
412 acceptTF = false;
413 if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
414 mWaitSendingLast = false;
415 acceptTF = true;
416 LOGP(info, "Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
417 mSelIDEntry++;
418 } else {
419 LOGP(info, "Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
420 }
421 } else {
422 mSelIDEntry++;
423 }
424 mTFBuilderCounter++;
425 }
426 if (mRunning && tf) {
427 if (acceptTF) {
428 mWaitSendingLast = true;
429 mTFQueue.push(std::move(tf));
430 }
431 } else {
432 break;
433 }
434 if (mInput.maxTFsPerFile > 0 && locID >= mInput.maxTFsPerFile) { // go to next file
435 break;
436 }
437 }
438 // remove already processed file from the queue, unless they are needed for further looping
439 if (mFileFetcher) {
440 mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.maxLoops);
441 }
442 }
443 }
444}
445
446//_________________________________________________________
448{
449 // check which inputs are present in files to read
451 spec.name = "tf-reader";
452 const DetID::mask_t DEFMask = DetID::getMask("ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP,FOC");
453 rinp.detMask = DetID::getMask(rinp.detList) & DEFMask;
454 rinp.detMaskRawOnly = DetID::getMask(rinp.detListRawOnly) & DEFMask;
456 if (rinp.rawChannelConfig.empty()) {
457 // we don't know a priori what will be the content of the TF data, so we create all possible outputs
458 for (DetID::ID id = DetID::First; id <= DetID::Last; id++) {
459 if (rinp.detMask[id]) {
460 if (!rinp.detMaskNonRawOnly[id]) {
462 rinp.hdVec.emplace_back(o2h::DataHeader{"RAWDATA", DetID::getDataOrigin(id), 0xDEADBEEF, 0}); // in abcence of real data this will be sent
463 }
464 //
465 if (rinp.detMaskRawOnly[id]) { // used asked to not open non-raw channels
466 continue;
467 }
468 // in case detectors were processed on FLP
469 if (id == DetID::CTP) {
471 rinp.hdVec.emplace_back(o2h::DataHeader{"LUMI", DetID::getDataOrigin(DetID::CTP), 0, 0}); // in abcence of real data this will be sent
472 }
473 if (id == DetID::TOF) {
475 rinp.hdVec.emplace_back(o2h::DataHeader{"CRAWDATA", DetID::getDataOrigin(DetID::TOF), 0xDEADBEEF, 0}); // in abcence of real data this will be sent
476 } else if (id == DetID::FT0 || id == DetID::FV0 || id == DetID::FDD) {
477 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "DIGITSBC", 0});
478 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "DIGITSCH", 0});
479 rinp.hdVec.emplace_back(o2h::DataHeader{"DIGITSBC", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
480 rinp.hdVec.emplace_back(o2h::DataHeader{"DIGITSCH", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
481 } else if (id == DetID::PHS) {
482 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "CELLS", 0});
483 spec.outputs.emplace_back(o2f::OutputSpec{DetID::getDataOrigin(id), "CELLTRIGREC", 0});
484 rinp.hdVec.emplace_back(o2h::DataHeader{"CELLS", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
485 rinp.hdVec.emplace_back(o2h::DataHeader{"CELLTRIGREC", DetID::getDataOrigin(id), 0, 0}); // in abcence of real data this will be sent
486 } else if (id == DetID::CPV) {
487 spec.outputs.emplace_back(DetID::getDataOrigin(id), "DIGITS", 0);
488 spec.outputs.emplace_back(DetID::getDataOrigin(id), "DIGITTRIGREC", 0);
489 spec.outputs.emplace_back(DetID::getDataOrigin(id), "RAWHWERRORS", 0);
490 rinp.hdVec.emplace_back("DIGITS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
491 rinp.hdVec.emplace_back("DIGITTRIGREC", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
492 rinp.hdVec.emplace_back("RAWHWERRORS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
493 } else if (id == DetID::EMC) {
495 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "CELLSTRGR"}});
496 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "DECODERERR"}});
497 rinp.hdVec.emplace_back("CELLS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
498 rinp.hdVec.emplace_back("CELLSTRGR", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
499 rinp.hdVec.emplace_back("DECODERERR", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
500 } else if (id == DetID::FOC) {
501 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PADLAYERS"}});
502 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PIXELHITS"}});
503 spec.outputs.emplace_back(o2f::OutputSpec{o2f::ConcreteDataTypeMatcher{DetID::getDataOrigin(id), "PIXELCHIPS"}});
505 rinp.hdVec.emplace_back("PADLAYERS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
506 rinp.hdVec.emplace_back("PIXELHITS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
507 rinp.hdVec.emplace_back("PIXELCHIPS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
508 rinp.hdVec.emplace_back("TRIGGERS", DetID::getDataOrigin(id), 0, 0); // in abcence of real data this will be sent
509 }
510 }
511 }
512 o2f::DataSpecUtils::updateOutputList(spec.outputs, o2f::OutputSpec{{"stfDist"}, o2h::gDataOriginFLP, o2h::gDataDescriptionDISTSTF, 0});
513 if (!rinp.sup0xccdb) {
514 o2f::DataSpecUtils::updateOutputList(spec.outputs, o2f::OutputSpec{{"stfDistCCDB"}, o2h::gDataOriginFLP, o2h::gDataDescriptionDISTSTF, 0xccdb});
515 }
516 if (!rinp.metricChannel.empty()) {
517 spec.options.emplace_back(o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.metricChannel, {"Out-of-band channel config for TF throttling"}});
518 }
519 } else {
520 auto nameStart = rinp.rawChannelConfig.find("name=");
521 if (nameStart == std::string::npos) {
522 throw std::runtime_error("raw channel name is not provided");
523 }
524 nameStart += strlen("name=");
525 auto nameEnd = rinp.rawChannelConfig.find(",", nameStart + 1);
526 if (nameEnd == std::string::npos) {
527 nameEnd = rinp.rawChannelConfig.size();
528 }
529 spec.options = {o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {"Out-of-band channel config"}}};
530 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
531 if (!rinp.metricChannel.empty()) {
532 LOGP(alarm, "Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
533 LOGP(alarm, R"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
534 }
535 }
536 spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
537 spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
538 spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
539 spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
540 spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});
541 spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-files", o2f::VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});
542
543 spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
544
545 return spec;
546}
Helper function to tokenize sequences and ranges of integral numbers.
std::ostringstream debug
Class for time synchronization of RawReader instances.
void endOfStream(o2f::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2f::ProcessingContext &ctx) final
void init(o2f::InitContext &ic) final
std::unordered_map< std::string, std::unique_ptr< fair::mq::Parts > > TFMap
TFReaderSpec(const TFReaderInp &rinp)
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID First
Definition DetID.h:94
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr ID FOC
Definition DetID.h:80
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
static constexpr ID EMC
Definition DetID.h:69
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID TOF
Definition DetID.h:66
ConfigParamRegistry const & options()
Definition InitContext.h:33
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
size_t size() const
Definition FIFO.h:31
void pop()
Definition FIFO.h:56
const T & front() const
Definition FIFO.h:64
void push(Args &&... args)
Definition FIFO.h:50
GLint GLsizei count
Definition glcorearb.h:399
GLuint entry
Definition glcorearb.h:5735
GLenum GLsizei dataSize
Definition glcorearb.h:3994
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionInfo
Definition DataHeader.h:601
constexpr o2::header::DataOrigin gDataOriginAny
Definition DataHeader.h:560
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:603
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
o2::framework::DataProcessorSpec getTFReaderSpec(o2::rawdd::TFReaderInp &rinp)
std::unique_ptr< GPUReconstructionTimeframe > tf
static std::string describe(InputSpec const &spec)
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:618
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
RunNumberType runNumber
Definition DataHeader.h:684
Helper struct to encode origin and description of data.
Definition DataHeader.h:757
DataDescription dataDescription
Definition DataHeader.h:759
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
std::vector< o2::header::DataHeader > hdVec
std::string detListNonRawOnly
std::string rawChannelConfig
std::string detListRawOnly
o2::detectors::DetID::mask_t detMaskNonRawOnly
std::vector< int > tfIDs
o2::detectors::DetID::mask_t detMask
o2::detectors::DetID::mask_t detMaskRawOnly
const int sleepTime
Definition test_Fifo.cxx:28
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"