11#ifndef O2_FRAMEWORK_DATAALLOCATOR_H_
12#define O2_FRAMEWORK_DATAALLOCATOR_H_
44#include <fairmq/FwdDecls.h>
53class RecordBatchWriter;
59struct ServiceRegistry;
82 this->ptr =
other.ptr;
85 this->callback = std::move(
other.callback);
87 this->callback =
nullptr;
89 other.callback =
nullptr;
96 this->callback = std::move(
other.callback);
98 this->callback =
nullptr;
100 other.callback =
nullptr;
128 is_messageable<typename T::value_type>::value;
144 template <
typename T>
145 requires std::is_fundamental_v<T>
164 template <
typename T,
typename... Args>
165 requires is_specialization_v<T, o2::framework::DataAllocator::UninitializedVector>
171 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
173 using ValueType =
typename T::value_type;
178 std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...)
182 template <
typename T,
typename... Args>
189 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
192 using ValueType =
typename T::value_type;
199 template <
typename T,
typename... Args>
206 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
218 template <
typename T,
typename... Args>
219 requires std::is_base_of_v<std::string, T>
222 auto* s =
new std::string(args...);
227 template <
typename T,
typename... Args>
228 requires(
requires {
static_cast<struct
TableBuilder>(std::declval<std::decay_t<T>>()); })
236 template <
typename T,
typename... Args>
237 requires(
requires {
static_cast<struct
TreeToTable>(std::declval<std::decay_t<T>>()); })
245 template <
typename T,
typename... Args>
246 requires(
requires {
static_cast<struct
FragmentToBatch>(std::declval<std::decay_t<T>>()); })
254 template <
typename T>
258 return *
reinterpret_cast<T*
>(
newChunk(spec,
sizeof(T)).data());
261 template <
typename T>
263 decltype(
auto)
make(
const Output& spec, std::integral
auto nElements)
267 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
273 template <
typename T,
typename Arg>
274 decltype(
auto)
make(
const Output& spec, std::same_as<std::shared_ptr<arrow::Schema>>
auto schema)
276 std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
277 create(spec, &writer, schema);
324 template <
typename T>
328 fair::mq::MessagePtr payloadMessage;
333 payloadMessage = proxy.createOutputMessage(routeIndex,
sizeof(T));
334 memcpy(payloadMessage->GetData(), &
object,
sizeof(T));
337 }
else if constexpr (is_specialization_v<T, std::vector> ==
true ||
339 using ElementType =
typename std::remove_pointer<typename T::value_type>::type;
344 constexpr auto elementSizeInBytes =
sizeof(ElementType);
345 auto sizeInBytes = elementSizeInBytes *
object.
size();
346 payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
348 if constexpr (std::is_pointer<typename T::value_type>::value ==
false) {
350 if (
object.
data() && sizeInBytes) {
351 memcpy(payloadMessage->GetData(),
object.data(), sizeInBytes);
355 auto target =
reinterpret_cast<unsigned char*
>(payloadMessage->GetData());
358 target += elementSizeInBytes;
366 static_assert(always_static_assert_v<T>,
367 "value type of std::vector not supported by API, supported types:"
368 "\n - messageable tyeps (trivially copyable, non-polymorphic structures)"
369 "\n - pointers to those"
370 "\n - types with ROOT dictionary and implementing ROOT ClassDef interface");
376 constexpr auto elementSizeInBytes =
sizeof(
typename T::value_type);
377 auto sizeInBytes = elementSizeInBytes *
object.size();
378 payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
381 auto target =
reinterpret_cast<unsigned char*
>(payloadMessage->GetData());
384 target += elementSizeInBytes;
389 payloadMessage = proxy.createOutputMessage(routeIndex);
390 payloadMessage->Rebuild(4096, {64});
391 if constexpr (is_specialization_v<T, ROOTSerialized> ==
true) {
396 using WrappedType =
typename T::wrapped_type;
397 static_assert(std::is_same<typename T::hint_type, const char>::value ||
398 std::is_same<typename T::hint_type, TClass>::value ||
399 std::is_void<typename T::hint_type>::value,
400 "class hint must be of type TClass or const char");
402 const TClass* cl =
nullptr;
403 if (
object.getHint() ==
nullptr) {
405 cl = TClass::GetClass(
typeid(WrappedType));
406 }
else if (std::is_same<typename T::hint_type, TClass>::value) {
408 cl =
reinterpret_cast<const TClass*
>(
object.getHint());
409 }
else if (std::is_same<typename T::hint_type, const char>::value) {
411 cl = TClass::GetClass(
reinterpret_cast<const char*
>(
object.getHint()));
414 if (std::is_same<typename T::hint_type, const char>::value) {
415 throw runtime_error_f(
"ROOT serialization not supported, dictionary not found for type %s",
416 reinterpret_cast<const char*
>(
object.getHint()));
418 throw runtime_error_f(
"ROOT serialization not supported, dictionary not found for type %s",
419 typeid(WrappedType).
name());
428 static_assert(always_static_assert_v<T>,
429 "data type T not supported by API, \n specializations available for"
430 "\n - trivially copyable, non-polymorphic structures"
431 "\n - std::vector of messageable structures or pointers to those"
432 "\n - types with ROOT dictionary and implementing ROOT ClassDef interface");
434 addPartToContext(routeIndex, std::move(payloadMessage), spec, serializationType);
440 void snapshot(
const Output& spec,
const char* payload,
size_t payloadSize,
448 template <
typename T,
typename... Args>
451 return make<T>(getOutputByBind(std::move(
ref)), std::forward<Args>(args)...);
459 template <
typename T>
462 return adopt(getOutputByBind(std::move(
ref)), obj);
470 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
471 return *proxy.getOutputTransport(routeIndex);
475 template <
typename T,
typename... Args>
491 template <
typename ContainerT>
494 static_assert(always_static_assert_v<ContainerT>,
"Container cannot be moved. Please make sure it is backed by a o2::pmr::FairMQMemoryResource");
507 template <
typename ContainerT>
518 template <
typename... Args>
521 return snapshot(getOutputByBind(std::move(
ref)), std::forward<Args>(args)...);
556 fair::mq::MessagePtr headerMessageFromOutput(
Output const& spec,
562 void addPartToContext(
RouteIndex routeIndex, fair::mq::MessagePtr&& payload,
567template <
typename ContainerT>
573 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
577 fair::mq::MessagePtr payloadMessage =
o2::pmr::getMessage(std::forward<ContainerT>(container), *transport);
578 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
580 payloadMessage->GetSize()
588 cacheId.
value = context.addToCache(payloadMessage);
Type wrappers for enforcing a specific serialization method.
static constexpr ServiceKind service_kind
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec, std::integral auto nElements)
void adopt(const Output &spec, std::string *)
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
o2::pmr::FairMQMemoryResource * getMemoryResource(const Output &spec)
void snapshot(const Output &spec, T const &object)
decltype(auto) make(OutputRef &&ref, Args &&... args)
DataChunk & newChunk(const Output &, size_t)
o2::header::DataHeader::SubSpecificationType SubSpecificationType
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
auto snapshot(OutputRef &&ref, Args &&... args)
void adopt(const Output &spec, std::shared_ptr< class arrow::Table >)
Adopt an Arrow table and send it to all consumers of spec.
int countDeviceOutputs(bool excludeDPLOrigin=false)
o2::header::DataHeader * findMessageHeader(OutputRef &&ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) make(const Output &spec, Args... args)
std::vector< OutputRoute > AllowedOutputRoutes
decltype(auto) make(const Output &spec, Args... args)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
o2::header::Stack * findMessageHeaderStack(OutputRef &&ref)
DataChunk & newChunk(OutputRef &&ref, size_t size)
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) make(const Output &spec, std::same_as< std::shared_ptr< arrow::Schema > > auto schema)
void adopt(OutputRef &&ref, T *obj)
decltype(auto) make(const Output &spec, Args... args)
o2::header::DataHeader * findMessageHeader(const Output &spec)
TrivialObject handles a message object.
GLuint const GLchar * name
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
ServiceKind
The kind of service we are asking for.
RuntimeErrorRef runtime_error_f(const char *,...)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
fair::mq::MemoryResource FairMQMemoryResource
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
LifetimeHolder(const LifetimeHolder &)=delete
LifetimeHolder & operator=(LifetimeHolder &&other)
LifetimeHolder & operator=(const LifetimeHolder &)=delete
LifetimeHolder(LifetimeHolder &&other)
std::function< void(T &)> callback
VectorOfTObjectPtrs other