Project
Loading...
Searching...
No Matches
DataAllocator.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATAALLOCATOR_H_
12#define O2_FRAMEWORK_DATAALLOCATOR_H_
13
17#include "Framework/Output.h"
18#include "Framework/OutputRef.h"
20#include "Framework/DataChunk.h"
24#include "Framework/Traits.h"
29
30#include "Headers/DataHeader.h"
31#include <TClass.h>
32#include <gsl/span>
33
34#include <memory>
35#include <vector>
36#include <map>
37#include <string>
38#include <utility>
39#include <type_traits>
40#include <utility>
41#include <cstddef>
42
43// Do not change this for a full inclusion of fair::mq::Device.
44#include <fairmq/FwdDecls.h>
45
46namespace arrow
47{
48class Schema;
49class Table;
50
51namespace ipc
52{
53class RecordBatchWriter;
54} // namespace ipc
55} // namespace arrow
56
57namespace o2::framework
58{
59struct ServiceRegistry;
60
66template <typename T>
68 using type = T;
69 T* ptr = nullptr;
70 std::function<void(T&)> callback = nullptr;
71 LifetimeHolder(T* ptr_) : ptr(ptr_),
72 callback(nullptr)
73 {
74 }
75 LifetimeHolder() = delete;
76 // Never copy it, because there must be only one LifetimeHolder per
77 // created object.
81 {
82 this->ptr = other.ptr;
83 other.ptr = nullptr;
84 if (other.callback) {
85 this->callback = std::move(other.callback);
86 } else {
87 this->callback = nullptr;
88 }
89 other.callback = nullptr;
90 }
92 {
93 this->ptr = other.ptr;
94 other.ptr = nullptr;
95 if (other.callback) {
96 this->callback = std::move(other.callback);
97 } else {
98 this->callback = nullptr;
99 }
100 other.callback = nullptr;
101 return *this;
102 }
103
104 // On deletion we invoke the callback and then delete the object,
105 // when present.
107 {
108 release();
109 }
110
111 T* operator->() { return ptr; }
112 T& operator*() { return *ptr; }
113
114 // release the owned object, if any. This allows to
115 // invoke the callback early (e.g. for the Product<> case)
116 void release()
117 {
118 if (ptr && callback) {
119 callback(*ptr);
120 delete ptr;
121 ptr = nullptr;
122 }
123 }
124};
125
// Concept satisfied by std::vector specializations whose value_type is
// reported as messageable by the is_messageable trait.
126template <typename T>
127concept VectorOfMessageableTypes = is_specialization_v<T, std::vector> &&
128 is_messageable<typename T::value_type>::value;
129
135{
136 public:
138 using AllowedOutputRoutes = std::vector<OutputRoute>;
143
144 template <typename T>
145 requires std::is_fundamental_v<T>
147 using value_type = T;
148 };
149
151
152 DataChunk& newChunk(const Output&, size_t);
153
154 inline DataChunk& newChunk(OutputRef&& ref, size_t size) { return newChunk(getOutputByBind(std::move(ref)), size); }
155
156 void adoptChunk(const Output&, char*, size_t, fair::mq::FreeFn*, void*);
157
158 // This method can be used to send a 0xdeadbeef message associated to a given
159 // output. The @a spec will be used to determine the channel to which the
160 // output will need to be sent, however the actual message will be empty
161 // and with subspecification 0xdeadbeef.
162 void cookDeadBeef(const Output& spec);
163
164 template <typename T, typename... Args>
165 requires is_specialization_v<T, o2::framework::DataAllocator::UninitializedVector>
166 decltype(auto) make(const Output& spec, Args... args)
167 {
168 auto& timingInfo = mRegistry.get<TimingInfo>();
169 auto& context = mRegistry.get<MessageContext>();
170
171 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
172 // plain buffer as polymorphic spectator std::vector, which does not run constructors / destructors
173 using ValueType = typename T::value_type;
174
175 // Note: initial payload size is 0 and will be set by the context before sending
176 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
178 std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...)
179 .get();
180 }
181
182 template <typename T, typename... Args>
184 decltype(auto) make(const Output& spec, Args... args)
185 {
186 auto& timingInfo = mRegistry.get<TimingInfo>();
187 auto& context = mRegistry.get<MessageContext>();
188
189 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
190 // this catches all std::vector objects with messageable value type before checking if it
191 // also has a ROOT dictionary, so non-serialized transmission is preferred
192 using ValueType = typename T::value_type;
193
194 // Note: initial payload size is 0 and will be set by the context before sending
195 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
196 return context.add<MessageContext::VectorObject<ValueType>>(std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...).get();
197 }
198
199 template <typename T, typename... Args>
201 decltype(auto) make(const Output& spec, Args... args)
202 {
203 auto& timingInfo = mRegistry.get<TimingInfo>();
204 auto& context = mRegistry.get<MessageContext>();
205
206 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
207 // Extended support for types implementing the Root ClassDef interface, both TObject
208 // derived types and others
210 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodROOT, 0);
211
212 return context.add<typename enable_root_serialization<T>::object_type>(std::move(headerMessage), routeIndex, std::forward<Args>(args)...).get();
213 } else {
214 static_assert(enable_root_serialization<T>::value, "Please make sure you include RootMessageContext.h");
215 }
216 }
217
218 template <typename T, typename... Args>
219 requires std::is_base_of_v<std::string, T>
220 decltype(auto) make(const Output& spec, Args... args)
221 {
222 auto* s = new std::string(args...);
223 adopt(spec, s);
224 return *s;
225 }
226
227 template <typename T, typename... Args>
228 requires(requires { static_cast<struct TableBuilder>(std::declval<std::decay_t<T>>()); })
229 decltype(auto) make(const Output& spec, Args... args)
230 {
231 auto tb = std::move(LifetimeHolder<TableBuilder>(new std::decay_t<T>(args...)));
232 adopt(spec, tb);
233 return tb;
234 }
235
236 template <typename T, typename... Args>
237 requires(requires { static_cast<struct TreeToTable>(std::declval<std::decay_t<T>>()); })
238 decltype(auto) make(const Output& spec, Args... args)
239 {
240 auto t2t = std::move(LifetimeHolder<TreeToTable>(new std::decay_t<T>(args...)));
241 adopt(spec, t2t);
242 return t2t;
243 }
244
245 template <typename T, typename... Args>
246 requires(requires { static_cast<struct FragmentToBatch>(std::declval<std::decay_t<T>>()); })
247 decltype(auto) make(const Output& spec, Args... args)
248 {
249 auto f2b = std::move(LifetimeHolder<FragmentToBatch>(new std::decay_t<T>(args...)));
250 adopt(spec, f2b);
251 return f2b;
252 }
253
254 template <typename T>
255 requires is_messageable<T>::value && (!is_specialization_v<T, UninitializedVector>)
256 decltype(auto) make(const Output& spec)
257 {
258 return *reinterpret_cast<T*>(newChunk(spec, sizeof(T)).data());
259 }
260
261 template <typename T>
262 requires is_messageable<T>::value && (!is_specialization_v<T, UninitializedVector>)
263 decltype(auto) make(const Output& spec, std::integral auto nElements)
264 {
265 auto& timingInfo = mRegistry.get<TimingInfo>();
266 auto& context = mRegistry.get<MessageContext>();
267 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
268
269 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, nElements * sizeof(T));
270 return context.add<MessageContext::SpanObject<T>>(std::move(headerMessage), routeIndex, 0, nElements).get();
271 }
272
273 template <typename T, typename Arg>
274 decltype(auto) make(const Output& spec, std::same_as<std::shared_ptr<arrow::Schema>> auto schema)
275 {
276 std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
277 create(spec, &writer, schema);
278 return writer;
279 }
280
283 void
284 adopt(const Output& spec, std::string*);
285
288 void
290
293 void
295
298 void
300
302 void
303 adopt(const Output& spec, std::shared_ptr<class arrow::Table>);
304
324 template <typename T>
325 void snapshot(const Output& spec, T const& object)
326 {
327 auto& proxy = mRegistry.get<MessageContext>().proxy();
328 fair::mq::MessagePtr payloadMessage;
329 auto serializationType = o2::header::gSerializationMethodNone;
330 RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
331 if constexpr (is_messageable<T>::value == true) {
332 // Serialize a snapshot of a trivially copyable, non-polymorphic object,
333 payloadMessage = proxy.createOutputMessage(routeIndex, sizeof(T));
334 memcpy(payloadMessage->GetData(), &object, sizeof(T));
335
336 serializationType = o2::header::gSerializationMethodNone;
337 } else if constexpr (is_specialization_v<T, std::vector> == true ||
338 (gsl::details::is_span<T>::value && has_messageable_value_type<T>::value)) {
339 using ElementType = typename std::remove_pointer<typename T::value_type>::type;
341 // Serialize a snapshot of a std::vector of trivially copyable, non-polymorphic elements
342 // Note: in most cases it is better to use the `make` function and work with the provided
343 // reference object
344 constexpr auto elementSizeInBytes = sizeof(ElementType);
345 auto sizeInBytes = elementSizeInBytes * object.size();
346 payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
347
348 if constexpr (std::is_pointer<typename T::value_type>::value == false) {
349 // vector of elements
350 if (object.data() && sizeInBytes) {
351 memcpy(payloadMessage->GetData(), object.data(), sizeInBytes);
352 }
353 } else {
354 // serialize vector of pointers to elements
355 auto target = reinterpret_cast<unsigned char*>(payloadMessage->GetData());
356 for (auto const& pointer : object) {
357 memcpy(target, pointer, elementSizeInBytes);
358 target += elementSizeInBytes;
359 }
360 }
361
362 serializationType = o2::header::gSerializationMethodNone;
363 } else if constexpr (has_root_dictionary<ElementType>::value) {
364 return snapshot(spec, ROOTSerialized<T const>(object));
365 } else {
366 static_assert(always_static_assert_v<T>,
367 "value type of std::vector not supported by API, supported types:"
368 "\n - messageable tyeps (trivially copyable, non-polymorphic structures)"
369 "\n - pointers to those"
370 "\n - types with ROOT dictionary and implementing ROOT ClassDef interface");
371 }
372 } else if constexpr (is_container<T>::value == true && has_messageable_value_type<T>::value == true) {
373 // Serialize a snapshot of a std::container of trivially copyable, non-polymorphic elements
374 // Note: in most cases it is better to use the `make` function and work with the provided
375 // reference object
376 constexpr auto elementSizeInBytes = sizeof(typename T::value_type);
377 auto sizeInBytes = elementSizeInBytes * object.size();
378 payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
379
380 // serialize the container entries one by one into the payload buffer
381 auto target = reinterpret_cast<unsigned char*>(payloadMessage->GetData());
382 for (auto const& entry : object) {
383 memcpy(target, (void*)&entry, elementSizeInBytes);
384 target += elementSizeInBytes;
385 }
386 serializationType = o2::header::gSerializationMethodNone;
387 } else if constexpr (has_root_dictionary<T>::value == true || is_specialization_v<T, ROOTSerialized> == true) {
388 // Serialize a snapshot of an object with root dictionary
389 payloadMessage = proxy.createOutputMessage(routeIndex);
390 payloadMessage->Rebuild(4096, {64});
391 if constexpr (is_specialization_v<T, ROOTSerialized> == true) {
392 // Explicitly ROOT serialize a snapshot of object.
393 // An object wrapped into type `ROOTSerialized` is explicitly marked to be ROOT serialized
394 // and is expected to have a ROOT dictionary. Availability can not be checked at compile time
395 // for all cases.
396 using WrappedType = typename T::wrapped_type;
397 static_assert(std::is_same<typename T::hint_type, const char>::value ||
398 std::is_same<typename T::hint_type, TClass>::value ||
399 std::is_void<typename T::hint_type>::value,
400 "class hint must be of type TClass or const char");
401
402 const TClass* cl = nullptr;
403 if (object.getHint() == nullptr) {
404 // get TClass info by wrapped type
405 cl = TClass::GetClass(typeid(WrappedType));
406 } else if (std::is_same<typename T::hint_type, TClass>::value) {
407 // the class info has been passed directly
408 cl = reinterpret_cast<const TClass*>(object.getHint());
409 } else if (std::is_same<typename T::hint_type, const char>::value) {
410 // get TClass info by optional name
411 cl = TClass::GetClass(reinterpret_cast<const char*>(object.getHint()));
412 }
413 if (has_root_dictionary<WrappedType>::value == false && cl == nullptr) {
414 if (std::is_same<typename T::hint_type, const char>::value) {
415 throw runtime_error_f("ROOT serialization not supported, dictionary not found for type %s",
416 reinterpret_cast<const char*>(object.getHint()));
417 } else {
418 throw runtime_error_f("ROOT serialization not supported, dictionary not found for type %s",
419 typeid(WrappedType).name());
420 }
421 }
422 typename root_serializer<T>::serializer().Serialize(*payloadMessage, &object(), cl);
423 } else {
424 typename root_serializer<T>::serializer().Serialize(*payloadMessage, &object, TClass::GetClass(typeid(T)));
425 }
426 serializationType = o2::header::gSerializationMethodROOT;
427 } else {
428 static_assert(always_static_assert_v<T>,
429 "data type T not supported by API, \n specializations available for"
430 "\n - trivially copyable, non-polymorphic structures"
431 "\n - std::vector of messageable structures or pointers to those"
432 "\n - types with ROOT dictionary and implementing ROOT ClassDef interface");
433 }
434 addPartToContext(routeIndex, std::move(payloadMessage), spec, serializationType);
435 }
436
440 void snapshot(const Output& spec, const char* payload, size_t payloadSize,
442
448 template <typename T, typename... Args>
449 decltype(auto) make(OutputRef&& ref, Args&&... args)
450 {
451 return make<T>(getOutputByBind(std::move(ref)), std::forward<Args>(args)...);
452 }
453
459 template <typename T>
460 void adopt(OutputRef&& ref, T* obj)
461 {
462 return adopt(getOutputByBind(std::move(ref)), obj);
463 }
464
465 // get the memory resource associated with an output
467 {
468 auto& timingInfo = mRegistry.get<TimingInfo>();
469 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
470 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
471 return *proxy.getOutputTransport(routeIndex);
472 }
473
474 // make a stl (pmr) vector
475 template <typename T, typename... Args>
476 o2::pmr::vector<T> makeVector(const Output& spec, Args&&... args)
477 {
478 o2::pmr::FairMQMemoryResource* targetResource = getMemoryResource(spec);
479 return o2::pmr::vector<T>{targetResource, std::forward<Args>(args)...};
480 }
481
// Identifier returned by adoptContainer. The out-of-line adoptContainer
// initialises it to 0 and overwrites `value` with the result of
// MessageContext::addToCache when caching was requested.
482 struct CacheId {
483 int64_t value;
484 };
485
// Tells adoptContainer whether the payload message should also be added to
// the message cache.
486 enum struct CacheStrategy : int {
487 Never = 0,
488 Always = 1 // adoptContainer calls addToCache on the payload message
489 };
490
491 template <typename ContainerT>
492 CacheId adoptContainer(const Output& /*spec*/, ContainerT& /*container*/, CacheStrategy /* cache = false */, o2::header::SerializationMethod /* method = header::gSerializationMethodNone*/)
493 {
494 static_assert(always_static_assert_v<ContainerT>, "Container cannot be moved. Please make sure it is backed by a o2::pmr::FairMQMemoryResource");
495 return {0};
496 }
497
507 template <typename ContainerT>
509
512
518 template <typename... Args>
519 auto snapshot(OutputRef&& ref, Args&&... args)
520 {
521 return snapshot(getOutputByBind(std::move(ref)), std::forward<Args>(args)...);
522 }
523
525 bool isAllowed(Output const& query);
526
528 {
529 return mRegistry.get<MessageContext>().findMessageHeader(spec);
530 }
531
533 {
534 return mRegistry.get<MessageContext>().findMessageHeader(getOutputByBind(std::move(ref)));
535 }
536
538 {
539 return mRegistry.get<MessageContext>().findMessageHeaderStack(spec);
540 }
541
543 {
544 return mRegistry.get<MessageContext>().findMessageHeaderStack(getOutputByBind(std::move(ref)));
545 }
546
547 int countDeviceOutputs(bool excludeDPLOrigin = false)
548 {
549 return mRegistry.get<MessageContext>().countDeviceOutputs(excludeDPLOrigin);
550 }
551
552 private:
553 ServiceRegistryRef mRegistry;
554
555 RouteIndex matchDataHeader(const Output& spec, size_t timeframeId);
556 fair::mq::MessagePtr headerMessageFromOutput(Output const& spec, //
557 RouteIndex index, //
558 o2::header::SerializationMethod serializationMethod, //
559 size_t payloadSize); //
560
561 Output getOutputByBind(OutputRef&& ref);
562 void addPartToContext(RouteIndex routeIndex, fair::mq::MessagePtr&& payload,
563 const Output& spec,
564 o2::header::SerializationMethod serializationMethod);
565};
566
567template <typename ContainerT>
569{
570 // Find a matching channel, extract the message for it from the container
571 // and put it in the queue to be sent at the end of the processing
572 auto& timingInfo = mRegistry.get<TimingInfo>();
573 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
574
575 auto& context = mRegistry.get<MessageContext>();
576 auto* transport = mRegistry.get<FairMQDeviceProxy>().getOutputTransport(routeIndex);
577 fair::mq::MessagePtr payloadMessage = o2::pmr::getMessage(std::forward<ContainerT>(container), *transport);
578 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, //
579 method, //
580 payloadMessage->GetSize() //
581 );
582
583 CacheId cacheId{0}; //
584 if (cache == CacheStrategy::Always) {
585 // The message will be shallow cloned in the cache. Since the
586 // clone is indistinguishable from the original, we can keep sending
587 // the original.
588 cacheId.value = context.addToCache(payloadMessage);
589 }
590
591 context.add<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
592 return cacheId;
593}
594
595} // namespace o2::framework
596
597#endif // O2_FRAMEWORK_DATAALLOCATOR_H_
Type wrappers for enforcing a specific serialization method.
static constexpr ServiceKind service_kind
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec, std::integral auto nElements)
void adopt(const Output &spec, std::string *)
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
o2::pmr::FairMQMemoryResource * getMemoryResource(const Output &spec)
void snapshot(const Output &spec, T const &object)
decltype(auto) make(OutputRef &&ref, Args &&... args)
DataChunk & newChunk(const Output &, size_t)
o2::header::DataHeader::SubSpecificationType SubSpecificationType
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
auto snapshot(OutputRef &&ref, Args &&... args)
void adopt(const Output &spec, std::shared_ptr< class arrow::Table >)
Adopt an Arrow table and send it to all consumers of spec.
int countDeviceOutputs(bool excludeDPLOrigin=false)
o2::header::DataHeader * findMessageHeader(OutputRef &&ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) make(const Output &spec, Args... args)
std::vector< OutputRoute > AllowedOutputRoutes
decltype(auto) make(const Output &spec, Args... args)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
o2::header::Stack * findMessageHeaderStack(OutputRef &&ref)
DataChunk & newChunk(OutputRef &&ref, size_t size)
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) make(const Output &spec, std::same_as< std::shared_ptr< arrow::Schema > > auto schema)
void adopt(OutputRef &&ref, T *obj)
decltype(auto) make(const Output &spec, Args... args)
o2::header::DataHeader * findMessageHeader(const Output &spec)
TrivialObject handles a message object.
GLenum void ** pointer
Definition glcorearb.h:805
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint object
Definition glcorearb.h:4041
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
ServiceKind
The kind of service we are asking for.
RuntimeErrorRef runtime_error_f(const char *,...)
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr o2::header::SerializationMethod gSerializationMethodROOT
Definition DataHeader.h:328
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
fair::mq::MemoryResource FairMQMemoryResource
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
LifetimeHolder(const LifetimeHolder &)=delete
LifetimeHolder & operator=(LifetimeHolder &&other)
LifetimeHolder & operator=(const LifetimeHolder &)=delete
LifetimeHolder(LifetimeHolder &&other)
std::function< void(T &)> callback
the main header struct
Definition DataHeader.h:618
uint32_t SubSpecificationType
Definition DataHeader.h:620
static int constexpr size
Definition DataHeader.h:211
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
VectorOfTObjectPtrs other