28#include <TStopwatch.h>
29#include <fairmq/Device.h>
30#include <fairmq/Parts.h>
44#include <unordered_map>
54using namespace std::chrono_literals;
67 using TFMap = std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>>;
75 void loadRunTimeSpans(
const std::string& flname);
76 void runTimeRangesToIRFrameSelector(
int runNumber);
82 std::vector<o2f::OutputRoute> mOutputRoutes;
83 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
86 std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap;
87 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
89 int mConvRunTimeRangesToOrbits = -1;
90 int mSentTFCounter = 0;
91 int mAccTFCounter = 0;
92 int mTFBuilderCounter = 0;
95 long mTotalWaitTime = 0;
96 size_t mSelIDEntry = 0;
97 bool mRunning =
false;
98 bool mWaitSendingLast =
false;
100 std::thread mTFBuilderThread{};
106 for (
const auto& hd : rinp.
hdVec) {
114 mInput.
tfIDs = o2::RangeTokenizer::tokenize<int>(ic.
options().
get<std::string>(
"select-tf-ids"));
126 mFileFetcher->setMaxLoops(mInput.
maxLoops);
127 mFileFetcher->setFailThreshold(ic.
options().
get<
float>(
"fetch-failure-threshold"));
128 mFileFetcher->start();
139 mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder,
this);
141 static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
144 if (device != mDevice) {
145 throw std::runtime_error(fmt::format(
"FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
148 mInput.
tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
150 auto acknowledgeOutput = [
this](fair::mq::Parts& parts,
bool verbose =
false) {
151 int np = parts.Size();
152 size_t dsize = 0, dsizeTot = 0, nblocks = 0;
154 for (
int ip = 0; ip < np; ip += 2) {
155 const auto& msgh = parts[ip];
156 const auto* hd = o2h::get<o2h::DataHeader*>(msgh.GetData());
157 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(msgh.GetData());
159 LOGP(info,
"Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
161 if (hd->splitPayloadIndex == 0) {
162 auto&
entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
163 if (
entry.count != this->mSentTFCounter) {
164 if (verbose && hdPrev) {
169 entry.count = this->mSentTFCounter;
170 LOG(
debug) <<
"Found a part " << ip <<
" of " << np <<
" | " << hd->dataOrigin.as<std::string>() <<
"/" << hd->dataDescription.as<std::string>()
171 <<
"/" << hd->subSpecification <<
" part " << hd->splitPayloadIndex <<
" of " << hd->splitPayloadParts <<
" for TF " << this->mSentTFCounter;
176 dsize += msgh.GetSize() + parts[ip + 1].GetSize();
180 if (verbose && hdPrev) {
191 for (
auto& oroute : outputRoutes) {
192 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
193 if (o2f::DataSpecUtils::match(oroute.matcher,
h.dataOrigin,
h.dataDescription,
h.subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
194 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
195 return std::string{oroute.channel};
200 LOGP(error,
"Failed to find output channel for {}/{}/{} @ timeslice {}",
h.dataOrigin,
h.dataDescription,
h.subSpecification,
h.tfCounter);
201 for (
auto& oroute : outputRoutes) {
202 LOGP(info,
"Available route route {}", o2f::DataSpecUtils::describe(oroute.matcher));
204 return std::string{};
206 auto setTimingInfo = [&ctx](
TFMap& msgMap) {
208 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
209 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
210 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
211 timingInfo.firstTForbit = hd0->firstTForbit;
212 timingInfo.creation = dph->creation;
213 timingInfo.tfCounter = hd0->tfCounter;
214 timingInfo.runNumber = hd0->runNumber;
217 auto addMissingParts = [
this, &findOutputChannel](
TFMap& msgMap) {
219 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
220 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
221 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
222 for (
auto& out : this->mSeenOutputMap) {
223 if (out.second.count == this->mSentTFCounter) {
226 LOG(
debug) <<
"Adding dummy output for " << out.first.dataOrigin.as<std::string>() <<
"/" << out.first.dataDescription.as<std::string>()
227 <<
"/" << out.second.defSubSpec <<
" for TF " << this->mSentTFCounter;
228 o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
233 const auto fmqChannel = findOutputChannel(outHeader, dph->startTime);
234 if (fmqChannel.empty()) {
237 auto fmqFactory = this->mDevice->GetChannel(fmqChannel, 0).Transport();
239 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
240 auto plMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
241 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
242 fair::mq::Parts* parts = msgMap[fmqChannel].get();
244 msgMap[fmqChannel] = std::make_unique<fair::mq::Parts>();
245 parts = msgMap[fmqChannel].get();
247 parts->AddPart(std::move(hdMessage));
248 parts->AddPart(std::move(plMessage));
253 if (mTFQueue.
size()) {
257 auto tfPtr = std::move(mTFQueue.
front());
260 LOG(error) <<
"Builder provided nullptr TF pointer";
263 setTimingInfo(*tfPtr.get());
266 for (
auto& msgIt : *tfPtr.get()) {
267 acknowledgeOutput(*msgIt.second.get(),
true);
269 addMissingParts(*tfPtr.get());
272 auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
273 auto tDiff = tNow - tLastTF;
274 if (mSentTFCounter && tDiff < mInput.
delay_us) {
275 std::this_thread::sleep_for(std::chrono::microseconds((
size_t)(mInput.
delay_us - tDiff)));
277 for (
auto& msgIt : *tfPtr.get()) {
278 size_t szPart = acknowledgeOutput(*msgIt.second.get(),
false);
280 const auto* hd = o2h::get<o2h::DataHeader*>((*msgIt.second.get())[0].GetData());
281 nparts += msgIt.second->Size() / 2;
282 device->Send(*msgIt.second.get(), msgIt.first);
288 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
289 LOGP(info,
"Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mSentTFCounter,
dataSize, nparts, mSentTFCounter ?
double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
293 while (mTFQueue.
size() == 0 && mWaitSendingLast) {
304 if (mSentTFCounter >= mInput.
maxTFs || (!mTFQueue.
size() && !mRunning)) {
313 mFileFetcher->stop();
314 mFileFetcher.reset();
316 if (mTFBuilderThread.joinable()) {
317 mTFBuilderThread.join();
324 static bool stopDone =
false;
329 LOGP(info,
"{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mSentTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
332 mFileFetcher->stop();
333 mFileFetcher.reset();
335 if (mTFBuilderThread.joinable()) {
336 mTFBuilderThread.join();
341 exitHdr.
state = o2f::InputChannelState::Completed;
344 dh.
runNumber = strtoul(device->fConfig->GetProperty<std::string>(
"runNumber",
"").c_str(),
nullptr, 10);
348 auto fmqFactory = device->GetChannel(mInput.
rawChannelConfig, 0).Transport();
349 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
350 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
351 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
352 fair::mq::Parts eosMsg;
353 eosMsg.AddPart(std::move(hdEOSMessage));
354 eosMsg.AddPart(std::move(plEOSMessage));
364void TFReaderSpec::TFBuilder()
367 std::string tfFileName;
369 bool waitAcknowledged =
false;
371 while (mRunning && mDevice) {
372 LOGP(
debug,
"mTFQueue.size()={} mWaitSendingLast = {}", mTFQueue.
size(), mWaitSendingLast);
374 mWaitSendingLast =
false;
378 tfFileName = mFileFetcher ? mFileFetcher->getNextFileInQueue() :
"";
380 (tfFileName.empty() && !mFileFetcher->isRunning()) ||
381 mTFBuilderCounter >= mInput.
maxTFs ||
382 (!mInput.
tfIDs.empty() && mSelIDEntry >= mInput.
tfIDs.size())) {
384 LOG(info) <<
"TFReader stops processing";
386 mFileFetcher->stop();
389 mWaitSendingLast =
false;
392 if (tfFileName.empty()) {
393 if (!waitAcknowledged) {
394 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
395 waitAcknowledged =
true;
397 std::this_thread::sleep_for(10ms);
400 mWaitSendingLast =
false;
401 if (waitAcknowledged) {
402 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
403 mTotalWaitTime += waitTime;
405 LOGP(warn,
"Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
407 waitAcknowledged =
false;
411 LOG(info) <<
"Processing file " << tfFileName;
416 while (mRunning && mTFBuilderCounter < mInput.
maxTFs) {
418 if (mTFQueue.
size() > 1) {
419 mWaitSendingLast =
false;
425 bool acceptTF =
true;
427 if (mRunTimeRanges.size()) {
428 const auto* dataptr = (*
tf->begin()->second.get())[0].GetData();
429 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
430 static int runNumberPrev = -1;
431 if (runNumberPrev != hd0->runNumber) {
432 runNumberPrev = hd0->runNumber;
433 runTimeRangesToIRFrameSelector(runNumberPrev);
435 if (mIRFrameSelector.
isSet()) {
440 LOGP(info,
"IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
441 irSpan.size(),
ir0.
asString(), ir1.asString(), acceptTF ?
"" :
"do not ", mInput.invertIRFramesSelection ?
"ON" :
"OFF");
445 if (!mInput.
tfIDs.empty() && acceptTF) {
447 while ((mInput.
tfIDs[mSelIDEntry] < mTFBuilderCounter) && (mSelIDEntry + 1) < mInput.
tfIDs.size()) {
450 LOGP(info,
"chec if mInput.tfIDs[{}]({}) == {}", mSelIDEntry, mInput.
tfIDs[mSelIDEntry], mTFBuilderCounter);
451 if (mInput.
tfIDs[mSelIDEntry] == mTFBuilderCounter) {
452 mWaitSendingLast =
false;
454 LOGP(info,
"Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
456 LOGP(info,
"Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
461 if (mRunning &&
tf) {
464 mWaitSendingLast =
true;
465 mTFQueue.
push(std::move(
tf));
476 mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.
maxLoops);
483void TFReaderSpec::loadRunTimeSpans(
const std::string& flname)
485 std::ifstream inputFile(flname);
487 LOGP(fatal,
"Failed to open selected run/timespans file {}", flname);
490 size_t cntl = 0, cntr = 0;
491 while (std::getline(inputFile, line)) {
493 for (
char& ch :
line) {
494 if (ch ==
';' || ch ==
'\t' || ch ==
',') {
499 if (
line.size() < 1 || line[0] ==
'#') {
503 auto logError = [&cntl, &
line]() { LOGP(error,
"Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
504 if (tokens.size() >= 3) {
508 run = std::stoi(tokens[0]);
509 rmin = std::stol(tokens[1]);
510 rmax = std::stol(tokens[2]);
516 constexpr long ISTimeStamp = 1514761200000L;
517 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0;
519 LOGP(fatal,
"Provided range limits are not in increasing order, entry is {}", line);
521 if (mConvRunTimeRangesToOrbits == -1) {
522 if (convmn != convmx) {
523 LOGP(fatal,
"Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
525 mConvRunTimeRangesToOrbits = convmn;
526 LOGP(info,
"Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ?
"timstamps(ms)" :
"orbits");
528 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
529 LOGP(fatal,
"Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ?
"timestamps" :
"orbits",
line);
533 mRunTimeRanges[
run].emplace_back(rmin, rmax);
539 LOGP(info,
"Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
544void TFReaderSpec::runTimeRangesToIRFrameSelector(
int runNumber)
547 mIRFrameSelector.
clear();
548 auto ent = mRunTimeRanges.find(runNumber);
549 if (ent == mRunTimeRanges.end()) {
550 LOGP(info,
"RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber);
557 LOGP(fatal,
"failed to extract AggregatedRunInfo for run {}", runNumber);
560 std::vector<o2::dataformats::IRFrame> frames;
561 for (
const auto& rng : ent->second) {
562 long orbMin = 0, orbMax = 0;
563 if (mConvRunTimeRangesToOrbits > 0) {
576 if (runNumber > 523897) {
580 LOGP(info,
"TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.
invertIRFramesSelection ?
"rejected" :
"selected");
591 spec.
name =
"tf-reader";
592 const DetID::mask_t DEFMask =
DetID::getMask(
"ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP,FOC");
656 if (!rinp.metricChannel.empty()) {
657 spec.options.emplace_back(
o2f::ConfigParamSpec{
"channel-config", o2f::VariantType::String, rinp.metricChannel, {
"Out-of-band channel config for TF throttling"}});
660 auto nameStart = rinp.rawChannelConfig.find(
"name=");
661 if (nameStart == std::string::npos) {
662 throw std::runtime_error(
"raw channel name is not provided");
664 nameStart += strlen(
"name=");
665 auto nameEnd = rinp.rawChannelConfig.find(
",", nameStart + 1);
666 if (nameEnd == std::string::npos) {
667 nameEnd = rinp.rawChannelConfig.size();
669 spec.options = {
o2f::ConfigParamSpec{
"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {
"Out-of-band channel config"}}};
670 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
671 if (!rinp.metricChannel.empty()) {
672 LOGP(alarm,
"Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
673 LOGP(alarm, R
"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
676 spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String,
"", {
"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
677 spec.options.emplace_back(
o2f::ConfigParamSpec{
"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {
"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
678 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-tf", o2f::VariantType::Int, -1, {
"max TF ID to process (<= 0 : infinite)"}});
679 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-tf-per-file", o2f::VariantType::Int, -1, {
"max TFs to process per raw-tf file (<= 0 : infinite)"}});
680 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-cached-tf", o2f::VariantType::Int, 3, {
"max TFs to cache in memory"}});
681 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-cached-files", o2f::VariantType::Int, 3, {
"max TF files queued (copied for remote source)"}});
683 spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
Class to check if a given InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Helper function to tokenize sequences and ranges of integral numbers.
Class for time synchronization of RawReader instances.
void endOfStream(o2f::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2f::ProcessingContext &ctx) final
void init(o2f::InitContext &ic) final
std::unordered_map< std::string, std::unique_ptr< fair::mq::Parts > > TFMap
TFReaderSpec(const TFReaderInp &rinp)
static BasicCCDBManager & instance()
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr ID First
static constexpr ID Last
If extra detectors are added, update this!
static constexpr o2h::DataOrigin getDataOrigin(ID id)
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
T get(const char *key) const
ConfigParamRegistry const & options()
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
void push(Args &&... args)
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
constexpr o2::header::DataOrigin gDataOriginFLP
constexpr o2::header::DataDescription gDataDescriptionInfo
constexpr o2::header::DataOrigin gDataOriginAny
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
Defining PrimaryVertex explicitly as messageable.
int32_t const char int32_t line
o2::framework::DataProcessorSpec getTFReaderSpec(o2::rawdd::TFReaderInp &rinp)
std::unique_ptr< GPUReconstructionTimeframe > tf
std::string asString() const
static AggregatedRunInfo buildAggregatedRunInfo(int runnumber, long sorMS, long eorMS, long orbitResetMUS, const o2::parameters::GRPECSObject *grpecs, const std::vector< Long64_t > *ctfFirstRunOrbitVec)
std::vector< o2::header::DataHeader > hdVec
std::string detListNonRawOnly
std::string rawChannelConfig
std::string detListRawOnly
o2::detectors::DetID::mask_t detMaskNonRawOnly
std::string fileRunTimeSpans
bool invertIRFramesSelection
o2::detectors::DetID::mask_t detMask
o2::detectors::DetID::mask_t detMaskRawOnly
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir0(3, 5)