// Function-local statics: this state persists between invocations of the enclosing code.
// DPCOM stored per DPID; iterated further down to fill the per-output vectors.
// NOTE(review): the code that fills this cache is not visible in this excerpt.
58 static std::unordered_map<DPID, DPCOM> cache;
// Per-channel counter, incremented in the per-route loop and listed in the FBI report.
59 static std::unordered_map<std::string, int> sentToChannel;
// Reference time for the "more than 1 s elapsed" gate that guards building the outputs.
60 static auto timer = std::chrono::high_resolution_clock::now();
// Reference time for the "waiting for 1st FBI" delay and for the runtime printed in the report.
61 static auto timer0 = std::chrono::high_resolution_clock::now();
// Part of the send gate (seenFBI || !fbiFirst); where it is set is not visible in this excerpt.
62 static bool seenFBI =
false;
// Not referenced anywhere in the visible lines.
63 static uint32_t localTFCounter = 0;
// Counters printed in the periodic report: number of inputs (all / FBI) ...
64 static size_t nInp = 0, nInpFBI = 0;
// ... and the corresponding sizes, reported as bytes. Their updates are not visible in this excerpt.
65 static size_t szInp = 0, szInpFBI = 0;
// Log the number of entries in dpid2group.
67 LOG(info) <<
"In lambda function: ********* Size of unordered_map (--> number of defined groups) = " << dpid2group.size();
// Message for the empty-input case, tagged with the current timeslice
// (the condition guarding it is not visible in this excerpt).
71 LOGP(warn,
"Empty input recieved at timeslice {}", tinfo.timeslice);
// Name of the first data point: the payload of part 0 is reinterpreted as a DPCOM and the
// address of its `id` member is read as a NUL-terminated C string.
// NOTE(review): presumably this name decides FBI vs Delta (see the error message below);
// the deciding code is not visible here — confirm.
74 std::string firstName = std::string((
char*)&(
reinterpret_cast<const DPCOM*
>(parts.At(0)->GetData()))->id);
// Reported when the map type could not be derived from the first DP name.
85 LOGP(error,
"Cannot determine if the map is FBI or Delta, 1st DP name is {}", firstName);
// Summary of the input: number of parts, map type ("FBI" or "Delta") and timeslice.
88 LOGP(info,
"New input of {} parts received, map type: {}, timeslice {}", parts.Size(), isFBI ?
"FBI" :
"Delta", tinfo.timeslice);
// Walk over every message part of the input.
92 for (
size_t i = 0;
i < parts.Size(); ++
i) {
// Size of this part's payload; divided by sizeof(DPCOM) below to get the element count.
93 auto sz = parts.At(
i)->GetSize();
// Number of whole DPCOM objects in the payload (integer division drops any remainder).
98 auto nDPCOM = sz /
sizeof(
DPCOM);
// Visit each DPCOM contained in this part.
100 for (
size_t j = 0;
j < nDPCOM;
j++) {
// Pointer to the j-th DPCOM inside the part's payload (no copy is made here).
101 const auto*
ptr = (
reinterpret_cast<const DPCOM*
>(parts.At(
i)->GetData()) +
j);
// Look up the outputs registered for this data point.
// `src` is defined in a line that is not visible in this excerpt.
105 auto mapEl = dpid2group.find(
src.id);
// Data point has no entry in dpid2group; the handling lines are not visible in this excerpt.
108 if (mapEl == dpid2group.end()) {
// Append the string form of every output mapped to this data point to `dest`.
// NOTE(review): this dereferences mapEl, so it presumably sits in the branch where the
// lookup succeeded — the intervening lines are not visible, confirm.
111 for (
const auto&
ds : mapEl->second) {
112 dest += fmt::format(
"{}, ",
ds.as<std::string>());
// Log the received data point, its data and the matched outputs.
115 LOG(info) <<
"Received DP " <<
src.id <<
" (data = " <<
src.data <<
"), matched to output-> " << dest;
// Data point has registered outputs; the body of this branch is not visible in this excerpt.
117 if (mapEl != dpid2group.end()) {
// Single time stamp reused by all timing decisions below.
122 auto timerNow = std::chrono::high_resolution_clock::now();
// Active while a first FBI is requested (fbiFirst) and fewer than 2 FBI inputs have been counted.
123 if (fbiFirst && nInpFBI < 2) {
// Last delay value compared against; static, so it persists between invocations.
125 static int prevDelay = 0;
// Time elapsed since timer0, expressed in seconds as a double.
126 std::chrono::duration<double, std::ratio<1>> duration = timerNow - timer0;
// Implicit double -> int conversion truncates to whole seconds.
127 int delay = duration.count();
// Log only when the whole-second delay exceeds the previously stored value.
// NOTE(review): prevDelay is presumably updated in a following line that is not visible here.
128 if (delay > prevDelay) {
129 LOGP(info,
"Waiting for requested 1st FBI since {} s", delay);
// Time elapsed since `timer`, expressed in seconds as a double.
134 std::chrono::duration<double, std::ratio<1>> duration = timerNow - timer;
// Set in the per-route loop when a route holds a non-empty Parts object; returned at the end.
135 bool didSendMessages =
false;
// Proceed only if more than 1 s has elapsed and either an FBI was seen or no first FBI is required.
136 if (duration.count() > 1 && (seenFBI || !fbiFirst)) {
// One vector of DPCOMs per output DataDescription.
137 std::unordered_map<o2h::DataDescription, pmr::vector<DPCOM>, std::hash<o2h::DataDescription>> outputs;
// Copy every cached data point into the vector of each output it is mapped to;
// cache entries without a dpid2group entry are skipped.
140 for (
auto& it : cache) {
141 auto mapEl = dpid2group.find(it.first);
142 if (mapEl != dpid2group.end()) {
143 for (
const auto&
ds : mapEl->second) {
144 outputs[
ds].push_back(it.second);
// timerNow expressed as a millisecond count since the clock's epoch.
148 std::uint64_t creation = std::chrono::time_point_cast<std::chrono::milliseconds>(timerNow).time_since_epoch().count();
// Message parts accumulated per channel name.
149 std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>> messagesPerRoute;
// Turn each output's DPCOM vector into a header + payload message pair.
151 for (
auto& it : outputs) {
// Nothing collected for this output. `outsp` is defined in a line not visible in this excerpt.
154 if (it.second.empty()) {
155 LOG(warning) <<
"No data for OutputSpec " << outsp;
// Resolve the channel name for this output at the current timeslice.
158 auto channel = channelRetriever(outsp, tinfo.timeslice);
// No channel could be resolved: the data of this output is dropped (see message).
159 if (channel.empty()) {
160 LOG(warning) <<
"No output channel found for OutputSpec " << outsp <<
", discarding its data";
// Transport factory of the destination channel, used to allocate the messages.
172 auto fmqFactory = device->GetChannel(channel).Transport();
// Allocate header and payload messages with 64-byte alignment.
// `headerStack` and `hdr` are built in lines that are not visible in this excerpt.
173 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
174 auto plMessage = fmqFactory->CreateMessage(hdr.
payloadSize, fair::mq::Alignment{64});
// Copy the header stack and the DPCOM vector contents into the freshly allocated messages.
175 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
176 memcpy(plMessage->GetData(), it.second.data(), hdr.payloadSize);
// operator[] inserts an empty unique_ptr for a channel not seen before, so get() yields
// nullptr the first time a channel is used.
178 fair::mq::Parts* parts2send = messagesPerRoute[channel].get();
// Create the Parts container for this channel and re-fetch the raw pointer.
// NOTE(review): presumably guarded by a null check on parts2send in a line not visible here.
180 messagesPerRoute[channel] = std::make_unique<fair::mq::Parts>();
181 parts2send = messagesPerRoute[channel].get();
// Header first, then payload: every logical message occupies two consecutive parts.
183 parts2send->AddPart(std::move(hdMessage));
184 parts2send->AddPart(std::move(plMessage));
186 LOGP(info,
"Pushing {} DPs to {} for TimeSlice {} at {}", it.second.size(), o2f::DataSpecUtils::describe(outsp), tinfo.timeslice, creation);
// Handle the accumulated Parts of every channel.
// The statement that actually sends them is not visible in this excerpt.
191 for (
auto& msgIt : messagesPerRoute) {
// Parts are added as header/payload pairs, hence Size() / 2 logical messages.
193 LOG(info) <<
"Sending " << msgIt.second->Size() / 2 <<
" parts to channel " << msgIt.first;
// Bump the per-channel counter shown in the FBI report.
196 sentToChannel[msgIt.first]++;
// Record that at least one route carried a non-empty Parts object.
197 didSendMessages |= msgIt.second->Size() > 0;
// At least one route was populated; the body of this branch is not visible in this excerpt.
201 if (!messagesPerRoute.empty()) {
// Periodic report, emitted only for FBI inputs: when nInpFBI is a multiple of FBIPerInterval,
// or for every FBI input when `verbose` is set.
205 if (isFBI && ((nInpFBI % FBIPerInterval) == 0 || verbose)) {
// Time since timer0 in seconds (millisecond count scaled by 1e-3).
206 float runtime = 1e-3 * std::chrono::duration_cast<std::chrono::milliseconds>(timerNow - timer0).count();
207 std::string sent =
"Sent since last FBI report: ";
// Append "<name>:<count> " for every channel; <name> is the text after the first "_to_"
// in the channel name, or the whole channel name when "_to_" does not occur.
208 for (
auto&
m : sentToChannel) {
209 auto pos =
m.first.find(
"_to_");
210 sent += fmt::format(
"{}:{} ",
m.first.substr(
pos != std::string::npos ?
pos + 4 : 0),
m.second);
// Input totals, FBI totals, runtime and the per-channel summary built above.
213 LOGP(info,
"{} inputs ({} bytes) of which {} FBI ({} bytes) seen in {:.3f} s | {}", nInp, fmt::group_digits(szInp), nInpFBI, fmt::group_digits(szInpFBI), runtime, sent);
// True if at least one route held a non-empty Parts object in the per-route loop.
215 return didSendMessages;