28#include <TStopwatch.h>
29#include <fairmq/Device.h>
30#include <fairmq/Parts.h>
44#include <unordered_map>
54using namespace std::chrono_literals;
67 using TFMap = std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>>;
75 void loadRunTimeSpans(
const std::string& flname);
76 void runTimeRangesToIRFrameSelector(
int runNumber);
82 std::vector<o2f::OutputRoute> mOutputRoutes;
83 std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
86 std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap;
87 std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
89 int mConvRunTimeRangesToOrbits = -1;
91 int mTFBuilderCounter = 0;
94 long mTotalWaitTime = 0;
95 size_t mSelIDEntry = 0;
96 bool mRunning =
false;
97 bool mWaitSendingLast =
false;
99 std::thread mTFBuilderThread{};
105 for (
const auto& hd : rinp.
hdVec) {
113 mInput.
tfIDs = o2::RangeTokenizer::tokenize<int>(ic.
options().
get<std::string>(
"select-tf-ids"));
125 mFileFetcher->setMaxLoops(mInput.
maxLoops);
126 mFileFetcher->setFailThreshold(ic.
options().
get<
float>(
"fetch-failure-threshold"));
127 mFileFetcher->start();
138 mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder,
this);
140 static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
143 if (device != mDevice) {
144 throw std::runtime_error(fmt::format(
"FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
147 mInput.
tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
149 auto acknowledgeOutput = [
this](fair::mq::Parts& parts,
bool verbose =
false) {
150 int np = parts.Size();
151 size_t dsize = 0, dsizeTot = 0, nblocks = 0;
153 for (
int ip = 0; ip < np; ip += 2) {
154 const auto& msgh = parts[ip];
155 const auto* hd = o2h::get<o2h::DataHeader*>(msgh.GetData());
156 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(msgh.GetData());
158 LOGP(info,
"Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
160 if (hd->splitPayloadIndex == 0) {
161 auto&
entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
162 if (
entry.count != this->mTFCounter) {
163 if (verbose && hdPrev) {
168 entry.count = this->mTFCounter;
169 LOG(
debug) <<
"Found a part " << ip <<
" of " << np <<
" | " << hd->dataOrigin.as<std::string>() <<
"/" << hd->dataDescription.as<std::string>()
170 <<
"/" << hd->subSpecification <<
" part " << hd->splitPayloadIndex <<
" of " << hd->splitPayloadParts <<
" for TF " << this->mTFCounter;
175 dsize += msgh.GetSize() + parts[ip + 1].GetSize();
179 if (verbose && hdPrev) {
190 for (
auto& oroute : outputRoutes) {
191 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
192 if (o2f::DataSpecUtils::match(oroute.matcher,
h.dataOrigin,
h.dataDescription,
h.subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
193 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
194 return std::string{oroute.channel};
199 LOGP(error,
"Failed to find output channel for {}/{}/{} @ timeslice {}",
h.dataOrigin,
h.dataDescription,
h.subSpecification,
h.tfCounter);
200 for (
auto& oroute : outputRoutes) {
201 LOGP(info,
"Available route route {}", o2f::DataSpecUtils::describe(oroute.matcher));
203 return std::string{};
205 auto setTimingInfo = [&ctx](
TFMap& msgMap) {
207 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
208 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
209 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
210 timingInfo.firstTForbit = hd0->firstTForbit;
211 timingInfo.creation = dph->creation;
212 timingInfo.tfCounter = hd0->tfCounter;
213 timingInfo.runNumber = hd0->runNumber;
216 auto addMissingParts = [
this, &findOutputChannel](
TFMap& msgMap) {
218 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
219 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
220 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
221 for (
auto& out : this->mSeenOutputMap) {
222 if (out.second.count == this->mTFCounter) {
225 LOG(
debug) <<
"Adding dummy output for " << out.first.dataOrigin.as<std::string>() <<
"/" << out.first.dataDescription.as<std::string>()
226 <<
"/" << out.second.defSubSpec <<
" for TF " << this->mTFCounter;
227 o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
232 const auto fmqChannel = findOutputChannel(outHeader, dph->startTime);
233 if (fmqChannel.empty()) {
236 auto fmqFactory = this->mDevice->GetChannel(fmqChannel, 0).Transport();
238 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
239 auto plMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
240 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
241 fair::mq::Parts* parts = msgMap[fmqChannel].get();
243 msgMap[fmqChannel] = std::make_unique<fair::mq::Parts>();
244 parts = msgMap[fmqChannel].get();
246 parts->AddPart(std::move(hdMessage));
247 parts->AddPart(std::move(plMessage));
252 if (mTFQueue.
size()) {
256 auto tfPtr = std::move(mTFQueue.
front());
259 LOG(error) <<
"Builder provided nullptr TF pointer";
262 setTimingInfo(*tfPtr.get());
265 for (
auto& msgIt : *tfPtr.get()) {
266 acknowledgeOutput(*msgIt.second.get(),
true);
268 addMissingParts(*tfPtr.get());
271 auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
272 auto tDiff = tNow - tLastTF;
273 if (mTFCounter && tDiff < mInput.
delay_us) {
274 std::this_thread::sleep_for(std::chrono::microseconds((
size_t)(mInput.
delay_us - tDiff)));
276 for (
auto& msgIt : *tfPtr.get()) {
277 size_t szPart = acknowledgeOutput(*msgIt.second.get(),
false);
279 const auto* hd = o2h::get<o2h::DataHeader*>((*msgIt.second.get())[0].GetData());
280 nparts += msgIt.second->Size() / 2;
281 device->Send(*msgIt.second.get(), msgIt.first);
287 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
288 LOGP(info,
"Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mTFCounter,
dataSize, nparts, mTFCounter ?
double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
292 while (mTFQueue.
size() == 0 && mWaitSendingLast) {
303 if (mTFCounter >= mInput.
maxTFs || (!mTFQueue.
size() && !mRunning)) {
312 mFileFetcher->stop();
313 mFileFetcher.reset();
315 if (mTFBuilderThread.joinable()) {
316 mTFBuilderThread.join();
323 static bool stopDone =
false;
328 LOGP(info,
"{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
331 mFileFetcher->stop();
332 mFileFetcher.reset();
334 if (mTFBuilderThread.joinable()) {
335 mTFBuilderThread.join();
340 exitHdr.
state = o2f::InputChannelState::Completed;
343 dh.
runNumber = strtoul(device->fConfig->GetProperty<std::string>(
"runNumber",
"").c_str(),
nullptr, 10);
347 auto fmqFactory = device->GetChannel(mInput.
rawChannelConfig, 0).Transport();
348 auto hdEOSMessage = fmqFactory->CreateMessage(exitStack.size(), fair::mq::Alignment{64});
349 auto plEOSMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
350 memcpy(hdEOSMessage->GetData(), exitStack.data(), exitStack.size());
351 fair::mq::Parts eosMsg;
352 eosMsg.AddPart(std::move(hdEOSMessage));
353 eosMsg.AddPart(std::move(plEOSMessage));
363void TFReaderSpec::TFBuilder()
366 std::string tfFileName;
368 bool waitAcknowledged =
false;
370 while (mRunning && mDevice) {
371 LOGP(
debug,
"mTFQueue.size()={} mWaitSendingLast = {}", mTFQueue.
size(), mWaitSendingLast);
373 mWaitSendingLast =
false;
377 tfFileName = mFileFetcher ? mFileFetcher->getNextFileInQueue() :
"";
379 (tfFileName.empty() && !mFileFetcher->isRunning()) ||
380 mTFBuilderCounter >= mInput.
maxTFs ||
381 (!mInput.
tfIDs.empty() && mSelIDEntry >= mInput.
tfIDs.size())) {
383 LOG(info) <<
"TFReader stops processing";
385 mFileFetcher->stop();
388 mWaitSendingLast =
false;
391 if (tfFileName.empty()) {
392 if (!waitAcknowledged) {
393 startWait = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
394 waitAcknowledged =
true;
396 std::this_thread::sleep_for(10ms);
399 mWaitSendingLast =
false;
400 if (waitAcknowledged) {
401 long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
402 mTotalWaitTime += waitTime;
404 LOGP(warn,
"Resuming reading after waiting for data {:.2} s (accumulated {:.2} s delay in {} waits)", 1e-6 * waitTime, 1e-6 * mTotalWaitTime, mNWaits);
406 waitAcknowledged =
false;
410 LOG(info) <<
"Processing file " << tfFileName;
415 while (mRunning && mTFBuilderCounter < mInput.
maxTFs) {
417 if (mTFQueue.
size() > 1) {
418 mWaitSendingLast =
false;
424 bool acceptTF =
true;
426 if (mRunTimeRanges.size()) {
427 const auto* dataptr = (*
tf->begin()->second.get())[0].GetData();
428 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
429 static int runNumberPrev = -1;
430 if (runNumberPrev != hd0->runNumber) {
431 runNumberPrev = hd0->runNumber;
432 runTimeRangesToIRFrameSelector(runNumberPrev);
434 if (mIRFrameSelector.
isSet()) {
439 LOGP(info,
"IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
440 irSpan.size(),
ir0.
asString(), ir1.asString(), acceptTF ?
"" :
"do not ", mInput.invertIRFramesSelection ?
"ON" :
"OFF");
444 if (!mInput.
tfIDs.empty() && acceptTF) {
446 if (mInput.
tfIDs[mSelIDEntry] == mTFBuilderCounter) {
447 mWaitSendingLast =
false;
449 LOGP(info,
"Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
452 LOGP(info,
"Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
459 if (mRunning &&
tf) {
461 mWaitSendingLast =
true;
462 mTFQueue.
push(std::move(
tf));
473 mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.
maxLoops);
480void TFReaderSpec::loadRunTimeSpans(
const std::string& flname)
482 std::ifstream inputFile(flname);
484 LOGP(fatal,
"Failed to open selected run/timespans file {}", flname);
487 size_t cntl = 0, cntr = 0;
488 while (std::getline(inputFile, line)) {
490 for (
char& ch :
line) {
491 if (ch ==
';' || ch ==
'\t' || ch ==
',') {
496 if (
line.size() < 1 || line[0] ==
'#') {
500 auto logError = [&cntl, &
line]() { LOGP(error,
"Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
501 if (tokens.size() >= 3) {
505 run = std::stoi(tokens[0]);
506 rmin = std::stol(tokens[1]);
507 rmax = std::stol(tokens[2]);
513 constexpr long ISTimeStamp = 1514761200000L;
514 int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0;
516 LOGP(fatal,
"Provided range limits are not in increasing order, entry is {}", line);
518 if (mConvRunTimeRangesToOrbits == -1) {
519 if (convmn != convmx) {
520 LOGP(fatal,
"Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
522 mConvRunTimeRangesToOrbits = convmn;
523 LOGP(info,
"Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ?
"timstamps(ms)" :
"orbits");
525 if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
526 LOGP(fatal,
"Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ?
"timestamps" :
"orbits",
line);
530 mRunTimeRanges[
run].emplace_back(rmin, rmax);
536 LOGP(info,
"Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
541void TFReaderSpec::runTimeRangesToIRFrameSelector(
int runNumber)
544 mIRFrameSelector.
clear();
545 auto ent = mRunTimeRanges.find(runNumber);
546 if (ent == mRunTimeRanges.end()) {
547 LOGP(info,
"RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber);
554 LOGP(fatal,
"failed to extract AggregatedRunInfo for run {}", runNumber);
557 std::vector<o2::dataformats::IRFrame> frames;
558 for (
const auto& rng : ent->second) {
559 long orbMin = 0, orbMax = 0;
560 if (mConvRunTimeRangesToOrbits > 0) {
573 if (runNumber > 523897) {
577 LOGP(info,
"TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.
invertIRFramesSelection ?
"rejected" :
"selected");
588 spec.
name =
"tf-reader";
589 const DetID::mask_t DEFMask =
DetID::getMask(
"ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP,FOC");
653 if (!rinp.metricChannel.empty()) {
654 spec.options.emplace_back(
o2f::ConfigParamSpec{
"channel-config", o2f::VariantType::String, rinp.metricChannel, {
"Out-of-band channel config for TF throttling"}});
657 auto nameStart = rinp.rawChannelConfig.find(
"name=");
658 if (nameStart == std::string::npos) {
659 throw std::runtime_error(
"raw channel name is not provided");
661 nameStart += strlen(
"name=");
662 auto nameEnd = rinp.rawChannelConfig.find(
",", nameStart + 1);
663 if (nameEnd == std::string::npos) {
664 nameEnd = rinp.rawChannelConfig.size();
666 spec.options = {
o2f::ConfigParamSpec{
"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {
"Out-of-band channel config"}}};
667 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
668 if (!rinp.metricChannel.empty()) {
669 LOGP(alarm,
"Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
670 LOGP(alarm, R
"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
673 spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String,
"", {
"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
674 spec.options.emplace_back(
o2f::ConfigParamSpec{
"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {
"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
675 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-tf", o2f::VariantType::Int, -1, {
"max TF ID to process (<= 0 : infinite)"}});
676 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-tf-per-file", o2f::VariantType::Int, -1, {
"max TFs to process per raw-tf file (<= 0 : infinite)"}});
677 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-cached-tf", o2f::VariantType::Int, 3, {
"max TFs to cache in memory"}});
678 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-cached-files", o2f::VariantType::Int, 3, {
"max TF files queued (copied for remote source)"}});
680 spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
Class to check if a given InteractionRecord or IRFrame is selected by the external IRFrame vector.
Header to collect LHC related constants.
Helper function to tokenize sequences and ranges of integral numbers.
Class for time synchronization of RawReader instances.
void endOfStream(o2f::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2f::ProcessingContext &ctx) final
void init(o2f::InitContext &ic) final
std::unordered_map< std::string, std::unique_ptr< fair::mq::Parts > > TFMap
TFReaderSpec(const TFReaderInp &rinp)
static BasicCCDBManager & instance()
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr ID First
static constexpr ID Last
If extra detectors are added, update this!
static constexpr o2h::DataOrigin getDataOrigin(ID id)
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
T get(const char *key) const
ConfigParamRegistry const & options()
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
void push(Args &&... args)
void setOwnList(const std::vector< o2::dataformats::IRFrame > &lst, bool toBeSorted)
gsl::span< const o2::dataformats::IRFrame > getMatchingFrames(const o2::dataformats::IRFrame &fr)
constexpr o2::header::DataOrigin gDataOriginFLP
constexpr o2::header::DataDescription gDataDescriptionInfo
constexpr o2::header::DataOrigin gDataOriginAny
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
constexpr int LHCMaxBunches
constexpr double LHCOrbitMUS
Defining PrimaryVertex explicitly as messageable.
int32_t const char int32_t line
o2::framework::DataProcessorSpec getTFReaderSpec(o2::rawdd::TFReaderInp &rinp)
std::unique_ptr< GPUReconstructionTimeframe > tf
std::string asString() const
static AggregatedRunInfo buildAggregatedRunInfo(int runnumber, long sorMS, long eorMS, long orbitResetMUS, const o2::parameters::GRPECSObject *grpecs, const std::vector< Long64_t > *ctfFirstRunOrbitVec)
std::vector< o2::header::DataHeader > hdVec
std::string detListNonRawOnly
std::string rawChannelConfig
std::string detListRawOnly
o2::detectors::DetID::mask_t detMaskNonRawOnly
std::string fileRunTimeSpans
bool invertIRFramesSelection
o2::detectors::DetID::mask_t detMask
o2::detectors::DetID::mask_t detMaskRawOnly
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir0(3, 5)