Project
Loading...
Searching...
No Matches
DataAllocator.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include "Framework/Lifetime.h"
24#include "Framework/Signpost.h"
25#include "Headers/DataHeader.h"
27#include "Headers/Stack.h"
28
29#include <fairmq/Device.h>
30
31#include <arrow/ipc/writer.h>
32#include <arrow/type.h>
33#include <arrow/io/memory.h>
34#include <arrow/util/config.h>
35
36#include <TClonesArray.h>
37
38#include <utility>
39
40O2_DECLARE_DYNAMIC_LOG(stream_context);
42
43namespace o2::framework
44{
45
49
51 : mRegistry{contextRegistry}
52{
53}
54
55RouteIndex DataAllocator::matchDataHeader(const Output& spec, size_t timeslice)
56{
57 auto& allowedOutputRoutes = mRegistry.get<DeviceSpec const>().outputs;
58 auto& stream = mRegistry.get<o2::framework::StreamContext>();
59 // FIXME: we should take timeframeId into account as well.
60 for (auto ri = 0; ri < allowedOutputRoutes.size(); ++ri) {
61 auto& route = allowedOutputRoutes[ri];
62 if (DataSpecUtils::match(route.matcher, spec.origin, spec.description, spec.subSpec) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
63 stream.routeCreated.at(ri) = true;
64 auto sid = _o2_signpost_id_t{(int64_t)&stream};
65 O2_SIGNPOST_EVENT_EMIT(stream_context, sid, "data_allocator", "Route %" PRIu64 " (%{public}s) created for timeslice %" PRIu64,
66 (uint64_t)ri, DataSpecUtils::describe(route.matcher).c_str(), (uint64_t)timeslice);
67 return RouteIndex{ri};
68 }
69 }
70 throw runtime_error_f(
71 "Worker is not authorised to create message with "
72 "origin(%s) description(%s) subSpec(%d)",
73 spec.origin.as<std::string>().c_str(),
74 spec.description.as<std::string>().c_str(),
75 spec.subSpec);
76}
77
79{
80 auto& timingInfo = mRegistry.get<TimingInfo>();
81 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
82 auto& context = mRegistry.get<MessageContext>();
83
84 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, //
86 size //
87 );
88 auto& co = context.add<MessageContext::ContainerRefObject<DataChunk>>(std::move(headerMessage), routeIndex, 0, size);
89 return co;
90}
91
92void DataAllocator::adoptChunk(const Output& spec, char* buffer, size_t size, fair::mq::FreeFn* freefn, void* hint = nullptr)
93{
94 // Find a matching channel, create a new message for it and put it in the
95 // queue to be sent at the end of the processing
96 RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
97
98 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, //
100 size //
101 );
102
103 // FIXME: how do we want to use subchannels? time based parallelism?
104 auto& context = mRegistry.get<MessageContext>();
105 context.add<MessageContext::TrivialObject>(std::move(headerMessage), routeIndex, 0, buffer, size, freefn, hint);
106}
107
108fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec, //
109 RouteIndex routeIndex, //
111 size_t payloadSize) //
112{
113 auto& timingInfo = mRegistry.get<TimingInfo>();
114 DataHeader dh;
115 dh.dataOrigin = spec.origin;
116 dh.dataDescription = spec.description;
117 dh.subSpecification = spec.subSpec;
118 dh.payloadSize = payloadSize;
119 dh.payloadSerializationMethod = method;
120 dh.tfCounter = timingInfo.tfCounter;
121 dh.firstTForbit = timingInfo.firstTForbit;
122 dh.runNumber = timingInfo.runNumber;
123
124 DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation};
125 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
126 auto lifetime = proxy.getOutputRoute(routeIndex).matcher.lifetime;
127 static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
128 // Messages associated to sporatic output we always keep, since they are most likely histograms / condition
129 // objects which need to be kept at the end of stream.
130 static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= (lifetime == Lifetime::Sporadic) ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
131 auto* transport = proxy.getOutputTransport(routeIndex);
132
133 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
134 return o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, spec.metaHeader});
135}
136
137void DataAllocator::addPartToContext(RouteIndex routeIndex, fair::mq::MessagePtr&& payloadMessage, const Output& spec,
138 o2::header::SerializationMethod serializationMethod)
139{
140 auto headerMessage = headerMessageFromOutput(spec, routeIndex, serializationMethod, 0);
141 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerMessage->GetData());
142 // FIXME: this is kind of ugly, we know that we can change the content of the
143 // header message because we have just created it, but the API declares it const
144 const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData());
145 auto* dh = const_cast<DataHeader*>(cdh);
146 dh->payloadSize = payloadMessage->GetSize();
147 O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %{public}s@%p %" PRIu64,
148 cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", headerMessage->GetData(), dh->payloadSize);
149 auto& context = mRegistry.get<MessageContext>();
150 // make_scoped creates the context object inside of a scope handler, since it goes out of
151 // scope immediately, the created object is scheduled and can be directly sent if the context
152 // is configured with the dispatcher callback
153 context.make_scoped<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
154}
155
156void DataAllocator::adopt(const Output& spec, std::string* ptr)
157{
158 std::unique_ptr<std::string> payload(ptr);
159 auto& timingInfo = mRegistry.get<TimingInfo>();
160 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
161 // the correct payload size is set later when sending the
162 // StringContext, see DataProcessor::doSend
163 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
164 const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
165 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, header->GetData());
166 O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %{public}s@%p %" PRIu64,
167 cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", header->GetData(), cdh->payloadSize);
168 mRegistry.get<StringContext>().addString(std::move(header), std::move(payload), routeIndex);
169 assert(payload.get() == nullptr);
170}
171
172void doWriteTable(std::shared_ptr<FairMQResizableBuffer> b, arrow::Table* table)
173{
174 auto mock = std::make_shared<arrow::io::MockOutputStream>();
175 int64_t expectedSize = 0;
176 auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), table->schema());
177 arrow::Status outStatus;
178 if (O2_BUILTIN_LIKELY(table->num_rows() != 0)) {
179 outStatus = mockWriter.ValueOrDie()->WriteTable(*table);
180 } else {
181 std::vector<std::shared_ptr<arrow::Array>> columns;
182 columns.resize(table->columns().size());
183 for (size_t ci = 0; ci < table->columns().size(); ci++) {
184 columns[ci] = table->column(ci)->chunk(0);
185 }
186 auto batch = arrow::RecordBatch::Make(table->schema(), 0, columns);
187 outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
188 }
189
190 expectedSize = mock->Tell().ValueOrDie();
191 auto reserve = b->Reserve(expectedSize);
192 if (reserve.ok() == false) {
193 throw std::runtime_error("Unable to reserve memory for table");
194 }
195
196 auto stream = std::make_shared<FairMQOutputStream>(b);
197 auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), table->schema());
198 if (outBatch.ok() == false) {
199 throw ::std::runtime_error("Unable to create batch writer");
200 }
201
202 if (O2_BUILTIN_UNLIKELY(table->num_rows() == 0)) {
203 std::vector<std::shared_ptr<arrow::Array>> columns;
204 columns.resize(table->columns().size());
205 for (size_t ci = 0; ci < table->columns().size(); ci++) {
206 columns[ci] = table->column(ci)->chunk(0);
207 }
208 auto batch = arrow::RecordBatch::Make(table->schema(), 0, columns);
209 outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
210 } else {
211 outStatus = outBatch.ValueOrDie()->WriteTable(*table);
212 }
213
214 if (outStatus.ok() == false) {
215 throw std::runtime_error("Unable to Write table");
216 }
217}
218
220{
221 auto& timingInfo = mRegistry.get<TimingInfo>();
222 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
223 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
224 const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
225 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, header->GetData());
226 O2_SIGNPOST_START(parts, pid, "parts", "adopt %{public}s@%p %" PRIu64,
227 cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", header->GetData(), cdh->payloadSize);
228 auto& context = mRegistry.get<ArrowContext>();
229
230 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
231 return transport->CreateMessage(s);
232 };
233 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
234
235 tb.callback = [buffer = buffer, transport = context.proxy().getOutputTransport(routeIndex)](TableBuilder& builder) -> void {
236 auto table = builder.finalize();
237 doWriteTable(buffer, table.get());
238 // deletion happens in the caller
239 };
240
242 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
243 // Finalization not needed, as we do it using the LifetimeHolder callback
244 };
245
246 context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
247}
248
250{
251 auto& timingInfo = mRegistry.get<TimingInfo>();
252 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
253
254 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
255 auto& context = mRegistry.get<ArrowContext>();
256
257 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
258 return transport->CreateMessage(s);
259 };
260 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
261
262 f2b.callback = [buffer = buffer, transport = context.proxy().getOutputTransport(routeIndex)](FragmentToBatch& source) {
263 // Serialization happens in here, so that we can
264 // get rid of the intermediate tree 2 table object, saving memory.
265 auto batch = source.finalize();
266 auto mock = std::make_shared<arrow::io::MockOutputStream>();
267 int64_t expectedSize = 0;
268 auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
269 arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
270
271 expectedSize = mock->Tell().ValueOrDie();
272 auto reserve = buffer->Reserve(expectedSize);
273 if (reserve.ok() == false) {
274 throw std::runtime_error("Unable to reserve memory for table");
275 }
276
277 auto deferredWriterStream = source.streamer(buffer);
278
279 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream, batch->schema());
280 if (outBatch.ok() == false) {
281 throw ::std::runtime_error("Unable to create batch writer");
282 }
283
284 outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
285
286 if (outStatus.ok() == false) {
287 throw std::runtime_error("Unable to Write batch");
288 }
289 // deletion happens in the caller
290 };
291
292 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
293 // This is empty because we already serialised the object when
294 // the LifetimeHolder goes out of scope. See code above.
295 };
296
297 context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
298}
299
300void DataAllocator::adopt(const Output& spec, std::shared_ptr<arrow::Table> ptr)
301{
302 auto& timingInfo = mRegistry.get<TimingInfo>();
303 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
304 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
305 auto& context = mRegistry.get<ArrowContext>();
306
307 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
308 return transport->CreateMessage(s);
309 };
310 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
311
312 auto writer = [table = ptr](std::shared_ptr<FairMQResizableBuffer> b) -> void {
313 doWriteTable(b, table.get());
314 };
315
316 context.addBuffer(std::move(header), buffer, std::move(writer), routeIndex);
317}
318
319void DataAllocator::snapshot(const Output& spec, const char* payload, size_t payloadSize,
320 o2::header::SerializationMethod serializationMethod)
321{
322 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
323 auto& timingInfo = mRegistry.get<TimingInfo>();
324
325 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
326 fair::mq::MessagePtr payloadMessage(proxy.createOutputMessage(routeIndex, payloadSize));
327 memcpy(payloadMessage->GetData(), payload, payloadSize);
328
329 addPartToContext(routeIndex, std::move(payloadMessage), spec, serializationMethod);
330}
331
332Output DataAllocator::getOutputByBind(OutputRef&& ref)
333{
334 if (ref.label.empty()) {
335 throw runtime_error("Invalid (empty) OutputRef provided.");
336 }
337 auto& allowedOutputRoutes = mRegistry.get<DeviceSpec const>().outputs;
338 for (auto ri = 0ul, re = allowedOutputRoutes.size(); ri != re; ++ri) {
339 if (allowedOutputRoutes[ri].matcher.binding.value == ref.label) {
340 auto spec = allowedOutputRoutes[ri].matcher;
341 auto dataType = DataSpecUtils::asConcreteDataTypeMatcher(spec);
342 return Output{dataType.origin, dataType.description, ref.subSpec, std::move(ref.headerStack)};
343 }
344 }
345 std::string availableRoutes;
346 for (auto const& route : allowedOutputRoutes) {
347 availableRoutes += "\n - " + route.matcher.binding.value + ": " + DataSpecUtils::describe(route.matcher);
348 }
349 throw runtime_error_f("Unable to find OutputSpec with label %s. Available Routes: %s", ref.label.c_str(), availableRoutes.c_str());
351}
352
354{
355 auto& allowedOutputRoutes = mRegistry.get<DeviceSpec const>().outputs;
356 for (auto const& route : allowedOutputRoutes) {
357 if (DataSpecUtils::match(route.matcher, query.origin, query.description, query.subSpec)) {
358 return true;
359 }
360 }
361 return false;
362}
363
365{
366 // Find a matching channel, extract the message for it form the container
367 // and put it in the queue to be sent at the end of the processing
368 auto& timingInfo = mRegistry.get<TimingInfo>();
369 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
370
371 auto& context = mRegistry.get<MessageContext>();
372 fair::mq::MessagePtr payloadMessage = context.cloneFromCache(id.value);
373
374 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, //
375 method, //
376 payloadMessage->GetSize() //
377 );
378
379 context.add<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
380}
381
383{
384 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
385 auto& timingInfo = mRegistry.get<TimingInfo>();
386
387 // We get the output route from the original spec, but we send it
388 // using the binding of the deadbeef subSpecification.
389 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
390 auto deadBeefOutput = Output{spec.origin, spec.description, 0xdeadbeef};
391 auto headerMessage = headerMessageFromOutput(deadBeefOutput, routeIndex, header::gSerializationMethodNone, 0);
392
393 addPartToContext(routeIndex, proxy.createOutputMessage(routeIndex, 0), deadBeefOutput, header::gSerializationMethodNone);
394}
395
396} // namespace o2::framework
#define O2_BUILTIN_UNREACHABLE
#define O2_BUILTIN_LIKELY(x)
#define O2_BUILTIN_UNLIKELY(x)
uint16_t pid
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:565
TBranch * ptr
void adopt(const Output &spec, std::string *)
void snapshot(const Output &spec, T const &object)
DataChunk & newChunk(const Output &, size_t)
DataAllocator(ServiceRegistryRef ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
OutputRoute const & getOutputRoute(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
TrivialObject handles a message object.
auto & add(Args &&... args)
std::unique_ptr< fair::mq::Message > cloneFromCache(int64_t id) const
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint GLuint stream
Definition glcorearb.h:1806
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
void doWriteTable(std::shared_ptr< FairMQResizableBuffer > b, arrow::Table *table)
RuntimeErrorRef runtime_error_f(const char *,...)
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static constexpr int32_t KEEP_AT_EOS_FLAG
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
std::function< void(T &)> callback
header::Stack metaHeader
Definition Output.h:31
header::DataHeader::SubSpecificationType subSpec
Definition Output.h:30
header::DataDescription description
Definition Output.h:29
header::DataOrigin origin
Definition Output.h:28
std::vector< bool > routeCreated
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:618
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
float expectedSize()