28#include <TStopwatch.h>
29#include <fairmq/Device.h>
30#include <fairmq/Parts.h>
44#include <unordered_map>
54using namespace std::chrono_literals;
67 using TFMap = std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>>;
75 void loadRunTimeSpans(
const std::string& flname);
76 void runTimeRangesToIRFrameSelector(
int runNumber);
82 std::vector<o2f::OutputRoute> mOutputRoutes;
83 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
86 std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap;
87 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
89 int mConvRunTimeRangesToOrbits = -1;
90 int mSentTFCounter = 0;
91 int mAccTFCounter = 0;
92 int mTFBuilderCounter = 0;
95 long mTotalWaitTime = 0;
96 size_t mSelIDEntry = 0;
97 bool mRunning =
false;
98 bool mWaitSendingLast =
false;
100 std::thread mTFBuilderThread{};
106 for (
const auto& hd : rinp.
hdVec) {
114 mInput.
tfIDs = o2::RangeTokenizer::tokenize<int>(ic.
options().
get<std::string>(
"select-tf-ids"));
129 mFileFetcher->setMaxLoops(mInput.
maxLoops);
130 mFileFetcher->setFailThreshold(ic.
options().
get<
float>(
"fetch-failure-threshold"));
131 mFileFetcher->start();
142 mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder,
this);
144 static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
147 if (device != mDevice) {
148 throw std::runtime_error(fmt::format(
"FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
151 mInput.
tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
153 auto acknowledgeOutput = [
this](fair::mq::Parts& parts,
bool verbose =
false) {
154 int np = parts.Size();
155 size_t dsize = 0, dsizeTot = 0, nblocks = 0;
157 for (
int ip = 0; ip < np; ip += 2) {
158 const auto& msgh = parts[ip];
159 const auto* hd = o2h::get<o2h::DataHeader*>(msgh.GetData());
160 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(msgh.GetData());
162 LOGP(info,
"Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
164 if (hd->splitPayloadIndex == 0) {
165 auto&
entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
166 if (
entry.count != this->mSentTFCounter) {
167 if (verbose && hdPrev) {
172 entry.count = this->mSentTFCounter;
173 LOG(
debug) <<
"Found a part " << ip <<
" of " << np <<
" | " << hd->dataOrigin.as<std::string>() <<
"/" << hd->dataDescription.as<std::string>()
174 <<
"/" << hd->subSpecification <<
" part " << hd->splitPayloadIndex <<
" of " << hd->splitPayloadParts <<
" for TF " << this->mSentTFCounter;
179 dsize += msgh.GetSize() + parts[ip + 1].GetSize();
183 if (verbose && hdPrev) {
194 for (
auto& oroute : outputRoutes) {
195 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
196 if (o2f::DataSpecUtils::match(oroute.matcher,
h.dataOrigin,
h.dataDescription,
h.subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
197 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
198 return std::string{oroute.channel};
203 LOGP(error,
"Failed to find output channel for {}/{}/{} @ timeslice {}",
h.dataOrigin,
h.dataDescription,
h.subSpecification,
h.tfCounter);
204 for (
auto& oroute : outputRoutes) {
205 LOGP(info,
"Available route route {}", o2f::DataSpecUtils::describe(oroute.matcher));
207 return std::string{};
209 auto setTimingInfo = [&ctx](
TFMap& msgMap) {
211 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
212 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
213 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
214 timingInfo.firstTForbit = hd0->firstTForbit;
215 timingInfo.creation = dph->creation;
216 timingInfo.tfCounter = hd0->tfCounter;
217 timingInfo.runNumber = hd0->runNumber;
220 auto addMissingParts = [
this, &findOutputChannel](
TFMap& msgMap) {
222 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
223 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
224 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
225 for (
auto& out : this->mSeenOutputMap) {
226 if (out.second.count == this->mSentTFCounter) {
229 LOG(
debug) <<
"Adding dummy output for " << out.first.dataOrigin.as<std::string>() <<
"/" << out.first.dataDescription.as<std::string>()
230 <<
"/" << out.second.defSubSpec <<
" for TF " << this->mSentTFCounter;
231 o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
236 const auto fmqChannel = findOutputChannel(outHeader, dph->startTime);
237 if (fmqChannel.empty()) {
240 auto fmqFactory = this->mDevice->GetChannel(fmqChannel, 0).Transport();
242 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
243 auto plMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
244 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
245 fair::mq::Parts* parts = msgMap[fmqChannel].get();
247 msgMap[fmqChannel] = std::make_unique<fair::mq::Parts>();
248 parts = msgMap[fmqChannel].get();
250 parts->AddPart(std::move(hdMessage));
251 parts->AddPart(std::move(plMessage));
256 if (mTFQueue.
size()) {
260 auto tfPtr = std::move(mTFQueue.
front());
263 LOG(error) <<
"Builder provided nullptr TF pointer";
266 setTimingInfo(*tfPtr.get());
270 for (
auto& msgIt : *tfPtr.get()) {
272 LOGP(info,
"acknowledgeOutput {}", cntAck++);
274 acknowledgeOutput(*msgIt.second.get(),
true);
276 addMissingParts(*tfPtr.get());
279 auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
280 auto tDiff = tNow - tLastTF;
281 if (mSentTFCounter && tDiff < mInput.
delay_us) {
282 std::this_thread::sleep_for(std::chrono::microseconds((
size_t)(mInput.
delay_us - tDiff)));
284 for (
auto& msgIt : *tfPtr.get()) {
285 size_t szPart = acknowledgeOutput(*msgIt.second.get(),
false);
287 const auto* hd = o2h::get<o2h::DataHeader*>((*msgIt.second.get())[0].GetData());
288 nparts += msgIt.second->Size() / 2;
289 device->Send(*msgIt.second.get(), msgIt.first);
295 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
296 LOGP(info,
"Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mSentTFCounter,
dataSize, nparts, mSentTFCounter ?
double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
300 while (mTFQueue.
size() == 0 && mWaitSendingLast) {
311 if (mSentTFCounter >= mInput.
maxTFs || (!mTFQueue.
size() && !mRunning)) {
320 mFileFetcher->stop();
321 mFileFetcher.reset();
323 if (mTFBuilderThread.joinable()) {
324 mTFBuilderThread.join();
331 static bool stopDone =
false;
336 LOGP(info,
"{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mSentTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
339 mFileFetcher->stop();
340 mFileFetcher.reset();
342 if (mTFBuilderThread.joinable()) {
343 mTFBuilderThread.join();
348 exitHdr.
state = o2f::InputChannelState::Completed;
351 dh.
runNumber = strtoul(device->fConfig->GetProperty<std::string>(
"runNumber",
"").c_str(),
nullptr, 10);
355 auto fmqFactory = device->GetChannel(mInput.
rawChannelConfig, 0).Transport();
356 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
357 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
358 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
359 fair::mq::Parts eosMsg;
360 eosMsg.AddPart(std::move(hdEOSMessage));
361 eosMsg.AddPart(std::move(plEOSMessage));
// Builder loop that is launched on mTFBuilderThread (std::thread(&TFReaderSpec::TFBuilder, this)).
// While mRunning and mDevice are set it asks mFileFetcher for the next file name, reads TFs from
// that file via reader.read(...), applies the optional run/time-range and TF-ID selections, and
// pushes the resulting TF to mTFQueue.
// NOTE(review): this listing omits some lines of the original function (including braces); the
// comments below describe only the statements that are actually shown.
// NOTE(review): mRunning and mWaitSendingLast are declared as plain bool but are written here and
// also read by code outside this function — confirm how access is synchronized.
371void TFReaderSpec::TFBuilder()
374 std::string tfFileName;
// waitAcknowledged remembers that a "waiting for data" period has already been time-stamped.
376 bool waitAcknowledged =
false;
378 while (mRunning && mDevice) {
379 LOGP(
debug,
"mTFQueue.size()={} mWaitSendingLast = {}", mTFQueue.
size(), mWaitSendingLast);
381 mWaitSendingLast =
false;
// An empty name is used when there is no fetcher object.
385 tfFileName = mFileFetcher ? mFileFetcher->getNextFileInQueue() :
"";
// Conditions evaluated together ahead of the "TFReader stops processing" message:
//  - no file name while the fetcher reports it is no longer running,
//  - mTFBuilderCounter has reached mInput.maxTFs,
//  - a non-empty mInput.tfIDs list whose entries are exhausted (mSelIDEntry >= size).
// NOTE(review): mFileFetcher is null-checked in the statement above but dereferenced
// unconditionally here — confirm it cannot be null at this point.
387 (tfFileName.empty() && !mFileFetcher->isRunning()) ||
388 mTFBuilderCounter >= mInput.
maxTFs ||
389 (!mInput.
tfIDs.empty() && mSelIDEntry >= mInput.
tfIDs.size())) {
391 LOG(info) <<
"TFReader stops processing";
393 mFileFetcher->stop();
396 mWaitSendingLast =
false;
// No file name available: time-stamp the start of the wait once (in microseconds since epoch),
// then sleep for 10 ms.
399 if (tfFileName.empty()) {
400 if (!waitAcknowledged) {
401 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
402 waitAcknowledged =
true;
404 std::this_thread::sleep_for(10ms);
407 mWaitSendingLast =
false;
// A wait was in progress: add its duration (microseconds) to mTotalWaitTime, report it in
// seconds, and clear the flag.
408 if (waitAcknowledged) {
409 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
410 mTotalWaitTime += waitTime;
412 LOGP(warn,
"Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
414 waitAcknowledged =
false;
418 LOG(info) <<
"Processing file " << tfFileName;
// Per-file loop: continues while still running and fewer than mInput.maxTFs TFs have been built.
423 while (mRunning && mTFBuilderCounter < mInput.
maxTFs) {
// More than one TF is already queued: mWaitSendingLast is cleared (rest of this branch not shown).
425 if (mTFQueue.
size() > 1) {
426 mWaitSendingLast =
false;
431 auto tf = reader.read(mDevice, mOutputRoutes, mInput.
rawChannelConfig, mAccTFCounter);
432 bool acceptTF =
true;
// Run/time-range selection applies only when mRunTimeRanges is non-empty. The run number is taken
// from the DataHeader in the first message of the first map entry of the TF; when it differs from
// the previously seen one, the IRFrame selector is rebuilt for the new run.
434 if (mRunTimeRanges.size()) {
435 const auto* dataptr = (*
tf->begin()->second.get())[0].GetData();
436 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
// Function-local static: the last seen run number persists across calls.
437 static int runNumberPrev = -1;
438 if (runNumberPrev != hd0->runNumber) {
439 runNumberPrev = hd0->runNumber;
440 runTimeRangesToIRFrameSelector(runNumberPrev);
442 if (mIRFrameSelector.
isSet()) {
447 LOGP(info,
"IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
448 irSpan.size(),
ir0.
asString(), ir1.asString(), acceptTF ?
"" :
"do not ", mInput.invertIRFramesSelection ?
"ON" :
"OFF");
// Explicit TF-ID selection, applied only to TFs that are still accepted.
452 if (!mInput.
tfIDs.empty() && acceptTF) {
// Loops while the current requested ID is below mTFBuilderCounter and it is not the last entry
// (loop body not shown; presumably it advances mSelIDEntry — confirm).
454 while ((mInput.
tfIDs[mSelIDEntry] < mTFBuilderCounter) && (mSelIDEntry + 1) < mInput.
tfIDs.size()) {
457 LOGP(info,
"chec if mInput.tfIDs[{}]({}) == {}", mSelIDEntry, mInput.
tfIDs[mSelIDEntry], mTFBuilderCounter);
458 if (mInput.
tfIDs[mSelIDEntry] == mTFBuilderCounter) {
459 mWaitSendingLast =
false;
461 LOGP(info,
"Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
463 LOGP(info,
"Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
// Hand the TF over to the queue; mWaitSendingLast is raised before the push.
468 if (mRunning &&
tf) {
471 mWaitSendingLast =
true;
472 mTFQueue.
push(std::move(
tf));
// The flag passed to popFromQueue is true once the fetcher's loop count has reached
// mInput.maxLoops (meaning of the flag is defined by FileFetcher — not visible here).
483 mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.
maxLoops);
// Reads run / time-span selections from the text file `flname` and stores them in
// mRunTimeRanges as (rmin, rmax) pairs keyed by run number.
// Lines with at least three tokens are parsed as <run> <range_min> <range_max>.
// @param flname path of the selection file; a fatal message is logged if it cannot be opened.
// NOTE(review): this listing omits some lines of the original function (including braces); the
// comments below describe only the statements that are actually shown.
490void TFReaderSpec::loadRunTimeSpans(
const std::string& flname)
492 std::ifstream inputFile(flname);
494 LOGP(fatal,
"Failed to open selected run/timespans file {}", flname);
// cntl is reported as the line number in the format-error message; cntr as the number of
// time-spans read in the final summary.
497 size_t cntl = 0, cntr = 0;
498 while (std::getline(inputFile, line)) {
// Characters ';', tab and ',' are singled out in every line (the action taken is not shown;
// presumably they are normalised to one delimiter before tokenizing — confirm).
500 for (
char& ch :
line) {
501 if (ch ==
';' || ch ==
'\t' || ch ==
',') {
// Empty lines and lines whose first character is '#' take this branch (body not shown).
506 if (
line.size() < 1 || line[0] ==
'#') {
510 auto logError = [&cntl, &
line]() { LOGP(error,
"Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
511 if (tokens.size() >= 3) {
// std::stoi / std::stol throw on malformed or out-of-range input; any surrounding handling is
// not visible in this listing.
515 run = std::stoi(tokens[0]);
516 rmin = std::stol(tokens[1]);
517 rmax = std::stol(tokens[2]);
// Limits above ISTimeStamp are flagged with 1, others with 0. Per the log messages below, a flag
// of 1 is interpreted as a unix timestamp in ms and 0 as an orbit number.
523 constexpr long ISTimeStamp = 1514761200000L;
524 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0;
526 LOGP(fatal,
"Provided range limits are not in increasing order, entry is {}", line);
// -1 means the input convention has not been determined yet: both limits of this entry must
// agree, and their common flag is then stored in mConvRunTimeRangesToOrbits.
528 if (mConvRunTimeRangesToOrbits == -1) {
529 if (convmn != convmx) {
530 LOGP(fatal,
"Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
532 mConvRunTimeRangesToOrbits = convmn;
533 LOGP(info,
"Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ?
"timstamps(ms)" :
"orbits");
// Both limits must match the stored convention.
535 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
536 LOGP(fatal,
"Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ?
"timestamps" :
"orbits",
line);
// operator[] creates the per-run vector on first use.
540 mRunTimeRanges[
run].emplace_back(rmin, rmax);
546 LOGP(info,
"Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
// Resets mIRFrameSelector and processes the time ranges stored in mRunTimeRanges for `runNumber`.
// @param runNumber run whose ranges are looked up; if the run has no entry, an info message
//        states that all TFs will be processed.
// NOTE(review): this listing omits some lines of the original function (including braces and the
// statements that fill `frames` and hand them to the selector); the comments describe only what is
// shown.
551void TFReaderSpec::runTimeRangesToIRFrameSelector(
int runNumber)
554 mIRFrameSelector.
clear();
555 auto ent = mRunTimeRanges.find(runNumber);
556 if (ent == mRunTimeRanges.end()) {
557 LOGP(info,
"RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber);
564 LOGP(fatal,
"failed to extract AggregatedRunInfo for run {}", runNumber);
567 std::vector<o2::dataformats::IRFrame> frames;
568 for (
const auto& rng : ent->second) {
569 long orbMin = 0, orbMax = 0;
// A positive mConvRunTimeRangesToOrbits selects this branch; judging by the member name it is
// where range limits are converted to orbits (branch body not shown — confirm).
570 if (mConvRunTimeRangesToOrbits > 0) {
// Special handling for run numbers above 523897 (body not shown; reason not visible here).
583 if (runNumber > 523897) {
587 LOGP(info,
"TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.
invertIRFramesSelection ?
"rejected" :
"selected");
598 spec.
name =
"tf-reader";
599 const DetID::mask_t DEFMask =
DetID::getMask(
"ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP,FOC");
663 if (!rinp.metricChannel.empty()) {
664 spec.options.emplace_back(
o2f::ConfigParamSpec{
"channel-config", o2f::VariantType::String, rinp.metricChannel, {
"Out-of-band channel config for TF throttling"}});
667 auto nameStart = rinp.rawChannelConfig.find(
"name=");
668 if (nameStart == std::string::npos) {
669 throw std::runtime_error(
"raw channel name is not provided");
671 nameStart += strlen(
"name=");
672 auto nameEnd = rinp.rawChannelConfig.find(
",", nameStart + 1);
673 if (nameEnd == std::string::npos) {
674 nameEnd = rinp.rawChannelConfig.size();
676 spec.options = {
o2f::ConfigParamSpec{
"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {
"Out-of-band channel config"}}};
677 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
678 if (!rinp.metricChannel.empty()) {
679 LOGP(alarm,
"Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
680 LOGP(alarm, R
"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
683 spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String,
"", {
"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
684 spec.options.emplace_back(
o2f::ConfigParamSpec{
"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {
"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
685 spec.options.emplace_back(
o2f::ConfigParamSpec{
"ignore-repair-headers", o2f::VariantType::Bool,
false, {
"do not check/repair headers"}});
686 spec.options.emplace_back(
o2f::ConfigParamSpec{
"read-dist-stf", o2f::VariantType::Bool,
false, {
"do not ignore stored FLP/DISTSUBTIMEFRAME (will clash with injected one)"}});
687 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-tf", o2f::VariantType::Int, -1, {
"max TF ID to process (<= 0 : infinite)"}});
688 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-tf-per-file", o2f::VariantType::Int, -1, {
"max TFs to process per raw-tf file (<= 0 : infinite)"}});
689 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-cached-tf", o2f::VariantType::Int, 3, {
"max TFs to cache in memory"}});
690 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-cached-files", o2f::VariantType::Int, 3, {
"max TF files queued (copied for remote source)"}});
692 spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
Class to check if a given InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Helper function to tokenize sequences and ranges of integral numbers.
Class for time synchronization of RawReader instances.
void endOfStream(o2f::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2f::ProcessingContext &ctx) final
void init(o2f::InitContext &ic) final
std::unordered_map< std::string, std::unique_ptr< fair::mq::Parts > > TFMap
TFReaderSpec(const TFReaderInp &rinp)
static BasicCCDBManager & instance()
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr ID First
static constexpr ID Last
If extra detectors are added, update this!
static constexpr o2h::DataOrigin getDataOrigin(ID id)
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
T get(const char *key) const
ConfigParamRegistry const & options()
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
void push(Args &&... args)
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
constexpr o2::header::DataOrigin gDataOriginFLP
constexpr o2::header::DataDescription gDataDescriptionInfo
constexpr o2::header::DataOrigin gDataOriginAny
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
Defining ITS Vertex explicitly as messageable.
int32_t const char int32_t line
o2::framework::DataProcessorSpec getTFReaderSpec(o2::rawdd::TFReaderInp &rinp)
std::unique_ptr< GPUReconstructionTimeframe > tf
std::string asString() const
static AggregatedRunInfo buildAggregatedRunInfo(int runnumber, long sorMS, long eorMS, long orbitResetMUS, const o2::parameters::GRPECSObject *grpecs, const std::vector< Long64_t > *ctfFirstRunOrbitVec, const o2::parameters::GRPLHCIFData *grplhcif=nullptr)
std::vector< o2::header::DataHeader > hdVec
std::string detListNonRawOnly
std::string rawChannelConfig
std::string detListRawOnly
o2::detectors::DetID::mask_t detMaskNonRawOnly
std::string fileRunTimeSpans
bool invertIRFramesSelection
o2::detectors::DetID::mask_t detMask
o2::detectors::DetID::mask_t detMaskRawOnly
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir0(3, 5)