// Fragment of the TFReaderSpec start-up code. The statements between the ones
// shown are absent from this view, so the enclosing function(s) are not visible.

// Launch TFReaderSpec::TFBuilder on this object in its own std::thread.
123 mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder,
this);
// Function-local static holding microseconds since the system_clock epoch,
// initialised the first time control reaches this declaration. It is compared
// with later timestamps further down; any reassignment is not visible here.
125 static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
// Refuse to continue if the device pointer no longer matches the stored mDevice.
128 if (device != mDevice) {
129 throw std::runtime_error(fmt::format(
"FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
// Read the "timeframes-rate-limit" config value as a string and convert it with
// std::stoi, which throws if the text is not a valid integer.
132 mInput.
tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
// acknowledgeOutput: lambda (captures this) that walks a fair::mq::Parts
// container and checks/records what it carries for the current TF.
//   parts   - multipart message to inspect
//   verbose - defaults to false; gates the `verbose && hdPrev` branches below
// The lambda body is truncated in this view (its end, the declaration of
// hdPrev and the return statement are not shown). A call site further down
// stores the result in a size_t.
134 auto acknowledgeOutput = [
this](fair::mq::Parts& parts,
bool verbose =
false) {
135 int np = parts.Size();
136 size_t dsize = 0, dsizeTot = 0, nblocks = 0;
// Step by two: parts[ip] is read as the header message, and the size of
// parts[ip + 1] is added to it wherever a size is computed.
138 for (
int ip = 0; ip < np; ip += 2) {
139 const auto& msgh = parts[ip];
// Both headers are extracted from the same header-message buffer.
140 const auto* hd = o2h::get<o2h::DataHeader*>(msgh.GetData());
141 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(msgh.GetData());
// NOTE(review): the line preceding this log call is not visible, so whether it
// is guarded by a condition cannot be told from here.
143 LOGP(info,
"Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
// The header's startTime must equal the local TF counter; a mismatch is logged
// at fatal level (the remaining arguments of this call are not visible).
145 if (dph->startTime != this->mTFCounter) {
146 LOGP(fatal,
"Local tf counter {} != TF timeslice {} for {}", this->mTFCounter, dph->startTime,
// Only a part with splitPayloadIndex == 0 updates the seen-output bookkeeping.
149 if (hd->splitPayloadIndex == 0) {
// operator[] on mSeenOutputMap, keyed by {dataDescription, dataOrigin}.
150 auto&
entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
// entry.count stores the TF counter at which this output was last recorded.
151 if (
entry.count != this->mTFCounter) {
152 if (verbose && hdPrev) {
157 entry.count = this->mTFCounter;
158 LOG(
debug) <<
"Found a part " << ip <<
" of " << np <<
" | " << hd->dataOrigin.as<std::string>() <<
"/" << hd->dataDescription.as<std::string>()
159 <<
"/" << hd->subSpecification <<
" part " << hd->splitPayloadIndex <<
" of " << hd->splitPayloadParts <<
" for TF " << this->mTFCounter;
// Accumulate the size of the header message plus the message following it.
164 dsize += msgh.GetSize() + parts[ip + 1].GetSize();
168 if (verbose && hdPrev) {
// Route lookup body. The enclosing lambda/function header and the declarations
// of outputRoutes, h and tslice are not visible in this view.
// Presumably this is the body of findOutputChannel, which is called later with
// a DataHeader and a start time and whose result is tested with .empty() -
// TODO confirm against the full file.
// Returns the channel name of the first matching route, or an empty string.
179 for (
auto& oroute : outputRoutes) {
180 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
// A route is picked when its matcher accepts origin/description/subSpec AND
// tslice modulo the route's maxTimeslices equals the route's timeslice.
181 if (o2f::DataSpecUtils::match(oroute.matcher,
h.dataOrigin,
h.dataDescription,
h.subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
182 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
183 return std::string{oroute.channel};
// No route matched: report it, then list every known route.
// NOTE(review): the value printed after "@ timeslice" is h.tfCounter, not the
// tslice used in the match above - confirm that this is intended.
188 LOGP(error,
"Failed to find output channel for {}/{}/{} @ timeslice {}",
h.dataOrigin,
h.dataDescription,
h.subSpecification,
h.tfCounter);
189 for (
auto& oroute : outputRoutes) {
// NOTE(review): the message text repeats the word "route".
190 LOGP(info,
"Available route route {}", o2f::DataSpecUtils::describe(oroute.matcher));
// An empty string is the "not found" result.
192 return std::string{};
// setTimingInfo: lambda (captures ctx by reference) that copies timing fields
// from the headers of the first message in msgMap into timingInfo.
//   msgMap - TFMap whose first entry's first message part is read
// timingInfo is declared on a line not present in this view, and the lambda's
// closing is not shown either.
194 auto setTimingInfo = [&ctx](
TFMap& msgMap) {
// Data pointer of part [0] of the first map entry. msgMap.begin() is
// dereferenced without an emptiness check in the visible code.
196 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
197 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
198 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
// firstTForbit, tfCounter and runNumber come from the DataHeader;
// creation comes from the DataProcessingHeader.
199 timingInfo.firstTForbit = hd0->firstTForbit;
200 timingInfo.creation = dph->creation;
201 timingInfo.tfCounter = hd0->tfCounter;
202 timingInfo.runNumber = hd0->runNumber;
// addMissingParts: lambda (captures this and findOutputChannel by reference)
// that iterates over mSeenOutputMap and appends a header message plus a
// zero-length second message to the Parts of the matching output channel.
//   msgMap - TFMap keyed by channel name; modified in place
// Several lines of the body are missing in this view (the bodies of the two
// `if` statements, the declaration of headerStack, the condition guarding the
// creation of a new Parts object, and the closing of the lambda).
205 auto addMissingParts = [
this, &findOutputChannel](
TFMap& msgMap) {
// Headers of part [0] of the first map entry. hd0 is not used in the visible
// lines; dph->startTime is used below for the channel lookup.
207 const auto* dataptr = (*msgMap.begin()->second.get())[0].GetData();
208 const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
209 const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
210 for (
auto& out : this->mSeenOutputMap) {
// Entry whose count equals the current TF counter. What happens in this branch
// is not visible - presumably the entry is skipped; TODO confirm.
211 if (out.second.count == this->mTFCounter) {
214 LOG(
debug) <<
"Adding dummy output for " << out.first.dataOrigin.as<std::string>() <<
"/" << out.first.dataDescription.as<std::string>()
215 <<
"/" << out.second.defSubSpec <<
" for TF " << this->mTFCounter;
// Header built from the map key and the entry's default subSpec; the fourth
// constructor argument is 0.
216 o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
// Resolve the output channel name for this header and the TF's startTime.
221 const auto fmqChannel = findOutputChannel(outHeader, dph->startTime);
// Empty name means no route was found; the handling is not visible here.
222 if (fmqChannel.empty()) {
// Messages are created through the transport of sub-channel 0 of that channel.
225 auto fmqFactory = this->mDevice->GetChannel(fmqChannel, 0).Transport();
// Header message sized to headerStack; second message of size 0. Both are
// requested with fair::mq::Alignment{64}.
227 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
228 auto plMessage = fmqFactory->CreateMessage(0, fair::mq::Alignment{64});
229 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
// Look up the Parts container of the channel; the lines that follow create a
// new one (the guarding condition is on a line not shown).
230 fair::mq::Parts* parts = msgMap[fmqChannel].get();
232 msgMap[fmqChannel] = std::make_unique<fair::mq::Parts>();
233 parts = msgMap[fmqChannel].get();
// Append the pair: header message first, then the zero-length message.
235 parts->AddPart(std::move(hdMessage));
236 parts->AddPart(std::move(plMessage));
// Fragment of the send loop: take one TF from mTFQueue, validate it, throttle,
// send it channel by channel, log, then evaluate the wait and stop conditions.
// Closing braces and several statements (declarations of nparts and dataSize,
// the null-pointer test, queue pop, counter updates) are not present in this view.
241 if (mTFQueue.
size()) {
// Move the front element out of the queue.
245 auto tfPtr = std::move(mTFQueue.
front());
// Error report for a null TF pointer (the condition is on a line not shown).
248 LOG(error) <<
"Builder provided nullptr TF pointer";
// Fill timing info from the headers of the TF's first message.
251 setTimingInfo(*tfPtr.get());
// First pass over all channels with verbose = true.
254 for (
auto& msgIt : *tfPtr.get()) {
255 acknowledgeOutput(*msgIt.second.get(),
true);
257 addMissingParts(*tfPtr.get());
// Throttling: microseconds elapsed since tLastTF.
260 auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
261 auto tDiff = tNow - tLastTF;
// Sleep for the remainder of mInput.delay_us, but never when mTFCounter is 0.
262 if (mTFCounter && tDiff < mInput.
delay_us) {
263 std::this_thread::sleep_for(std::chrono::microseconds((
size_t)(mInput.
delay_us - tDiff)));
// Second pass with verbose = false: get the size, count parts, send.
265 for (
auto& msgIt : *tfPtr.get()) {
266 size_t szPart = acknowledgeOutput(*msgIt.second.get(),
false);
268 const auto* hd = o2h::get<o2h::DataHeader*>((*msgIt.second.get())[0].GetData());
// Two messages make up one counted part.
269 nparts += msgIt.second->Size() / 2;
// msgIt.first is passed as the second argument to Send (the map key).
270 device->Send(*msgIt.second.get(), msgIt.first);
// Refresh the timestamp for the summary log; elapsed time is reported as 0 when
// mTFCounter is 0.
276 tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
277 LOGP(info,
"Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mTFCounter,
dataSize, nparts, mTFCounter ?
double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
// Loop while the queue is empty and mWaitSendingLast is set (body not visible).
281 while (mTFQueue.
size() == 0 && mWaitSendingLast) {
// Stop condition: mTFCounter reached mInput.maxTFs, or the queue is empty and
// mRunning is false. The action taken is not visible here.
292 if (mTFCounter >= mInput.
maxTFs || (!mTFQueue.
size() && !mRunning)) {
// Fragment of the "tf-reader" spec construction. Lines between the statements
// shown (including closing braces and any else-branches) are absent from this
// view, so the nesting of the conditions below cannot be determined here.
451 spec.
name =
"tf-reader";
// Mask built from a comma-separated list of detector names.
452 const DetID::mask_t DEFMask =
DetID::getMask(
"ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP,FOC");
// With a metric channel configured, add a "channel-config" option whose default
// value is that channel description.
516 if (!rinp.metricChannel.empty()) {
517 spec.options.emplace_back(
o2f::ConfigParamSpec{
"channel-config", o2f::VariantType::String, rinp.metricChannel, {
"Out-of-band channel config for TF throttling"}});
// Extract the channel name from rawChannelConfig: the text after "name=" up to
// the next comma, or to the end of the string when there is no comma.
520 auto nameStart = rinp.rawChannelConfig.find(
"name=");
521 if (nameStart == std::string::npos) {
522 throw std::runtime_error(
"raw channel name is not provided");
// Advance past the "name=" key to the first character of the value.
524 nameStart += strlen(
"name=");
// NOTE(review): the comma search starts at nameStart + 1, so a comma directly
// after "name=" (empty name) is not treated as the terminator.
525 auto nameEnd = rinp.rawChannelConfig.find(
",", nameStart + 1);
526 if (nameEnd == std::string::npos) {
527 nameEnd = rinp.rawChannelConfig.size();
// Assignment (not emplace_back): the option list becomes exactly this one
// "channel-config" option, with the full raw channel config as its default.
529 spec.options = {
o2f::ConfigParamSpec{
"channel-config", o2f::VariantType::String, rinp.rawChannelConfig, {
"Out-of-band channel config"}}};
// From here on rawChannelConfig holds only the extracted channel name.
530 rinp.rawChannelConfig = rinp.rawChannelConfig.substr(nameStart, nameEnd - nameStart);
// Alarm when a metric channel is configured at this point.
531 if (!rinp.metricChannel.empty()) {
532 LOGP(alarm,
"Cannot apply TF rate limiting when publishing to raw channel, limiting must be applied on the level of the input raw proxy");
533 LOGP(alarm, R
"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
// Register the reader's command-line options, then attach the task.
// Each option is given as {name, type, default value, {help text}}.

// String option, empty by default.
536 spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String,
"", {
"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
// Float option, default 0 (help text typo "Fatil" corrected to "Fail").
537 spec.options.emplace_back(
o2f::ConfigParamSpec{
"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {
"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
// Int option, default -1.
538 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-tf", o2f::VariantType::Int, -1, {
"max TF ID to process (<= 0 : infinite)"}});
// Int option, default -1.
539 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-tf-per-file", o2f::VariantType::Int, -1, {
"max TFs to process per raw-tf file (<= 0 : infinite)"}});
// Int option, default 3.
540 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-cached-tf", o2f::VariantType::Int, 3, {
"max TFs to cache in memory"}});
// Int option, default 3.
541 spec.options.emplace_back(
o2f::ConfigParamSpec{
"max-cached-files", o2f::VariantType::Int, 3, {
"max TF files queued (copied for remote source)"}});
// The algorithm is a TFReaderSpec task adapted from rinp.
543 spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);