171 auto tTotStart = mTimer.CpuTime();
175 if (mTFRateLimit == -999) {
176 mTFRateLimit = std::stoi(device->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
179 if (!this->mRawChannelName.empty()) {
180 return std::string{this->mRawChannelName};
183 for (
auto& oroute : outputRoutes) {
184 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
185 if (o2f::DataSpecUtils::match(oroute.matcher,
h.dataOrigin,
h.dataDescription,
h.subSpecification) && ((mTFCounter % oroute.maxTimeslices) == oroute.timeslice)) {
186 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
187 return std::string{oroute.channel};
191 LOGP(error,
"Failed to find output channel for {}/{}/{} @ timeslice {}",
h.dataOrigin,
h.dataDescription,
h.subSpecification,
h.tfCounter);
193 for (
auto& oroute : outputRoutes) {
194 LOGP(info,
"Available output routes: {} channel: {}", o2f::DataSpecUtils::describe(oroute.matcher), oroute.channel);
196 return std::string{};
199 size_t tfNParts = 0, tfSize = 0;
200 std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>> messagesPerRoute;
202 auto addPart = [&messagesPerRoute, &tfNParts, &tfSize](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl,
const std::string& fairMQChannel) {
203 fair::mq::Parts* parts =
nullptr;
204 parts = messagesPerRoute[fairMQChannel].get();
206 messagesPerRoute[fairMQChannel] = std::make_unique<fair::mq::Parts>();
207 parts = messagesPerRoute[fairMQChannel].get();
209 tfSize += pl->GetSize();
211 parts->AddPart(std::move(hd));
212 parts->AddPart(std::move(pl));
216 auto tfID = mReader->getNextTFToRead();
217 int nlinks = mReader->getNLinks();
219 if (tfID < mMinTFID) {
222 mReader->setNextTFToRead(tfID);
223 std::vector<RawFileReader::PartStat> partsSP;
226 limiter.
check(ctx, mTFRateLimit, mMinSHM);
229 LOG(info) <<
"Reading TF#" << mTFCounter <<
" (" << tfID <<
" at iteration " << mLoopsDone <<
')';
231 auto hstackSize = dummyStack.
size();
233 uint32_t firstOrbit = 0;
234 uint64_t creationTime = 0;
235 const auto& hbfU = HBFUtils::Instance();
237 for (
int il = 0; il < nlinks; il++) {
238 auto& link = mReader->getLink(il);
240 if (!mDropTFMap.empty()) {
241 auto res = mDropTFMap.find(link.origin.str);
242 if (
res != mDropTFMap.end() && (mTFCounter %
res->second.first) ==
res->second.second) {
243 LOG(info) <<
"Dropping " << mTFCounter <<
" for " << link.origin.str <<
"/" << link.description.str <<
"/" << link.subspec;
247 if (!link.rewindToTF(tfID)) {
252 int nParts = mPartPerSP ? link.getNextTFSuperPagesStat(partsSP) : link.getNHBFinTF();
257 if (mVerbosity > 1) {
258 LOG(info) << link.describe() <<
" will read " << nParts <<
" HBFs starting from block " << link.nextBlock2Read;
260 const auto fmqChannel = findOutputChannel(hdrTmpl);
261 if (fmqChannel.empty()) {
265 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
268 auto hdMessage = fmqFactory->CreateMessage(hstackSize, fair::mq::Alignment{64});
269 auto plMessage = fmqFactory->CreateMessage(hdrTmpl.
payloadSize, fair::mq::Alignment{64});
270 auto bread = mPartPerSP ? link.readNextSuperPage(
reinterpret_cast<char*
>(plMessage->GetData()), &partsSP[hdrTmpl.
splitPayloadIndex]) : link.readNextHBF(
reinterpret_cast<char*
>(plMessage->GetData()));
272 LOG(error) <<
"Link " << il <<
" read " << bread <<
" bytes instead of " << hdrTmpl.
payloadSize
277 auto ir = o2::raw::RDHUtils::getHeartBeatIR(plMessage->GetData());
278 auto tfid = hbfU.getTF(
ir);
279 firstOrbit = hdrTmpl.
firstTForbit = (mPreferCalcTF || !link.cruDetector) ? hbfU.getIRTF(tfid).orbit :
ir.
orbit;
280 creationTime = hbfU.getTFTimeStamp({0, firstOrbit});
283 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
286 addPart(std::move(hdMessage), std::move(plMessage), fmqChannel);
288 LOGF(
debug,
"Added %d parts for TF#%d(%d in iteration %d) of %s/%s/0x%u", hdrTmpl.
splitPayloadParts, mTFCounter, tfID,
289 mLoopsDone, link.origin.as<std::string>(), link.description.as<std::string>(), link.subspec);
294 timingInfo.creation = creationTime;
295 timingInfo.tfCounter = mTFCounter;
296 timingInfo.runNumber = mRunNumber;
299 unsigned stfSS[2] = {0, 0xccdb};
300 for (
int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) {
303 stfDistDataHeader.
runNumber = mRunNumber;
306 stfDistDataHeader.
tfCounter = mTFCounter;
307 const auto fmqChannel = findOutputChannel(stfDistDataHeader);
308 if (!fmqChannel.empty()) {
309 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
311 auto hdMessageSTF = fmqFactory->CreateMessage(hstackSize, fair::mq::Alignment{64});
312 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.
payloadSize, fair::mq::Alignment{64});
313 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
315 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
320 std::this_thread::sleep_for(std::chrono::microseconds((
size_t)mDelayUSec));
322 bool sentSomething =
false;
323 for (
auto& msgIt : messagesPerRoute) {
324 LOG(info) <<
"Sending " << msgIt.second->Size() / 2 <<
" parts to channel " << msgIt.first;
325 device->Send(*msgIt.second.get(), msgIt.first);
326 sentSomething = msgIt.second->Size() > 0;
334 LOGP(info,
"Sent payload of {} bytes in {} parts in {} messages for TF#{} firstTForbit={} timeStamp={} | Timing: {}", tfSize, tfNParts,
335 messagesPerRoute.size(), mTFCounter, firstOrbit, creationTime, mTimer.CpuTime() - tTotStart);
338 mSentMessages += tfNParts;
339 mReader->setNextTFToRead(++tfID);
341 if (tfID > mMaxTFID || mReader->isProcessingStopped()) {
342 if (!mReader->isProcessingStopped() && !mReader->isEmpty() && --mLoop) {
344 mReader->setNextTFToRead(tfID = 0);
345 LOG(info) <<
"Shall start new loop " << mLoopsDone <<
" from the beginning of data";
347 if (!mRawChannelName.empty()) {
349 exitHdr.
state = o2f::InputChannelState::Completed;
351 auto fmqFactory = device->GetChannel(mRawChannelName, 0).Transport();
352 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
353 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
354 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
355 fair::mq::Parts eosMsg;
356 eosMsg.AddPart(std::move(hdEOSMessage));
357 eosMsg.AddPart(std::move(plEOSMessage));
358 device->Send(eosMsg, mRawChannelName);
359 LOG(info) <<
"Sent EoS message to " << mRawChannelName;
365 LOGP(info,
"Finished: payload of {} bytes in {} messages sent for {} TFs, total timing: Real:{:3f}/CPU:{:3f}", mSentSize, mSentMessages, mTFCounter, mTimer.RealTime(), mTimer.CpuTime());