// Batch the finalized messages per output channel before sending.
// One fair::mq::Parts container is allocated for each output channel
// reported by the proxy.
41 std::vector<fair::mq::Parts> outputsPerChannel;
42 outputsPerChannel.resize(proxy.getNumOutputChannels());
// Finalize every pending message and append its parts to the container
// that belongs to the channel of the message's route.
44 for (
auto&
message : contextMessages) {
46 fair::mq::Parts parts =
message->finalize();
// Each finalized message is expected to consist of exactly two parts
// (presumably header + payload -- TODO confirm). Checked in debug builds only.
48 assert(parts.Size() == 2);
49 for (
auto& part : parts) {
50 outputsPerChannel[proxy.getOutputChannelIndex((
message->route())).
value].AddPart(std::move(part));
// Walk over all channels and send whatever was accumulated for each one.
// NOTE(review): `ci` is a signed int compared against the unsigned result
// of std::vector::size(); consider an unsigned index type.
53 for (
int ci = 0; ci < outputsPerChannel.size(); ++ci) {
54 auto& parts = outputsPerChannel[ci];
// Channels with nothing queued are special-cased; the body of this branch
// is not visible in this excerpt (presumably it skips the channel).
55 if (parts.Size() == 0) {
58 sender.
send(parts, {ci});
// For every message reference in the context: create a payload message for
// its route, fill it with a copy of the referenced payload data, and send
// header + payload as a two-part message.
65 for (
auto& messageRef : context) {
66 fair::mq::Parts parts;
67 fair::mq::MessagePtr payload(sender.
create(messageRef.routeIndex));
68 auto a = messageRef.payload.get();
// The payload message is rebuilt around a strdup()'ed copy of the data.
// NOTE(review): strdup() copies only up to the first NUL byte, whereas the
// size handed to Rebuild() is a->size(); if the data can contain embedded
// NULs the buffer would be shorter than the declared size -- verify.
// NOTE(review): the two trailing arguments are nullptr; presumably these are
// the deallocation callback and its hint, in which case the strdup()'ed
// buffer is never freed -- TODO confirm against the FairMQ Message API.
70 payload->Rebuild(
reinterpret_cast<void*
>(
const_cast<char*
>(strdup(
a->data()))),
a->size(),
nullptr,
nullptr);
// Look up the DataHeader stored in the header message; where `cdh` is used
// is not visible in this excerpt.
71 const DataHeader* cdh = o2::header::get<DataHeader*>(messageRef.header->GetData());
// Ownership of header and payload moves into `parts`, which is then sent on
// the channel the proxy associates with this message's route.
76 parts.AddPart(std::move(messageRef.header));
77 parts.AddPart(std::move(payload));
78 sender.
send(parts, proxy.getOutputChannelIndex(messageRef.routeIndex));
// Send every buffered message of the context, publish a size metric for each
// one, and register a consumer that accounts the sent bytes against the
// shared-memory offers.
84 using o2::monitoring::Metric;
85 using o2::monitoring::Monitoring;
86 using o2::monitoring::tags::Key;
87 using o2::monitoring::tags::Value;
// Matches a single space; used below to replace spaces with '_' in the
// strings derived from the data origin and data description.
91 static const std::regex invalid_metric(
" ");
93 for (
auto& messageRef : context) {
94 fair::mq::Parts parts;
// Run the message's finalize step on its buffer, then turn the buffer into
// the payload message.
97 messageRef.finalize(messageRef.buffer);
99 std::unique_ptr<fair::mq::Message> payload = messageRef.buffer->Finalise();
101 const DataHeader* cdh = o2::header::get<DataHeader*>(messageRef.header->GetData());
// NOTE(review): `dh` is declared on a line not visible in this excerpt
// (presumably derived from `cdh` -- verify).
108 auto origin = std::regex_replace(dh->dataOrigin.as<std::string>(), invalid_metric,
"_");
109 auto description = std::regex_replace(dh->dataDescription.as<std::string>(), invalid_metric,
"_");
110 uint64_t
version = dh->subSpecification;
// Publish the payload size as a metric tagged with the DPL subsystem; the
// arguments filling the name's placeholders are not visible in this excerpt.
111 monitoring.send(
Metric{(uint64_t)payload->GetSize(),
112 fmt::format(
"table-bytes-{}-{}-{}-created",
116 .addTag(Key::Subsystem, Value::DPL));
// The size is logged in units of 10^6 bytes.
117 LOGP(detail,
"Creating {}MB for table {}/{}/{}.", payload->GetSize() / 1000000., dh->dataOrigin, dh->dataDescription,
version);
// Ownership of header and payload moves into `parts`, which is then sent on
// the channel the proxy associates with this message's route.
120 parts.AddPart(std::move(messageRef.header));
121 parts.AddPart(std::move(payload));
122 sender.
send(parts, proxy.getOutputChannelIndex(messageRef.routeIndex));
// Function-local static baseline for the bytes-sent counter.
// NOTE(review): where (or whether) it gets updated is not visible in this
// excerpt.
124 static int64_t previousBytesSent = 0;
// Offer consumer: captures, by value, the difference between the context's
// current bytes-sent counter and the baseline, and subtracts that amount
// from the shared memory of the offers. Part of the parameter list is not
// visible in this excerpt.
125 auto disposeResources = [bs = context.
bytesSent() - previousBytesSent](
int taskId,
126 std::array<ComputingQuotaOffer, 16>& offers,
131 int64_t bytesSent = bs;
132 for (
auto& offer : offers) {
// Offers whose user differs from `taskId` are special-cased; the branch body
// is not visible (presumably they are skipped).
133 if (offer.user != taskId) {
// Take as much as possible from this offer, but never more than remains.
// NOTE(review): `bytesSent` is already int64_t, so the cast is redundant.
136 int64_t toRemove = std::min((int64_t)bytesSent, offer.sharedMemory);
137 offer.sharedMemory -= toRemove;
138 bytesSent -= toRemove;
// Nothing left to account for; the branch body is not visible (presumably
// the loop stops here).
140 if (bytesSent <= 0) {
144 return accountDisposed(disposed, stats);
// Register the consumer with the device state.
146 registry.
get<
DeviceState>().offerConsumers.emplace_back(disposeResources);
150 stats.processCommandQueue();
void updateMessagesSent(size_t value)
void updateBytesSent(size_t value)
Messages getMessagesForSending()