11#ifndef O2_FRAMEWORK_DATAALLOCATOR_H_
12#define O2_FRAMEWORK_DATAALLOCATOR_H_
44#include <fairmq/FwdDecls.h>
53class RecordBatchWriter;
59struct ServiceRegistry;
82 this->ptr =
other.ptr;
85 this->callback = std::move(
other.callback);
87 this->callback =
nullptr;
89 other.callback =
nullptr;
96 this->callback = std::move(
other.callback);
98 this->callback =
nullptr;
100 other.callback =
nullptr;
128 is_messageable<typename T::value_type>::value;
144 template <
typename T>
145 requires std::is_fundamental_v<T>
164 template <
typename T,
typename... Args>
165 requires is_specialization_v<T, o2::framework::DataAllocator::UninitializedVector>
171 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
173 using ValueType =
typename T::value_type;
178 std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...)
182 template <
typename T,
typename... Args>
189 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
192 using ValueType =
typename T::value_type;
199 template <
typename T,
typename... Args>
206 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
218 template <
typename T,
typename... Args>
219 requires std::is_base_of_v<std::string, T>
222 auto* s =
new std::string(args...);
227 template <
typename T,
typename... Args>
228 requires(
requires {
static_cast<struct
TableBuilder>(std::declval<std::decay_t<T>>()); })
236 template <
typename T,
typename... Args>
237 requires(
requires {
static_cast<struct
FragmentToBatch>(std::declval<std::decay_t<T>>()); })
245 template <
typename T>
249 return *
reinterpret_cast<T*
>(
newChunk(spec,
sizeof(T)).data());
252 template <
typename T>
254 decltype(
auto)
make(
const Output& spec, std::integral
auto nElements)
258 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
264 template <
typename T,
typename Arg>
265 decltype(
auto)
make(
const Output& spec, std::same_as<std::shared_ptr<arrow::Schema>>
auto schema)
267 std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
268 create(spec, &writer, schema);
310 template <
typename T>
314 fair::mq::MessagePtr payloadMessage;
319 payloadMessage = proxy.createOutputMessage(routeIndex,
sizeof(T));
320 memcpy(payloadMessage->GetData(), &
object,
sizeof(T));
323 }
else if constexpr (is_specialization_v<T, std::vector> ==
true ||
325 using ElementType =
typename std::remove_pointer<typename T::value_type>::type;
330 constexpr auto elementSizeInBytes =
sizeof(ElementType);
331 auto sizeInBytes = elementSizeInBytes *
object.
size();
332 payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
334 if constexpr (std::is_pointer<typename T::value_type>::value ==
false) {
336 if (
object.
data() && sizeInBytes) {
337 memcpy(payloadMessage->GetData(),
object.data(), sizeInBytes);
341 auto target =
reinterpret_cast<unsigned char*
>(payloadMessage->GetData());
344 target += elementSizeInBytes;
352 static_assert(always_static_assert_v<T>,
353 "value type of std::vector not supported by API, supported types:"
354 "\n - messageable tyeps (trivially copyable, non-polymorphic structures)"
355 "\n - pointers to those"
356 "\n - types with ROOT dictionary and implementing ROOT ClassDef interface");
362 constexpr auto elementSizeInBytes =
sizeof(
typename T::value_type);
363 auto sizeInBytes = elementSizeInBytes *
object.size();
364 payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
367 auto target =
reinterpret_cast<unsigned char*
>(payloadMessage->GetData());
370 target += elementSizeInBytes;
375 payloadMessage = proxy.createOutputMessage(routeIndex);
376 payloadMessage->Rebuild(4096, {64});
377 if constexpr (is_specialization_v<T, ROOTSerialized> ==
true) {
382 using WrappedType =
typename T::wrapped_type;
383 static_assert(std::is_same<typename T::hint_type, const char>::value ||
384 std::is_same<typename T::hint_type, TClass>::value ||
385 std::is_void<typename T::hint_type>::value,
386 "class hint must be of type TClass or const char");
388 const TClass* cl =
nullptr;
389 if (
object.getHint() ==
nullptr) {
391 cl = TClass::GetClass(
typeid(WrappedType));
392 }
else if (std::is_same<typename T::hint_type, TClass>::value) {
394 cl =
reinterpret_cast<const TClass*
>(
object.getHint());
395 }
else if (std::is_same<typename T::hint_type, const char>::value) {
397 cl = TClass::GetClass(
reinterpret_cast<const char*
>(
object.getHint()));
400 if (std::is_same<typename T::hint_type, const char>::value) {
401 throw runtime_error_f(
"ROOT serialization not supported, dictionary not found for type %s",
402 reinterpret_cast<const char*
>(
object.getHint()));
404 throw runtime_error_f(
"ROOT serialization not supported, dictionary not found for type %s",
405 typeid(WrappedType).
name());
414 static_assert(always_static_assert_v<T>,
415 "data type T not supported by API, \n specializations available for"
416 "\n - trivially copyable, non-polymorphic structures"
417 "\n - std::vector of messageable structures or pointers to those"
418 "\n - types with ROOT dictionary and implementing ROOT ClassDef interface");
420 addPartToContext(routeIndex, std::move(payloadMessage), spec, serializationType);
426 void snapshot(
const Output& spec,
const char* payload,
size_t payloadSize,
434 template <
typename T,
typename... Args>
437 return make<T>(getOutputByBind(std::move(
ref)), std::forward<Args>(args)...);
445 template <
typename T>
448 return adopt(getOutputByBind(std::move(
ref)), obj);
456 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
457 return *proxy.getOutputTransport(routeIndex);
461 template <
typename T,
typename... Args>
477 template <
typename ContainerT>
480 static_assert(always_static_assert_v<ContainerT>,
"Container cannot be moved. Please make sure it is backed by a o2::pmr::FairMQMemoryResource");
493 template <
typename ContainerT>
504 template <
typename... Args>
507 return snapshot(getOutputByBind(std::move(
ref)), std::forward<Args>(args)...);
542 fair::mq::MessagePtr headerMessageFromOutput(
Output const& spec,
548 void addPartToContext(
RouteIndex routeIndex, fair::mq::MessagePtr&& payload,
553template <
typename ContainerT>
559 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
563 fair::mq::MessagePtr payloadMessage =
o2::pmr::getMessage(std::forward<ContainerT>(container), *transport);
564 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
566 payloadMessage->GetSize()
574 cacheId.
value = context.addToCache(payloadMessage);
Type wrappers for enfording a specific serialization method.
static constexpr ServiceKind service_kind
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec, std::integral auto nElements)
void adopt(const Output &spec, std::string *)
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
o2::pmr::FairMQMemoryResource * getMemoryResource(const Output &spec)
void snapshot(const Output &spec, T const &object)
decltype(auto) make(OutputRef &&ref, Args &&... args)
DataChunk & newChunk(const Output &, size_t)
o2::header::DataHeader::SubSpecificationType SubSpecificationType
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
auto snapshot(OutputRef &&ref, Args &&... args)
void adopt(const Output &spec, std::shared_ptr< class arrow::Table >)
Adopt an Arrow table and send it to all consumers of spec.
int countDeviceOutputs(bool excludeDPLOrigin=false)
o2::header::DataHeader * findMessageHeader(OutputRef &&ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
decltype(auto) make(const Output &spec, Args... args)
std::vector< OutputRoute > AllowedOutputRoutes
decltype(auto) make(const Output &spec, Args... args)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
o2::header::Stack * findMessageHeaderStack(OutputRef &&ref)
DataChunk & newChunk(OutputRef &&ref, size_t size)
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) make(const Output &spec, std::same_as< std::shared_ptr< arrow::Schema > > auto schema)
void adopt(OutputRef &&ref, T *obj)
decltype(auto) make(const Output &spec, Args... args)
o2::header::DataHeader * findMessageHeader(const Output &spec)
TrivialObject handles a message object.
GLuint const GLchar * name
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
ServiceKind
The kind of service we are asking for.
RuntimeErrorRef runtime_error_f(const char *,...)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
fair::mq::MemoryResource FairMQMemoryResource
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
LifetimeHolder(const LifetimeHolder &)=delete
LifetimeHolder & operator=(LifetimeHolder &&other)
LifetimeHolder & operator=(const LifetimeHolder &)=delete
LifetimeHolder(LifetimeHolder &&other)
std::function< void(T &)> callback
VectorOfTObjectPtrs other