Project
Loading...
Searching...
No Matches
DataAllocator.h
Go to the documentation of this file.
1// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATAALLOCATOR_H_
12#define O2_FRAMEWORK_DATAALLOCATOR_H_
13
17#include "Framework/Output.h"
18#include "Framework/OutputRef.h"
20#include "Framework/DataChunk.h"
24#include "Framework/Traits.h"
29
30#include "Headers/DataHeader.h"
31#include <TClass.h>
32
33#include <memory>
34#include <ranges>
35#include <vector>
36#include <map>
37#include <string>
38#include <utility>
39#include <type_traits>
40#include <utility>
41#include <cstddef>
42
43// Do not change this for a full inclusion of fair::mq::Device.
44#include <fairmq/FwdDecls.h>
45
46namespace arrow
47{
48class Schema;
49class Table;
50
51namespace ipc
52{
53class RecordBatchWriter;
54} // namespace ipc
55} // namespace arrow
56
57namespace o2::framework
58{
59struct ServiceRegistry;
60
66template <typename T>
68 using type = T;
69 T* ptr = nullptr;
70 std::function<void(T&)> callback = nullptr;
71 LifetimeHolder(T* ptr_) : ptr(ptr_),
72 callback(nullptr)
73 {
74 }
75 LifetimeHolder() = delete;
76 // Never copy it, because there is only one LifetimeHolder pointer
77 // created object.
81 {
82 this->ptr = other.ptr;
83 other.ptr = nullptr;
84 if (other.callback) {
85 this->callback = std::move(other.callback);
86 } else {
87 this->callback = nullptr;
88 }
89 other.callback = nullptr;
90 }
92 {
93 this->ptr = other.ptr;
94 other.ptr = nullptr;
95 if (other.callback) {
96 this->callback = std::move(other.callback);
97 } else {
98 this->callback = nullptr;
99 }
100 other.callback = nullptr;
101 return *this;
102 }
103
104 // On deletion we invoke the callback and then delete the object,
105 // when prensent.
107 {
108 release();
109 }
110
111 T* operator->() { return ptr; }
112 T& operator*() { return *ptr; }
113
114 // release the owned object, if any. This allows to
115 // invoke the callback early (e.g. for the Product<> case)
116 void release()
117 {
118 if (ptr && callback) {
119 callback(*ptr);
120 delete ptr;
121 ptr = nullptr;
122 }
123 }
124};
125
126template <typename T>
127concept VectorOfMessageableTypes = is_specialization_v<T, std::vector> &&
128 is_messageable<typename T::value_type>::value;
129
130template <typename T>
131concept ContiguousMessageablesRange = std::ranges::contiguous_range<T> &&
132 is_messageable<typename T::value_type>::value;
133
139{
140 public:
142 using AllowedOutputRoutes = std::vector<OutputRoute>;
147
148 template <typename T>
149 requires std::is_fundamental_v<T>
151 using value_type = T;
152 };
153
155
156 DataChunk& newChunk(const Output&, size_t);
157
158 inline DataChunk& newChunk(OutputRef&& ref, size_t size) { return newChunk(getOutputByBind(std::move(ref)), size); }
159
160 void adoptChunk(const Output&, char*, size_t, fair::mq::FreeFn*, void*);
161
162 // This method can be used to send a 0xdeadbeef message associated to a given
163 // output. The @a spec will be used to determine the channel to which the
164 // output will need to be sent, however the actual message will be empty
165 // and with subspecification 0xdeadbeef.
166 void cookDeadBeef(const Output& spec);
167
168 template <typename T, typename... Args>
169 requires is_specialization_v<T, o2::framework::DataAllocator::UninitializedVector>
170 decltype(auto) make(const Output& spec, Args... args)
171 {
172 auto& timingInfo = mRegistry.get<TimingInfo>();
173 auto& context = mRegistry.get<MessageContext>();
174
175 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
176 // plain buffer as polymorphic spectator std::vector, which does not run constructors / destructors
177 using ValueType = typename T::value_type;
178
179 // Note: initial payload size is 0 and will be set by the context before sending
180 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
182 std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...)
183 .get();
184 }
185
186 template <typename T, typename... Args>
188 decltype(auto) make(const Output& spec, Args... args)
189 {
190 auto& timingInfo = mRegistry.get<TimingInfo>();
191 auto& context = mRegistry.get<MessageContext>();
192
193 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
194 // this catches all std::vector objects with messageable value type before checking if is also
195 // has a root dictionary, so non-serialized transmission is preferred
196 using ValueType = typename T::value_type;
197
198 // Note: initial payload size is 0 and will be set by the context before sending
199 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0);
200 return context.add<MessageContext::VectorObject<ValueType>>(std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...).get();
201 }
202
203 template <typename T, typename... Args>
204 requires(!VectorOfMessageableTypes<T> && has_root_dictionary<T>::value == true && is_messageable<T>::value == false)
205 decltype(auto) make(const Output& spec, Args... args)
206 {
207 auto& timingInfo = mRegistry.get<TimingInfo>();
208 auto& context = mRegistry.get<MessageContext>();
209
210 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
211 // Extended support for types implementing the Root ClassDef interface, both TObject
212 // derived types and others
214 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodROOT, 0);
215
216 return context.add<typename enable_root_serialization<T>::object_type>(std::move(headerMessage), routeIndex, std::forward<Args>(args)...).get();
217 } else {
218 static_assert(enable_root_serialization<T>::value, "Please make sure you include RootMessageContext.h");
219 }
220 }
221
222 template <typename T, typename... Args>
223 requires std::is_base_of_v<std::string, T>
224 decltype(auto) make(const Output& spec, Args... args)
225 {
226 auto* s = new std::string(args...);
227 adopt(spec, s);
228 return *s;
229 }
230
231 template <typename T, typename... Args>
232 requires(requires { static_cast<struct TableBuilder>(std::declval<std::decay_t<T>>()); })
233 decltype(auto) make(const Output& spec, Args... args)
234 {
235 auto tb = std::move(LifetimeHolder<TableBuilder>(new std::decay_t<T>(args...)));
236 adopt(spec, tb);
237 return tb;
238 }
239
240 template <typename T, typename... Args>
241 requires(requires { static_cast<struct FragmentToBatch>(std::declval<std::decay_t<T>>()); })
242 decltype(auto) make(const Output& spec, Args... args)
243 {
244 auto f2b = std::move(LifetimeHolder<FragmentToBatch>(new std::decay_t<T>(args...)));
245 adopt(spec, f2b);
246 return f2b;
247 }
248
249 template <typename T>
250 requires is_messageable<T>::value && (!is_specialization_v<T, UninitializedVector>)
251 decltype(auto) make(const Output& spec)
252 {
253 return *reinterpret_cast<T*>(newChunk(spec, sizeof(T)).data());
254 }
255
256 template <typename T>
257 requires is_messageable<T>::value && (!is_specialization_v<T, UninitializedVector>)
258 decltype(auto) make(const Output& spec, std::integral auto nElements)
259 {
260 auto& timingInfo = mRegistry.get<TimingInfo>();
261 auto& context = mRegistry.get<MessageContext>();
262 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
263
264 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, nElements * sizeof(T));
265 return context.add<MessageContext::SpanObject<T>>(std::move(headerMessage), routeIndex, 0, nElements).get();
266 }
267
268 template <typename T, typename Arg>
269 decltype(auto) make(const Output& spec, std::same_as<std::shared_ptr<arrow::Schema>> auto schema)
270 {
271 std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
272 create(spec, &writer, schema);
273 return writer;
274 }
275
278 void
279 adopt(const Output& spec, std::string*);
280
283 void
285
288 void
290
292 void
293 adopt(const Output& spec, std::shared_ptr<class arrow::Table>);
294
315 template <typename T>
316 requires(!std::ranges::contiguous_range<T> && is_messageable<T>::value)
317 void snapshot(const Output& spec, T const& object)
318 {
319 return snapshot(spec, std::span<T const>(&object, &object + 1));
320 }
321
322 void snapshot(const Output& spec, std::string_view const& object)
323 {
324 return snapshot(spec, std::span<char const>(object.data(), object.size()));
325 }
326
327 // This is for snapshotting a range of contiguous messageable types
328 template <typename T>
329 requires(ContiguousMessageablesRange<T> && !std::is_pointer_v<typename T::value_type>)
330 void snapshot(const Output& spec, T const& object)
331 {
332 auto& proxy = mRegistry.get<MessageContext>().proxy();
333 RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
334 using ElementType = typename std::remove_pointer<typename T::value_type>::type;
335 // Serialize a snapshot of a std::vector of trivially copyable, non-polymorphic elements
336 // Note: in most cases it is better to use the `make` function und work with the provided
337 // reference object
338 constexpr auto elementSizeInBytes = sizeof(ElementType);
339 auto sizeInBytes = elementSizeInBytes * object.size();
340 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
341
342 // vector of elements
343 if (object.data() && sizeInBytes) {
344 memcpy(payloadMessage->GetData(), object.data(), sizeInBytes);
345 }
346
347 addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodNone);
348 }
349
350 // A random access range of pointers we can serialise by storing the contens one after the other.
351 // On the receiving side you will have to retrieve it via a span
352 template <typename T>
353 requires(std::ranges::random_access_range<T> && is_messageable<typename std::remove_pointer_t<typename T::value_type>>::value && std::is_pointer_v<typename T::value_type>)
354 void snapshot(const Output& spec, T const& object)
355 {
356 auto& proxy = mRegistry.get<MessageContext>().proxy();
357 RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
358 using ElementType = typename std::remove_pointer_t<typename T::value_type>;
359 // Serialize a snapshot of a std::vector of trivially copyable, non-polymorphic elements
360 // Note: in most cases it is better to use the `make` function und work with the provided
361 // reference object
362 constexpr auto elementSizeInBytes = sizeof(ElementType);
363 auto sizeInBytes = elementSizeInBytes * object.size();
364 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
365
366 // serialize vector of pointers to elements
367 auto target = reinterpret_cast<unsigned char*>(payloadMessage->GetData());
368 for (auto const& pointer : object) {
369 memcpy(target, pointer, elementSizeInBytes);
370 target += elementSizeInBytes;
371 }
372
373 addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodNone);
374 }
375
376 // This is for a range where we can know upfront how many elements there are,
377 // so that we can preallocate the final size by simply multipling sizeof(T) x N elements
378 template <typename T>
379 requires(!std::ranges::contiguous_range<T> && std::ranges::sized_range<T> && has_messageable_value_type<T>::value)
380 void snapshot(const Output& spec, T const& object)
381 {
382 auto& proxy = mRegistry.get<MessageContext>().proxy();
383 RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
384 // Serialize a snapshot of a std::container of trivially copyable, non-polymorphic elements
385 // Note: in most cases it is better to use the `make` function und work with the provided
386 // reference object
387 constexpr auto elementSizeInBytes = sizeof(typename T::value_type);
388 auto sizeInBytes = elementSizeInBytes * object.size();
389 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
390
391 // serialize vector of pointers to elements
392 auto target = reinterpret_cast<unsigned char*>(payloadMessage->GetData());
393 for (auto const& entry : object) {
394 memcpy(target, (void*)&entry, elementSizeInBytes);
395 target += elementSizeInBytes;
396 }
397 addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodNone);
398 }
399
400 template <typename T>
401 requires(is_specialization_v<T, ROOTSerialized>)
402 void snapshot(const Output& spec, T const& object)
403 {
404 auto& proxy = mRegistry.get<MessageContext>().proxy();
405 RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
406 // Serialize a snapshot of an object with root dictionary
407 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex);
408 payloadMessage->Rebuild(4096, {64});
409 const TClass* cl = nullptr;
410 // Explicitely ROOT serialize a snapshot of object.
411 // An object wrapped into type `ROOTSerialized` is explicitely marked to be ROOT serialized
412 // and is expected to have a ROOT dictionary. Availability can not be checked at compile time
413 // for all cases.
414 using WrappedType = typename T::wrapped_type;
415
416 if (object.getHint() == nullptr) {
417 // get TClass info by wrapped type
418 cl = TClass::GetClass(typeid(WrappedType));
419 } else if (std::is_same<typename T::hint_type, TClass>::value) {
420 // the class info has been passed directly
421 cl = reinterpret_cast<const TClass*>(object.getHint());
422 } else if (std::is_same<typename T::hint_type, const char>::value) {
423 // get TClass info by optional name
424 cl = TClass::GetClass(reinterpret_cast<const char*>(object.getHint()));
425 }
426 if (has_root_dictionary<WrappedType>::value == false && cl == nullptr) {
427 if (std::is_same<typename T::hint_type, const char>::value) {
428 throw runtime_error_f("ROOT serialization not supported, dictionary not found for type %s",
429 reinterpret_cast<const char*>(object.getHint()));
430 } else {
431 throw runtime_error_f("ROOT serialization not supported, dictionary not found for type %s",
432 typeid(WrappedType).name());
433 }
434 }
435 typename root_serializer<T>::serializer().Serialize(*payloadMessage, &object(), cl);
436 addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodROOT);
437 }
438
439 template <typename T>
440 requires(!is_messageable<T>::value && !ContiguousMessageablesRange<T> && has_root_dictionary<T>::value && !is_specialization_v<T, ROOTSerialized>)
441 void snapshot(const Output& spec, T const& object)
442 {
443 auto& proxy = mRegistry.get<MessageContext>().proxy();
444 RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
445 // Serialize a snapshot of an object with root dictionary
446 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex);
447 payloadMessage->Rebuild(4096, {64});
448 typename root_serializer<T>::serializer().Serialize(*payloadMessage, &object, TClass::GetClass(typeid(T)));
449 addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodROOT);
450 }
451
455 void snapshot(const Output& spec, const char* payload, size_t payloadSize,
457
463 template <typename T, typename... Args>
464 decltype(auto) make(OutputRef&& ref, Args&&... args)
465 {
466 return make<T>(getOutputByBind(std::move(ref)), std::forward<Args>(args)...);
467 }
468
474 template <typename T>
475 void adopt(OutputRef&& ref, T* obj)
476 {
477 return adopt(getOutputByBind(std::move(ref)), obj);
478 }
479
480 // get the memory resource associated with an output
482 {
483 auto& timingInfo = mRegistry.get<TimingInfo>();
484 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
485 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
486 return *proxy.getOutputTransport(routeIndex);
487 }
488
489 // make a stl (pmr) vector
490 template <typename T, typename... Args>
491 o2::pmr::vector<T> makeVector(const Output& spec, Args&&... args)
492 {
493 o2::pmr::FairMQMemoryResource* targetResource = getMemoryResource(spec);
494 return o2::pmr::vector<T>{targetResource, std::forward<Args>(args)...};
495 }
496
497 struct CacheId {
498 int64_t value;
499 };
500
501 enum struct CacheStrategy : int {
502 Never = 0,
503 Always = 1
504 };
505
506 template <typename ContainerT>
507 CacheId adoptContainer(const Output& /*spec*/, ContainerT& /*container*/, CacheStrategy /* cache = false */, o2::header::SerializationMethod /* method = header::gSerializationMethodNone*/)
508 {
509 static_assert(always_static_assert_v<ContainerT>, "Container cannot be moved. Please make sure it is backed by a o2::pmr::FairMQMemoryResource");
510 return {0};
511 }
512
522 template <typename ContainerT>
524
527
533 template <typename... Args>
534 auto snapshot(OutputRef&& ref, Args&&... args)
535 {
536 return snapshot(getOutputByBind(std::move(ref)), std::forward<Args>(args)...);
537 }
538
540 bool isAllowed(Output const& query);
541
543 {
544 return mRegistry.get<MessageContext>().findMessageHeader(spec);
545 }
546
548 {
549 return mRegistry.get<MessageContext>().findMessageHeader(getOutputByBind(std::move(ref)));
550 }
551
553 {
554 return mRegistry.get<MessageContext>().findMessageHeaderStack(spec);
555 }
556
558 {
559 return mRegistry.get<MessageContext>().findMessageHeaderStack(getOutputByBind(std::move(ref)));
560 }
561
562 int countDeviceOutputs(bool excludeDPLOrigin = false)
563 {
564 return mRegistry.get<MessageContext>().countDeviceOutputs(excludeDPLOrigin);
565 }
566
567 private:
568 ServiceRegistryRef mRegistry;
569
570 RouteIndex matchDataHeader(const Output& spec, size_t timeframeId);
571 fair::mq::MessagePtr headerMessageFromOutput(Output const& spec, //
572 RouteIndex index, //
573 o2::header::SerializationMethod serializationMethod, //
574 size_t payloadSize); //
575
576 Output getOutputByBind(OutputRef&& ref);
577 void addPartToContext(RouteIndex routeIndex, fair::mq::MessagePtr&& payload,
578 const Output& spec,
579 o2::header::SerializationMethod serializationMethod);
580};
581
582template <typename ContainerT>
584{
585 // Find a matching channel, extract the message for it form the container
586 // and put it in the queue to be sent at the end of the processing
587 auto& timingInfo = mRegistry.get<TimingInfo>();
588 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
589
590 auto& context = mRegistry.get<MessageContext>();
591 auto* transport = mRegistry.get<FairMQDeviceProxy>().getOutputTransport(routeIndex);
592 fair::mq::MessagePtr payloadMessage = o2::pmr::getMessage(std::forward<ContainerT>(container), *transport);
593 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex, //
594 method, //
595 payloadMessage->GetSize() //
596 );
597
598 CacheId cacheId{0}; //
599 if (cache == CacheStrategy::Always) {
600 // The message will be shallow cloned in the cache. Since the
601 // clone is indistinguishable from the original, we can keep sending
602 // the original.
603 cacheId.value = context.addToCache(payloadMessage);
604 }
605
606 context.add<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
607 return cacheId;
608}
609
610} // namespace o2::framework
611
612#endif // O2_FRAMEWORK_DATAALLOCATOR_H_
std::shared_ptr< arrow::Schema > schema
Type wrappers for enfording a specific serialization method.
static constexpr ServiceKind service_kind
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec, std::integral auto nElements)
void adopt(const Output &spec, std::string *)
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
void snapshot(const Output &spec, T const &object)
o2::pmr::FairMQMemoryResource * getMemoryResource(const Output &spec)
decltype(auto) make(OutputRef &&ref, Args &&... args)
DataChunk & newChunk(const Output &, size_t)
o2::header::DataHeader::SubSpecificationType SubSpecificationType
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec)
void snapshot(const Output &spec, std::string_view const &object)
decltype(auto) make(const Output &spec, Args... args)
auto snapshot(OutputRef &&ref, Args &&... args)
void adopt(const Output &spec, std::shared_ptr< class arrow::Table >)
Adopt an Arrow table and send it to all consumers of spec.
int countDeviceOutputs(bool excludeDPLOrigin=false)
o2::header::DataHeader * findMessageHeader(OutputRef &&ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
decltype(auto) make(const Output &spec, Args... args)
std::vector< OutputRoute > AllowedOutputRoutes
decltype(auto) make(const Output &spec, Args... args)
void snapshot(const Output &spec, T const &object)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
void snapshot(const Output &spec, T const &object)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
o2::header::Stack * findMessageHeaderStack(OutputRef &&ref)
DataChunk & newChunk(OutputRef &&ref, size_t size)
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) make(const Output &spec, std::same_as< std::shared_ptr< arrow::Schema > > auto schema)
void adopt(OutputRef &&ref, T *obj)
decltype(auto) make(const Output &spec, Args... args)
o2::header::DataHeader * findMessageHeader(const Output &spec)
void snapshot(const Output &spec, T const &object)
void snapshot(const Output &spec, T const &object)
void snapshot(const Output &spec, T const &object)
TrivialObject handles a message object.
GLenum void ** pointer
Definition glcorearb.h:805
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint object
Definition glcorearb.h:4041
Defining PrimaryVertex explicitly as messageable.
ServiceKind
The kind of service we are asking for.
RuntimeErrorRef runtime_error_f(const char *,...)
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr o2::header::SerializationMethod gSerializationMethodROOT
Definition DataHeader.h:328
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
fair::mq::MemoryResource FairMQMemoryResource
LifetimeHolder(const LifetimeHolder &)=delete
LifetimeHolder & operator=(LifetimeHolder &&other)
LifetimeHolder & operator=(const LifetimeHolder &)=delete
LifetimeHolder(LifetimeHolder &&other)
std::function< void(T &)> callback
the main header struct
Definition DataHeader.h:619
uint32_t SubSpecificationType
Definition DataHeader.h:621
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
VectorOfTObjectPtrs other