Project
Loading...
Searching...
No Matches
DataProcessor.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
22#include "Headers/DataHeader.h"
24
25#include <Monitoring/Monitoring.h>
26#include <fairmq/Parts.h>
27#include <fairmq/Device.h>
28#include <arrow/io/memory.h>
29#include <arrow/ipc/writer.h>
30#include <cstddef>
31
32using namespace o2::framework;
34
35namespace o2::framework
36{
37
39{
40 auto& proxy = services.get<FairMQDeviceProxy>();
41 std::vector<fair::mq::Parts> outputsPerChannel;
42 outputsPerChannel.resize(proxy.getNumOutputChannels());
43 auto contextMessages = context.getMessagesForSending();
44 for (auto& message : contextMessages) {
45 // monitoringService.send({ message->parts.Size(), "outputs/total" });
46 fair::mq::Parts parts = message->finalize();
47 assert(message->empty());
48 assert(parts.Size() == 2);
49 for (auto& part : parts) {
50 outputsPerChannel[proxy.getOutputChannelIndex((message->route())).value].AddPart(std::move(part));
51 }
52 }
53 for (int ci = 0; ci < outputsPerChannel.size(); ++ci) {
54 auto& parts = outputsPerChannel[ci];
55 if (parts.Size() == 0) {
56 continue;
57 }
58 sender.send(parts, {ci});
59 }
60}
61
63{
64 auto& proxy = services.get<FairMQDeviceProxy>();
65 for (auto& messageRef : context) {
66 fair::mq::Parts parts;
67 fair::mq::MessagePtr payload(sender.create(messageRef.routeIndex));
68 auto a = messageRef.payload.get();
69 // Rebuild the message using the string as input. For now it involves a copy.
70 payload->Rebuild(reinterpret_cast<void*>(const_cast<char*>(strdup(a->data()))), a->size(), nullptr, nullptr);
71 const DataHeader* cdh = o2::header::get<DataHeader*>(messageRef.header->GetData());
72 // sigh... See if we can avoid having it const by not
73 // exposing it to the user in the first place.
74 auto* dh = const_cast<DataHeader*>(cdh);
75 dh->payloadSize = payload->GetSize();
76 parts.AddPart(std::move(messageRef.header));
77 parts.AddPart(std::move(payload));
78 sender.send(parts, proxy.getOutputChannelIndex(messageRef.routeIndex));
79 }
80}
81
83{
84 using o2::monitoring::Metric;
85 using o2::monitoring::Monitoring;
86 using o2::monitoring::tags::Key;
87 using o2::monitoring::tags::Value;
88 auto& monitoring = registry.get<Monitoring>();
89 auto& stats = registry.get<DataProcessingStats>();
90
91 static const std::regex invalid_metric(" ");
92 auto& proxy = registry.get<FairMQDeviceProxy>();
93 for (auto& messageRef : context) {
94 fair::mq::Parts parts;
95 // Depending on how the arrow table is constructed, we finalize
96 // the writing here.
97 messageRef.finalize(messageRef.buffer);
98
99 std::unique_ptr<fair::mq::Message> payload = messageRef.buffer->Finalise();
100 // FIXME: for the moment we simply send empty bodies.
101 const DataHeader* cdh = o2::header::get<DataHeader*>(messageRef.header->GetData());
102 // sigh... See if we can avoid having it const by not
103 // exposing it to the user in the first place.
104 auto* dh = const_cast<DataHeader*>(cdh);
105 dh->payloadSize = payload->GetSize();
106 dh->serialization = o2::header::gSerializationMethodArrow;
107
108 auto origin = std::regex_replace(dh->dataOrigin.as<std::string>(), invalid_metric, "_");
109 auto description = std::regex_replace(dh->dataDescription.as<std::string>(), invalid_metric, "_");
110 uint64_t version = dh->subSpecification;
111 monitoring.send(Metric{(uint64_t)payload->GetSize(),
112 fmt::format("table-bytes-{}-{}-{}-created",
113 origin,
114 description,
115 version)}
116 .addTag(Key::Subsystem, Value::DPL));
117 LOGP(detail, "Creating {}MB for table {}/{}/{}.", payload->GetSize() / 1000000., dh->dataOrigin, dh->dataDescription, version);
118 context.updateBytesSent(payload->GetSize());
119 context.updateMessagesSent(1);
120 parts.AddPart(std::move(messageRef.header));
121 parts.AddPart(std::move(payload));
122 sender.send(parts, proxy.getOutputChannelIndex(messageRef.routeIndex));
123 }
124 static int64_t previousBytesSent = 0;
125 auto disposeResources = [bs = context.bytesSent() - previousBytesSent](int taskId,
126 std::array<ComputingQuotaOffer, 16>& offers,
127 ComputingQuotaStats& stats,
128 std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> accountDisposed) {
129 ComputingQuotaOffer disposed;
130 disposed.sharedMemory = 0;
131 int64_t bytesSent = bs;
132 for (auto& offer : offers) {
133 if (offer.user != taskId) {
134 continue;
135 }
136 int64_t toRemove = std::min((int64_t)bytesSent, offer.sharedMemory);
137 offer.sharedMemory -= toRemove;
138 bytesSent -= toRemove;
139 disposed.sharedMemory += toRemove;
140 if (bytesSent <= 0) {
141 break;
142 }
143 }
144 return accountDisposed(disposed, stats);
145 };
146 registry.get<DeviceState>().offerConsumers.emplace_back(disposeResources);
147 previousBytesSent = context.bytesSent();
148 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, static_cast<int64_t>(context.bytesSent())});
149 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED), DataProcessingStats::Op::Set, static_cast<int64_t>(context.messagesCreated())});
150 stats.processCommandQueue();
151}
152
153} // namespace o2::framework
o2::monitoring::Metric Metric
uint32_t version
Definition RawData.h:8
o2::monitoring::Monitoring Monitoring
void updateMessagesSent(size_t value)
void updateBytesSent(size_t value)
Allow injecting policies on send.
Definition DataSender.h:34
std::unique_ptr< fair::mq::Message > create(RouteIndex index)
void send(fair::mq::Parts &, ChannelIndex index)
GLsizei const GLfloat * value
Definition glcorearb.h:819
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
Helper struct to hold statistics about the data processing happening.
static void doSend(DataSender &, MessageContext &, ServiceRegistryRef)
Running state information of a given device.
Definition DeviceState.h:34
the main header struct
Definition DataHeader.h:618
PayloadSizeType payloadSize
Definition DataHeader.h:666