Project
Loading...
Searching...
No Matches
RawFileReaderWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
22#include "Framework/Task.h"
23#include "Framework/Logger.h"
26
31#include "Headers/DataHeader.h"
33#include "Headers/STFHeader.h"
34#include "Headers/Stack.h"
35
36#include "RawFileReaderWorkflow.h" // not installed
37#include <TStopwatch.h>
38#include <fairmq/Device.h>
39#include <fairmq/Message.h>
40#include <fairmq/Parts.h>
41
42#include <unistd.h>
43#include <algorithm>
44#include <unordered_map>
45#include <cctype>
46#include <string>
47#include <climits>
48#include <regex>
49#include <chrono>
50#include <thread>
51
52using namespace o2::raw;
54
55namespace o2f = o2::framework;
56namespace o2h = o2::header;
57
59{
60 public:
61 explicit RawReaderSpecs(const ReaderInp& rinp);
62 void init(o2f::InitContext& ic) final;
63 void run(o2f::ProcessingContext& ctx) final;
64
65 uint32_t getMinTFID() const { return mMinTFID; }
66 uint32_t getMaxTFID() const { return mMaxTFID; }
67 void setMinMaxTFID(uint32_t mn, uint32_t mx)
68 {
69 mMinTFID = mn;
70 mMaxTFID = mx >= mn ? mx : mn;
71 }
72
73 private:
74 void processDropTF(const std::string& drops);
75
76 int mLoop = 0; // once last TF reached, loop while mLoop>=0
77 uint32_t mTFCounter = 0; // TFId accumulator (accounts for looping)
78 uint32_t mDelayUSec = 0; // Delay in microseconds between TFs
79 uint32_t mMinTFID = 0; // 1st TF to extract
80 uint32_t mMaxTFID = 0xffffffff; // last TF to extrct
81 int mRunNumber = 0; // run number to pass
82 int mVerbosity = 0;
83 int mTFRateLimit = -999;
84 bool mPreferCalcTF = false;
85 size_t mMinSHM = 0;
86 size_t mLoopsDone = 0;
87 size_t mSentSize = 0;
88 size_t mSentMessages = 0;
89 bool mPartPerSP = true; // fill part per superpage
90 bool mSup0xccdb = false; // suppress explicit FLP/DISTSUBTIMEFRAME/0xccdb output
91 std::string mRawChannelName = ""; // name of optional non-DPL channel
92 std::unique_ptr<o2::raw::RawFileReader> mReader; // matching engine
93 std::unordered_map<std::string, std::pair<int, int>> mDropTFMap; // allows to drop certain fraction of TFs
94 TStopwatch mTimer;
95};
96
97//___________________________________________________________
99 : mLoop(rinp.loop < 0 ? INT_MAX : (rinp.loop < 1 ? 1 : rinp.loop)), mDelayUSec(rinp.delay_us), mMinTFID(rinp.minTF), mMaxTFID(rinp.maxTF), mRunNumber(rinp.runNumber), mPartPerSP(rinp.partPerSP), mSup0xccdb(rinp.sup0xccdb), mReader(std::make_unique<o2::raw::RawFileReader>(rinp.inifile, 0, rinp.bufferSize, rinp.onlyDet)), mRawChannelName(rinp.rawChannelConfig), mPreferCalcTF(rinp.preferCalcTF), mMinSHM(rinp.minSHM)
100{
101 mReader->setCheckErrors(rinp.errMap);
102 mReader->setMaxTFToRead(rinp.maxTF);
103 mReader->setNominalSPageSize(rinp.spSize);
104 mReader->setCacheData(rinp.cache);
105 mReader->setTFAutodetect(rinp.autodetectTF0 ? RawFileReader::FirstTFDetection::Pending : RawFileReader::FirstTFDetection::Disabled);
106 mReader->setPreferCalculatedTFStart(rinp.preferCalcTF);
107 LOG(info) << "Will preprocess files with buffer size of " << rinp.bufferSize << " bytes";
108 LOG(info) << "Number of loops over whole data requested: " << mLoop;
109 mTimer.Stop();
110 mTimer.Reset();
111 processDropTF(rinp.dropTF);
112}
113
114//___________________________________________________________
115void RawReaderSpecs::processDropTF(const std::string& dropTF)
116{
117 static const std::regex delimDet(";");
118 if (dropTF.empty() || dropTF == "none") {
119 return;
120 }
121 std::sregex_token_iterator iter(dropTF.begin(), dropTF.end(), delimDet, -1), end;
122 for (; iter != end; ++iter) {
123 std::string sdet = iter->str();
124 if (sdet.length() < 5 || sdet[3] != ',') {
125 throw std::runtime_error(fmt::format("Wrong dropTF argument {} in {}", sdet, dropTF));
126 }
127 std::string detName = sdet.substr(0, 3);
128 o2::detectors::DetID det(detName.c_str()); // make sure this is a valid detector
129 std::string sdetArg = sdet.substr(4, sdet.length());
130 int modV = 0, rej = 0, posrej = sdetArg.find(',');
131 if (posrej != std::string::npos) {
132 modV = std::stoi(sdetArg.substr(0, posrej));
133 rej = std::stoi(sdetArg.substr(++posrej, sdetArg.length()));
134 } else {
135 modV = std::stoi(sdetArg);
136 }
137 if (modV < 1 || rej < 0 || rej >= modV) {
138 throw std::runtime_error(fmt::format("Wrong dropTF argument {}, 1st number must be > than 2nd", sdet));
139 }
140 mDropTFMap[detName] = {modV, rej};
141 LOG(info) << " Will drop TF for detector " << detName << " if (TF_ID%" << modV << ")==" << rej;
142 }
143}
144
145//___________________________________________________________
147{
148 assert(mReader);
149 mTimer.Start();
150 mTimer.Stop();
151 mVerbosity = ic.options().get<int>("verbosity-level");
152 mReader->setVerbosity(mVerbosity);
153 mReader->init();
154 if (mMaxTFID >= mReader->getNTimeFrames()) {
155 mMaxTFID = mReader->getNTimeFrames() ? mReader->getNTimeFrames() - 1 : 0;
156 }
157 const auto& hbfU = HBFUtils::Instance();
158 if (!hbfU.startTime) {
159 hbfU.setValue("HBFUtils.startTime", std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()));
160 LOG(warning) << "Run start time is not provided via HBFUtils.startTime, will use now() = " << hbfU.startTime << " ms.";
161 }
162 if (mRunNumber == 0 && hbfU.runNumber > 0) {
163 mRunNumber = hbfU.runNumber;
164 }
165}
166
167//___________________________________________________________
169{
170 assert(mReader);
171 auto tTotStart = mTimer.CpuTime();
172 mTimer.Start(false);
173 auto device = ctx.services().get<o2f::RawDeviceService>().device();
174 assert(device);
175 if (mTFRateLimit == -999) {
176 mTFRateLimit = std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit"));
177 }
178 auto findOutputChannel = [&ctx, this](o2h::DataHeader& h) {
179 if (!this->mRawChannelName.empty()) {
180 return std::string{this->mRawChannelName};
181 } else {
182 auto outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
183 for (auto& oroute : outputRoutes) {
184 LOG(debug) << "comparing with matcher to route " << oroute.matcher << " TSlice:" << oroute.timeslice;
185 if (o2f::DataSpecUtils::match(oroute.matcher, h.dataOrigin, h.dataDescription, h.subSpecification) && ((mTFCounter % oroute.maxTimeslices) == oroute.timeslice)) {
186 LOG(debug) << "picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) << " channel " << oroute.channel;
187 return std::string{oroute.channel};
188 }
189 }
190 }
191 LOGP(error, "Failed to find output channel for {}/{}/{} @ timeslice {}", h.dataOrigin, h.dataDescription, h.subSpecification, h.tfCounter);
192 auto outputRoutes = ctx.services().get<o2f::RawDeviceService>().spec().outputs;
193 for (auto& oroute : outputRoutes) {
194 LOGP(info, "Available output routes: {} channel: {}", o2f::DataSpecUtils::describe(oroute.matcher), oroute.channel);
195 }
196 return std::string{};
197 };
198
199 size_t tfNParts = 0, tfSize = 0;
200 std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>> messagesPerRoute;
201
202 auto addPart = [&messagesPerRoute, &tfNParts, &tfSize](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl, const std::string& fairMQChannel) {
203 fair::mq::Parts* parts = nullptr;
204 parts = messagesPerRoute[fairMQChannel].get(); // fair::mq::Parts*
205 if (!parts) {
206 messagesPerRoute[fairMQChannel] = std::make_unique<fair::mq::Parts>();
207 parts = messagesPerRoute[fairMQChannel].get();
208 }
209 tfSize += pl->GetSize();
210 tfNParts++;
211 parts->AddPart(std::move(hd));
212 parts->AddPart(std::move(pl));
213 };
214
215 // clean-up before reading next TF
216 auto tfID = mReader->getNextTFToRead();
217 int nlinks = mReader->getNLinks();
218
219 if (tfID < mMinTFID) {
220 tfID = mMinTFID;
221 }
222 mReader->setNextTFToRead(tfID);
223 std::vector<RawFileReader::PartStat> partsSP;
224
225 static o2f::RateLimiter limiter;
226 limiter.check(ctx, mTFRateLimit, mMinSHM);
227
228 // read next time frame
229 LOG(info) << "Reading TF#" << mTFCounter << " (" << tfID << " at iteration " << mLoopsDone << ')';
230 o2::header::Stack dummyStack{o2h::DataHeader{}, o2f::DataProcessingHeader{0}}; // dummy stack to just to get stack size
231 auto hstackSize = dummyStack.size();
232
233 uint32_t firstOrbit = 0;
234 uint64_t creationTime = 0;
235 const auto& hbfU = HBFUtils::Instance();
236
237 for (int il = 0; il < nlinks; il++) {
238 auto& link = mReader->getLink(il);
239
240 if (!mDropTFMap.empty()) { // some TFs should be dropped
241 auto res = mDropTFMap.find(link.origin.str);
242 if (res != mDropTFMap.end() && (mTFCounter % res->second.first) == res->second.second) {
243 LOG(info) << "Dropping " << mTFCounter << " for " << link.origin.str << "/" << link.description.str << "/" << link.subspec;
244 continue; // drop the data
245 }
246 }
247 if (!link.rewindToTF(tfID)) {
248 continue; // this link has no data for wanted TF
249 }
250
251 o2h::DataHeader hdrTmpl(link.description, link.origin, link.subspec); // template with 0 size
252 int nParts = mPartPerSP ? link.getNextTFSuperPagesStat(partsSP) : link.getNHBFinTF();
254 hdrTmpl.splitPayloadParts = nParts;
255 hdrTmpl.tfCounter = mTFCounter;
256 hdrTmpl.runNumber = mRunNumber;
257 if (mVerbosity > 1) {
258 LOG(info) << link.describe() << " will read " << nParts << " HBFs starting from block " << link.nextBlock2Read;
259 }
260 const auto fmqChannel = findOutputChannel(hdrTmpl);
261 if (fmqChannel.empty()) { // no output channel
262 continue;
263 }
264
265 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
266 while (hdrTmpl.splitPayloadIndex < hdrTmpl.splitPayloadParts) {
267 hdrTmpl.payloadSize = mPartPerSP ? partsSP[hdrTmpl.splitPayloadIndex].size : link.getNextHBFSize();
268 auto hdMessage = fmqFactory->CreateMessage(hstackSize, fair::mq::Alignment{64});
269 auto plMessage = fmqFactory->CreateMessage(hdrTmpl.payloadSize, fair::mq::Alignment{64});
270 auto bread = mPartPerSP ? link.readNextSuperPage(reinterpret_cast<char*>(plMessage->GetData()), &partsSP[hdrTmpl.splitPayloadIndex]) : link.readNextHBF(reinterpret_cast<char*>(plMessage->GetData()));
271 if (bread != hdrTmpl.payloadSize) {
272 LOG(error) << "Link " << il << " read " << bread << " bytes instead of " << hdrTmpl.payloadSize
273 << " expected in TF=" << mTFCounter << " part=" << hdrTmpl.splitPayloadIndex;
274 }
275 // check if the RDH to send corresponds to expected orbit
276 if (hdrTmpl.splitPayloadIndex == 0) {
277 auto ir = o2::raw::RDHUtils::getHeartBeatIR(plMessage->GetData());
278 auto tfid = hbfU.getTF(ir);
279 firstOrbit = hdrTmpl.firstTForbit = (mPreferCalcTF || !link.cruDetector) ? hbfU.getIRTF(tfid).orbit : ir.orbit; // will be picked for the following parts
280 creationTime = hbfU.getTFTimeStamp({0, firstOrbit});
281 }
282 o2::header::Stack headerStack{hdrTmpl, o2f::DataProcessingHeader{mTFCounter, 1, creationTime}};
283 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
284 hdrTmpl.splitPayloadIndex++; // prepare for next
285
286 addPart(std::move(hdMessage), std::move(plMessage), fmqChannel);
287 }
288 LOGF(debug, "Added %d parts for TF#%d(%d in iteration %d) of %s/%s/0x%u", hdrTmpl.splitPayloadParts, mTFCounter, tfID,
289 mLoopsDone, link.origin.as<std::string>(), link.description.as<std::string>(), link.subspec);
290 }
291
292 auto& timingInfo = ctx.services().get<o2f::TimingInfo>();
293 timingInfo.firstTForbit = firstOrbit;
294 timingInfo.creation = creationTime;
295 timingInfo.tfCounter = mTFCounter;
296 timingInfo.runNumber = mRunNumber;
297
298 // send sTF acknowledge message
299 unsigned stfSS[2] = {0, 0xccdb};
300 for (int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) {
301 o2::header::STFHeader stfHeader{mTFCounter, firstOrbit, 0};
303 stfDistDataHeader.runNumber = mRunNumber;
305 stfDistDataHeader.firstTForbit = stfHeader.firstOrbit;
306 stfDistDataHeader.tfCounter = mTFCounter;
307 const auto fmqChannel = findOutputChannel(stfDistDataHeader);
308 if (!fmqChannel.empty()) { // no output channel
309 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
310 o2::header::Stack headerStackSTF{stfDistDataHeader, o2f::DataProcessingHeader{mTFCounter, 1, creationTime}};
311 auto hdMessageSTF = fmqFactory->CreateMessage(hstackSize, fair::mq::Alignment{64});
312 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.payloadSize, fair::mq::Alignment{64});
313 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
314 memcpy(plMessageSTF->GetData(), &stfHeader, sizeof(o2::header::STFHeader));
315 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
316 }
317 }
318
319 if (mTFCounter) { // delay sending
320 std::this_thread::sleep_for(std::chrono::microseconds((size_t)mDelayUSec));
321 }
322 bool sentSomething = false;
323 for (auto& msgIt : messagesPerRoute) {
324 LOG(info) << "Sending " << msgIt.second->Size() / 2 << " parts to channel " << msgIt.first;
325 device->Send(*msgIt.second.get(), msgIt.first);
326 sentSomething = msgIt.second->Size() > 0;
327 }
328 if (sentSomething) {
329 ctx.services().get<o2f::MessageContext>().fakeDispatch();
330 }
331
332 mTimer.Stop();
333
334 LOGP(info, "Sent payload of {} bytes in {} parts in {} messages for TF#{} firstTForbit={} timeStamp={} | Timing: {}", tfSize, tfNParts,
335 messagesPerRoute.size(), mTFCounter, firstOrbit, creationTime, mTimer.CpuTime() - tTotStart);
336
337 mSentSize += tfSize;
338 mSentMessages += tfNParts;
339 mReader->setNextTFToRead(++tfID);
340 ++mTFCounter;
341 if (tfID > mMaxTFID || mReader->isProcessingStopped()) {
342 if (!mReader->isProcessingStopped() && !mReader->isEmpty() && --mLoop) {
343 mLoopsDone++;
344 mReader->setNextTFToRead(tfID = 0);
345 LOG(info) << "Shall start new loop " << mLoopsDone << " from the beginning of data";
346 } else {
347 if (!mRawChannelName.empty()) { // send endOfStream message to raw channel
348 o2f::SourceInfoHeader exitHdr;
349 exitHdr.state = o2f::InputChannelState::Completed;
351 auto fmqFactory = device->GetChannel(mRawChannelName, 0).Transport();
352 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
353 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
354 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
355 fair::mq::Parts eosMsg;
356 eosMsg.AddPart(std::move(hdEOSMessage));
357 eosMsg.AddPart(std::move(plEOSMessage));
358 device->Send(eosMsg, mRawChannelName);
359 LOG(info) << "Sent EoS message to " << mRawChannelName;
360 } else {
362 }
363 ctx.services().get<o2f::ControlService>().readyToQuit(o2f::QuitRequest::Me);
364 mTimer.Stop();
365 LOGP(info, "Finished: payload of {} bytes in {} messages sent for {} TFs, total timing: Real:{:3f}/CPU:{:3f}", mSentSize, mSentMessages, mTFCounter, mTimer.RealTime(), mTimer.CpuTime());
366 }
367 }
368}
369
370//_________________________________________________________
372{
373 // check which inputs are present in files to read
375 spec.name = "raw-file-reader";
376 std::string rawChannelName = "";
377 if (rinp.rawChannelConfig.empty()) {
378 if (!rinp.inifile.empty()) {
379 auto conf = o2::raw::RawFileReader::parseInput(rinp.inifile, rinp.onlyDet);
380 for (const auto& entry : conf) {
381 const auto& ordescard = entry.first;
382 if (!entry.second.empty()) { // origin and decription for files to process
383 spec.outputs.emplace_back(o2f::OutputSpec(o2f::ConcreteDataTypeMatcher{std::get<0>(ordescard), std::get<1>(ordescard)}));
384 }
385 }
386 }
387 // add output for DISTSUBTIMEFRAME
389 if (!rinp.sup0xccdb) {
390 spec.outputs.emplace_back(o2f::OutputSpec{{"stfDistCCDB"}, o2::header::gDataOriginFLP, o2::header::gDataDescriptionDISTSTF, 0xccdb}); // will be added automatically
391 }
392 if (!rinp.metricChannel.empty()) {
393 spec.options.emplace_back(o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.metricChannel, {"Out-of-band channel config for TF throttling"}});
394 }
395 } else {
396 auto nameStart = rinp.rawChannelConfig.find("name=");
397 if (nameStart == std::string::npos) {
398 throw std::runtime_error("raw channel name is not provided");
399 }
400 nameStart += strlen("name=");
401 auto nameEnd = rinp.rawChannelConfig.find(",", nameStart + 1);
402 if (nameEnd == std::string::npos) {
403 nameEnd = rinp.rawChannelConfig.size();
404 }
405 spec.options.emplace_back(o2f::ConfigParamSpec{"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {"Out-of-band channel config"}});
406 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
407 if (!rinp.metricChannel.empty()) {
408 LOGP(alarm, "Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
409 LOGP(alarm, R"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
410 }
411 LOG(info) << "Will send output to non-DPL channel " << rinp.rawChannelConfig;
412 }
413
414 spec.algorithm = o2f::adaptFromTask<RawReaderSpecs>(rinp);
415 spec.options.emplace_back(o2f::ConfigParamSpec{"verbosity-level", o2f::VariantType::Int, 0, {"verbosity level"}});
416 return spec;
417}
418
420{
421 o2f::WorkflowSpec specs;
422 specs.emplace_back(getReaderSpec(rinp));
423 return specs;
424}
uint32_t res
Definition RawData.h:0
o2f::DataProcessorSpec getReaderSpec(ReaderInp rinp)
Reader for (multiple) raw data files.
std::ostringstream debug
Class for time synchronization of RawReader instances.
void run(o2f::ProcessingContext &ctx) final
void setMinMaxTFID(uint32_t mn, uint32_t mx)
RawReaderSpecs(const ReaderInp &rinp)
uint32_t getMaxTFID() const
void init(o2f::InitContext &ic) final
uint32_t getMinTFID() const
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
ConfigParamRegistry const & options()
Definition InitContext.h:33
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
Definition Task.h:43
static InputsMap parseInput(const std::string &confUri, const std::string &onlyDet={}, bool verbose=false)
GLint GLsizei count
Definition glcorearb.h:399
GLuint entry
Definition glcorearb.h:5735
GLuint GLuint end
Definition glcorearb.h:469
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionInfo
Definition DataHeader.h:601
constexpr o2::header::DataOrigin gDataOriginAny
Definition DataHeader.h:560
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:603
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
framework::WorkflowSpec getRawFileReaderWorkflow(ReaderInp &rinp)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
uint32_t orbit
LHC orbit.
a BaseHeader with state information from the source
constexpr uint32_t size() const noexcept
Definition DataHeader.h:421
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
PayloadSizeType payloadSize
Definition DataHeader.h:666
RunNumberType runNumber
Definition DataHeader.h:684
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
std::string rawChannelConfig
std::string inifile
std::string onlyDet
std::string metricChannel
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir(0, 0)