172 auto tTotStart = mTimer.CpuTime();
176 if (mTFRateLimit == -999) {
177 mTFRateLimit = std::stoi(device->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
180 if (!this->mRawChannelName.empty()) {
181 return std::string{this->mRawChannelName};
184 for (
auto& oroute : outputRoutes) {
185 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
186 if (o2f::DataSpecUtils::match(oroute.matcher,
h.dataOrigin,
h.dataDescription,
h.subSpecification) && ((mTFCounter % oroute.maxTimeslices) == oroute.timeslice)) {
187 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
188 return std::string{oroute.channel};
192 LOGP(error,
"Failed to find output channel for {}/{}/{} @ timeslice {}",
h.dataOrigin,
h.dataDescription,
h.subSpecification,
h.tfCounter);
194 for (
auto& oroute : outputRoutes) {
195 LOGP(info,
"Available output routes: {} channel: {}", o2f::DataSpecUtils::describe(oroute.matcher), oroute.channel);
197 return std::string{};
200 size_t tfNParts = 0, tfSize = 0;
201 std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>> messagesPerRoute;
203 auto addPart = [&messagesPerRoute, &tfNParts, &tfSize](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl,
const std::string& fairMQChannel) {
204 fair::mq::Parts* parts =
nullptr;
205 parts = messagesPerRoute[fairMQChannel].get();
207 messagesPerRoute[fairMQChannel] = std::make_unique<fair::mq::Parts>();
208 parts = messagesPerRoute[fairMQChannel].get();
210 tfSize += pl->GetSize();
212 parts->AddPart(std::move(hd));
213 parts->AddPart(std::move(pl));
217 auto tfID = mReader->getNextTFToRead();
218 int nlinks = mReader->getNLinks();
220 if (tfID < mMinTFID) {
223 mReader->setNextTFToRead(tfID);
224 std::vector<RawFileReader::PartStat> partsSP;
227 limiter.
check(ctx, mTFRateLimit, mMinSHM);
230 LOG(info) <<
"Reading TF#" << mTFCounter <<
" (" << tfID <<
" at iteration " << mLoopsDone <<
')';
232 auto hstackSize = dummyStack.
size();
234 uint32_t firstOrbit = 0;
235 uint64_t creationTime = 0;
236 const auto& hbfU = HBFUtils::Instance();
238 for (
int il = 0; il < nlinks; il++) {
239 auto& link = mReader->getLink(il);
241 if (!mDropTFMap.empty()) {
242 auto res = mDropTFMap.find(link.origin.str);
243 if (
res != mDropTFMap.end() && (mTFCounter %
res->second.first) ==
res->second.second) {
244 LOG(info) <<
"Dropping " << mTFCounter <<
" for " << link.origin.str <<
"/" << link.description.str <<
"/" << link.subspec;
248 if (!link.rewindToTF(tfID)) {
253 int nParts = mPartPerSP ? link.getNextTFSuperPagesStat(partsSP) : link.getNHBFinTF();
258 if (mVerbosity > 1) {
259 LOG(info) << link.describe() <<
" will read " << nParts <<
" HBFs starting from block " << link.nextBlock2Read;
261 const auto fmqChannel = findOutputChannel(hdrTmpl);
262 if (fmqChannel.empty()) {
266 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
269 auto hdMessage = fmqFactory->CreateMessage(hstackSize, fair::mq::Alignment{64});
270 auto plMessage = fmqFactory->CreateMessage(hdrTmpl.
payloadSize, fair::mq::Alignment{64});
271 auto bread = mPartPerSP ? link.readNextSuperPage(
reinterpret_cast<char*
>(plMessage->GetData()), &partsSP[hdrTmpl.
splitPayloadIndex]) : link.readNextHBF(
reinterpret_cast<char*
>(plMessage->GetData()));
273 LOG(error) <<
"Link " << il <<
" read " << bread <<
" bytes instead of " << hdrTmpl.
payloadSize
278 auto ir = o2::raw::RDHUtils::getHeartBeatIR(plMessage->GetData());
279 auto tfid = hbfU.getTF(
ir);
280 firstOrbit = hdrTmpl.
firstTForbit = (mPreferCalcTF || !link.cruDetector) ? hbfU.getIRTF(tfid).orbit :
ir.
orbit;
281 creationTime = hbfU.getTFTimeStamp({0, firstOrbit});
284 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
287 addPart(std::move(hdMessage), std::move(plMessage), fmqChannel);
289 LOGF(
debug,
"Added %d parts for TF#%d(%d in iteration %d) of %s/%s/0x%u", hdrTmpl.
splitPayloadParts, mTFCounter, tfID,
290 mLoopsDone, link.origin.as<std::string>(), link.description.as<std::string>(), link.subspec);
295 timingInfo.creation = creationTime;
296 timingInfo.tfCounter = mTFCounter;
297 timingInfo.runNumber = mRunNumber;
300 unsigned stfSS[2] = {0, 0xccdb};
301 for (
int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) {
304 stfDistDataHeader.
runNumber = mRunNumber;
307 stfDistDataHeader.
tfCounter = mTFCounter;
308 const auto fmqChannel = findOutputChannel(stfDistDataHeader);
309 if (!fmqChannel.empty()) {
310 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
312 auto hdMessageSTF = fmqFactory->CreateMessage(hstackSize, fair::mq::Alignment{64});
313 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.
payloadSize, fair::mq::Alignment{64});
314 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
316 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
321 std::this_thread::sleep_for(std::chrono::microseconds((
size_t)mDelayUSec));
323 bool sentSomething =
false;
324 for (
auto& msgIt : messagesPerRoute) {
325 LOG(info) <<
"Sending " << msgIt.second->Size() / 2 <<
" parts to channel " << msgIt.first;
326 device->Send(*msgIt.second.get(), msgIt.first);
327 sentSomething = msgIt.second->Size() > 0;
335 LOGP(info,
"Sent payload of {} bytes in {} parts in {} messages for TF#{} firstTForbit={} timeStamp={} | Timing: {}", tfSize, tfNParts,
336 messagesPerRoute.size(), mTFCounter, firstOrbit, creationTime, mTimer.CpuTime() - tTotStart);
339 mSentMessages += tfNParts;
340 mReader->setNextTFToRead(++tfID);
342 if (tfID > mMaxTFID || mReader->isProcessingStopped()) {
343 if (!mReader->isProcessingStopped() && !mReader->isEmpty() && --mLoop) {
345 mReader->setNextTFToRead(tfID = 0);
346 LOG(info) <<
"Shall start new loop " << mLoopsDone <<
" from the beginning of data";
348 if (!mRawChannelName.empty()) {
350 exitHdr.
state = o2f::InputChannelState::Completed;
353 dh.
runNumber = strtoul(device->fConfig->GetProperty<std::string>(
"runNumber",
"").c_str(),
nullptr, 10);
357 auto fmqFactory = device->GetChannel(mRawChannelName, 0).Transport();
358 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
359 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
360 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
361 fair::mq::Parts eosMsg;
362 eosMsg.AddPart(std::move(hdEOSMessage));
363 eosMsg.AddPart(std::move(plEOSMessage));
364 device->Send(eosMsg, mRawChannelName);
365 LOG(info) <<
"Sent EoS message to " << mRawChannelName;
371 LOGP(info,
"Finished: payload of {} bytes in {} messages sent for {} TFs, total timing: Real:{:3f}/CPU:{:3f}", mSentSize, mSentMessages, mTFCounter, mTimer.RealTime(), mTimer.CpuTime());