Project
Loading...
Searching...
No Matches
DataAllocator.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
23#include "Framework/Signpost.h"
24#include "Headers/DataHeader.h"
26#include "Headers/Stack.h"
27
28#include <fairmq/Device.h>
29
30#include <arrow/ipc/writer.h>
31#include <arrow/type.h>
32#include <arrow/io/memory.h>
33#include <arrow/util/config.h>
34
35#include <TClonesArray.h>
36
37#include <utility>
38
39O2_DECLARE_DYNAMIC_LOG(stream_context);
41
42namespace o2::framework
43{
44
48
50 : mRegistry{contextRegistry}
51{
52}
53
54RouteIndex DataAllocator::matchDataHeader(const Output& spec, size_t timeslice)
55{
56 auto& allowedOutputRoutes = mRegistry.get<DeviceSpec const>().outputs;
57 auto& stream = mRegistry.get<o2::framework::StreamContext>();
58 // FIXME: we should take timeframeId into account as well.
59 for (auto ri = 0; ri < allowedOutputRoutes.size(); ++ri) {
60 auto& route = allowedOutputRoutes[ri];
61 if (DataSpecUtils::match(route.matcher, spec.origin, spec.description, spec.subSpec) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
62 stream.routeCreated.at(ri) = true;
63 auto sid = _o2_signpost_id_t{(int64_t)&stream};
64 O2_SIGNPOST_EVENT_EMIT(stream_context, sid, "data_allocator", "Route %" PRIu64 " (%{public}s) created for timeslice %" PRIu64,
65 (uint64_t)ri, DataSpecUtils::describe(route.matcher).c_str(), (uint64_t)timeslice);
66 return RouteIndex{ri};
67 }
68 }
69 throw runtime_error_f(
70 "Worker is not authorised to create message with "
71 "origin(%s) description(%s) subSpec(%d)",
72 spec.origin.as<std::string>().c_str(),
73 spec.description.as<std::string>().c_str(),
74 spec.subSpec);
75}
76
78{
79 auto& timingInfo = mRegistry.get<TimingInfo>();
80 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
81 auto& context = mRegistry.get<MessageContext>();
82
83 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, //
85 size //
86 );
87 auto& co = context.add<MessageContext::ContainerRefObject<DataChunk>>(std::move(headerMessage), routeIndex, 0, size);
88 return co;
89}
90
91void DataAllocator::adoptChunk(const Output& spec, char* buffer, size_t size, fair::mq::FreeFn* freefn, void* hint = nullptr)
92{
93 // Find a matching channel, create a new message for it and put it in the
94 // queue to be sent at the end of the processing
95 RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
96
97 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, //
99 size //
100 );
101
102 // FIXME: how do we want to use subchannels? time based parallelism?
103 auto& context = mRegistry.get<MessageContext>();
104 context.add<MessageContext::TrivialObject>(std::move(headerMessage), routeIndex, 0, buffer, size, freefn, hint);
105}
106
107fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec, //
108 RouteIndex routeIndex, //
110 size_t payloadSize) //
111{
112 auto& timingInfo = mRegistry.get<TimingInfo>();
113 DataHeader dh;
114 dh.dataOrigin = spec.origin;
115 dh.dataDescription = spec.description;
116 dh.subSpecification = spec.subSpec;
117 dh.payloadSize = payloadSize;
118 dh.payloadSerializationMethod = method;
119 dh.tfCounter = timingInfo.tfCounter;
120 dh.firstTForbit = timingInfo.firstTForbit;
121 dh.runNumber = timingInfo.runNumber;
122
123 DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation};
124 static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
125 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
126 auto* transport = proxy.getOutputTransport(routeIndex);
127
128 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
129 return o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, spec.metaHeader});
130}
131
132void DataAllocator::addPartToContext(RouteIndex routeIndex, fair::mq::MessagePtr&& payloadMessage, const Output& spec,
133 o2::header::SerializationMethod serializationMethod)
134{
135 auto headerMessage = headerMessageFromOutput(spec, routeIndex, serializationMethod, 0);
136 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerMessage->GetData());
137 // FIXME: this is kind of ugly, we know that we can change the content of the
138 // header message because we have just created it, but the API declares it const
139 const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData());
140 auto* dh = const_cast<DataHeader*>(cdh);
141 dh->payloadSize = payloadMessage->GetSize();
142 O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %{public}s@%p %" PRIu64,
143 cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", headerMessage->GetData(), dh->payloadSize);
144 auto& context = mRegistry.get<MessageContext>();
145 // make_scoped creates the context object inside of a scope handler, since it goes out of
146 // scope immediately, the created object is scheduled and can be directly sent if the context
147 // is configured with the dispatcher callback
148 context.make_scoped<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
149}
150
151void DataAllocator::adopt(const Output& spec, std::string* ptr)
152{
153 std::unique_ptr<std::string> payload(ptr);
154 auto& timingInfo = mRegistry.get<TimingInfo>();
155 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
156 // the correct payload size is set later when sending the
157 // StringContext, see DataProcessor::doSend
158 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
159 const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
160 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, header->GetData());
161 O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %{public}s@%p %" PRIu64,
162 cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", header->GetData(), cdh->payloadSize);
163 mRegistry.get<StringContext>().addString(std::move(header), std::move(payload), routeIndex);
164 assert(payload.get() == nullptr);
165}
166
167void doWriteTable(std::shared_ptr<FairMQResizableBuffer> b, arrow::Table* table)
168{
169 auto mock = std::make_shared<arrow::io::MockOutputStream>();
170 int64_t expectedSize = 0;
171 auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), table->schema());
172 arrow::Status outStatus;
173 if (O2_BUILTIN_LIKELY(table->num_rows() != 0)) {
174 outStatus = mockWriter.ValueOrDie()->WriteTable(*table);
175 } else {
176 std::vector<std::shared_ptr<arrow::Array>> columns;
177 columns.resize(table->columns().size());
178 for (size_t ci = 0; ci < table->columns().size(); ci++) {
179 columns[ci] = table->column(ci)->chunk(0);
180 }
181 auto batch = arrow::RecordBatch::Make(table->schema(), 0, columns);
182 outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
183 }
184
185 expectedSize = mock->Tell().ValueOrDie();
186 auto reserve = b->Reserve(expectedSize);
187 if (reserve.ok() == false) {
188 throw std::runtime_error("Unable to reserve memory for table");
189 }
190
191 auto stream = std::make_shared<FairMQOutputStream>(b);
192 auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), table->schema());
193 if (outBatch.ok() == false) {
194 throw ::std::runtime_error("Unable to create batch writer");
195 }
196
197 if (O2_BUILTIN_UNLIKELY(table->num_rows() == 0)) {
198 std::vector<std::shared_ptr<arrow::Array>> columns;
199 columns.resize(table->columns().size());
200 for (size_t ci = 0; ci < table->columns().size(); ci++) {
201 columns[ci] = table->column(ci)->chunk(0);
202 }
203 auto batch = arrow::RecordBatch::Make(table->schema(), 0, columns);
204 outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
205 } else {
206 outStatus = outBatch.ValueOrDie()->WriteTable(*table);
207 }
208
209 if (outStatus.ok() == false) {
210 throw std::runtime_error("Unable to Write table");
211 }
212}
213
214void doWriteBatch(std::shared_ptr<FairMQResizableBuffer> b, arrow::RecordBatch* batch)
215{
216 auto mock = std::make_shared<arrow::io::MockOutputStream>();
217 int64_t expectedSize = 0;
218 auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
219 arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
220
221 expectedSize = mock->Tell().ValueOrDie();
222 auto reserve = b->Reserve(expectedSize);
223 if (reserve.ok() == false) {
224 throw std::runtime_error("Unable to reserve memory for table");
225 }
226
227 auto stream = std::make_shared<FairMQOutputStream>(b);
228 // This is a copy maybe we can finally get rid of it by having using the
229 // dataset API?
230 auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), batch->schema());
231 if (outBatch.ok() == false) {
232 throw ::std::runtime_error("Unable to create batch writer");
233 }
234
235 outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
236
237 if (outStatus.ok() == false) {
238 throw std::runtime_error("Unable to Write batch");
239 }
240}
241
243{
244 auto& timingInfo = mRegistry.get<TimingInfo>();
245 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
246 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
247 const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
248 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, header->GetData());
249 O2_SIGNPOST_START(parts, pid, "parts", "adopt %{public}s@%p %" PRIu64,
250 cdh ? fmt::format("{}/{}/{}", cdh->dataOrigin, cdh->dataDescription, cdh->subSpecification).c_str() : "unknown", header->GetData(), cdh->payloadSize);
251 auto& context = mRegistry.get<ArrowContext>();
252
253 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
254 return transport->CreateMessage(s);
255 };
256 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
257
258 tb.callback = [buffer = buffer, transport = context.proxy().getOutputTransport(routeIndex)](TableBuilder& builder) -> void {
259 auto table = builder.finalize();
260 doWriteTable(buffer, table.get());
261 // deletion happens in the caller
262 };
263
265 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
266 // Finalization not needed, as we do it using the LifetimeHolder callback
267 };
268
269 context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
270}
271
273{
274 auto& timingInfo = mRegistry.get<TimingInfo>();
275 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
276
277 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
278 auto& context = mRegistry.get<ArrowContext>();
279
280 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
281 return transport->CreateMessage(s);
282 };
283 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
284
285 t2t.callback = [buffer = buffer, transport = context.proxy().getOutputTransport(routeIndex)](TreeToTable& tree) {
286 // Serialization happens in here, so that we can
287 // get rid of the intermediate tree 2 table object, saving memory.
288 auto table = tree.finalize();
289 doWriteTable(buffer, table.get());
290 // deletion happens in the caller
291 };
292
296 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
297 // This is empty because we already serialised the object when
298 // the LifetimeHolder goes out of scope.
299 };
300
301 context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
302}
303
305{
306 auto& timingInfo = mRegistry.get<TimingInfo>();
307 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
308
309 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
310 auto& context = mRegistry.get<ArrowContext>();
311
312 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
313 return transport->CreateMessage(s);
314 };
315 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
316
317 f2b.callback = [buffer = buffer, transport = context.proxy().getOutputTransport(routeIndex)](FragmentToBatch& source) {
318 // Serialization happens in here, so that we can
319 // get rid of the intermediate tree 2 table object, saving memory.
320 auto batch = source.finalize();
321 doWriteBatch(buffer, batch.get());
322 // deletion happens in the caller
323 };
324
328 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
329 // This is empty because we already serialised the object when
330 // the LifetimeHolder goes out of scope.
331 };
332
333 context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
334}
335
336void DataAllocator::adopt(const Output& spec, std::shared_ptr<arrow::Table> ptr)
337{
338 auto& timingInfo = mRegistry.get<TimingInfo>();
339 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
340 auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
341 auto& context = mRegistry.get<ArrowContext>();
342
343 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
344 return transport->CreateMessage(s);
345 };
346 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
347
348 auto writer = [table = ptr](std::shared_ptr<FairMQResizableBuffer> b) -> void {
349 doWriteTable(b, table.get());
350 };
351
352 context.addBuffer(std::move(header), buffer, std::move(writer), routeIndex);
353}
354
355void DataAllocator::snapshot(const Output& spec, const char* payload, size_t payloadSize,
356 o2::header::SerializationMethod serializationMethod)
357{
358 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
359 auto& timingInfo = mRegistry.get<TimingInfo>();
360
361 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
362 fair::mq::MessagePtr payloadMessage(proxy.createOutputMessage(routeIndex, payloadSize));
363 memcpy(payloadMessage->GetData(), payload, payloadSize);
364
365 addPartToContext(routeIndex, std::move(payloadMessage), spec, serializationMethod);
366}
367
368Output DataAllocator::getOutputByBind(OutputRef&& ref)
369{
370 if (ref.label.empty()) {
371 throw runtime_error("Invalid (empty) OutputRef provided.");
372 }
373 auto& allowedOutputRoutes = mRegistry.get<DeviceSpec const>().outputs;
374 for (auto ri = 0ul, re = allowedOutputRoutes.size(); ri != re; ++ri) {
375 if (allowedOutputRoutes[ri].matcher.binding.value == ref.label) {
376 auto spec = allowedOutputRoutes[ri].matcher;
377 auto dataType = DataSpecUtils::asConcreteDataTypeMatcher(spec);
378 return Output{dataType.origin, dataType.description, ref.subSpec, std::move(ref.headerStack)};
379 }
380 }
381 std::string availableRoutes;
382 for (auto const& route : allowedOutputRoutes) {
383 availableRoutes += "\n - " + route.matcher.binding.value + ": " + DataSpecUtils::describe(route.matcher);
384 }
385 throw runtime_error_f("Unable to find OutputSpec with label %s. Available Routes: %s", ref.label.c_str(), availableRoutes.c_str());
387}
388
390{
391 auto& allowedOutputRoutes = mRegistry.get<DeviceSpec const>().outputs;
392 for (auto const& route : allowedOutputRoutes) {
393 if (DataSpecUtils::match(route.matcher, query.origin, query.description, query.subSpec)) {
394 return true;
395 }
396 }
397 return false;
398}
399
401{
 402 // Find a matching channel, extract the message for it from the container
403 // and put it in the queue to be sent at the end of the processing
404 auto& timingInfo = mRegistry.get<TimingInfo>();
405 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
406
407 auto& context = mRegistry.get<MessageContext>();
408 fair::mq::MessagePtr payloadMessage = context.cloneFromCache(id.value);
409
410 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, //
411 method, //
412 payloadMessage->GetSize() //
413 );
414
415 context.add<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
416}
417
419{
420 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
421 auto& timingInfo = mRegistry.get<TimingInfo>();
422
423 // We get the output route from the original spec, but we send it
424 // using the binding of the deadbeef subSpecification.
425 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
426 auto deadBeefOutput = Output{spec.origin, spec.description, 0xdeadbeef};
427 auto headerMessage = headerMessageFromOutput(deadBeefOutput, routeIndex, header::gSerializationMethodNone, 0);
428
429 addPartToContext(routeIndex, proxy.createOutputMessage(routeIndex, 0), deadBeefOutput, header::gSerializationMethodNone);
430}
431
432} // namespace o2::framework
#define O2_BUILTIN_UNREACHABLE
#define O2_BUILTIN_LIKELY(x)
#define O2_BUILTIN_UNLIKELY(x)
uint16_t pid
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:489
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
TBranch * ptr
void adopt(const Output &spec, std::string *)
void snapshot(const Output &spec, T const &object)
DataChunk & newChunk(const Output &, size_t)
DataAllocator(ServiceRegistryRef ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
fair::mq::TransportFactory * getOutputTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
TrivialObject handles a message object.
auto & add(Args &&... args)
std::unique_ptr< fair::mq::Message > cloneFromCache(int64_t id) const
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint GLuint stream
Definition glcorearb.h:1806
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
void doWriteBatch(std::shared_ptr< FairMQResizableBuffer > b, arrow::RecordBatch *batch)
void doWriteTable(std::shared_ptr< FairMQResizableBuffer > b, arrow::Table *table)
RuntimeErrorRef runtime_error_f(const char *,...)
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static constexpr int32_t KEEP_AT_EOS_FLAG
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
std::function< void(T &)> callback
header::Stack metaHeader
Definition Output.h:31
header::DataHeader::SubSpecificationType subSpec
Definition Output.h:30
header::DataDescription description
Definition Output.h:29
header::DataOrigin origin
Definition Output.h:28
std::vector< bool > routeCreated
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:618
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
float expectedSize()
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))