11#ifndef O2_FRAMEWORK_DATAALLOCATOR_H_
12#define O2_FRAMEWORK_DATAALLOCATOR_H_
44#include <fairmq/FwdDecls.h>
53class RecordBatchWriter;
59struct ServiceRegistry;
82 this->ptr =
other.ptr;
85 this->callback = std::move(
other.callback);
87 this->callback =
nullptr;
89 other.callback =
nullptr;
96 this->callback = std::move(
other.callback);
98 this->callback =
nullptr;
100 other.callback =
nullptr;
128 is_messageable<typename T::value_type>::value;
132 is_messageable<typename T::value_type>::value;
148 template <
typename T>
149 requires std::is_fundamental_v<T>
168 template <
typename T,
typename... Args>
169 requires is_specialization_v<T, o2::framework::DataAllocator::UninitializedVector>
175 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
177 using ValueType =
typename T::value_type;
182 std::move(headerMessage), routeIndex, 0, std::forward<Args>(args)...)
186 template <
typename T,
typename... Args>
193 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
196 using ValueType =
typename T::value_type;
203 template <
typename T,
typename... Args>
210 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
222 template <
typename T,
typename... Args>
223 requires std::is_base_of_v<std::string, T>
226 auto* s =
new std::string(args...);
231 template <
typename T,
typename... Args>
232 requires(
requires {
static_cast<struct
TableBuilder>(std::declval<std::decay_t<T>>()); })
240 template <
typename T,
typename... Args>
241 requires(
requires {
static_cast<struct
FragmentToBatch>(std::declval<std::decay_t<T>>()); })
249 template <
typename T>
250 requires is_messageable<T>::value && (!is_specialization_v<T, UninitializedVector>)
253 return *
reinterpret_cast<T*
>(
newChunk(spec,
sizeof(T)).data());
256 template <
typename T>
257 requires is_messageable<T>::value && (!is_specialization_v<T, UninitializedVector>)
258 decltype(
auto)
make(
const Output& spec, std::integral
auto nElements)
262 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
268 template <
typename T,
typename Arg>
269 decltype(
auto)
make(
const Output& spec, std::same_as<std::shared_ptr<arrow::Schema>>
auto schema)
271 std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
272 create(spec, &writer,
schema);
315 template <
typename T>
316 requires(!std::ranges::contiguous_range<T> && is_messageable<T>::value)
319 return snapshot(spec, std::span<T const>(&
object, &
object + 1));
328 template <
typename T>
334 using ElementType =
typename std::remove_pointer<typename T::value_type>::type;
338 constexpr auto elementSizeInBytes =
sizeof(ElementType);
339 auto sizeInBytes = elementSizeInBytes *
object.size();
340 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
343 if (
object.
data() && sizeInBytes) {
344 memcpy(payloadMessage->GetData(),
object.data(), sizeInBytes);
352 template <
typename T>
353 requires(std::ranges::random_access_range<T> && is_messageable<typename std::remove_pointer_t<typename T::value_type>>
::value && std::is_pointer_v<typename T::value_type>)
358 using ElementType =
typename std::remove_pointer_t<typename T::value_type>;
362 constexpr auto elementSizeInBytes =
sizeof(ElementType);
363 auto sizeInBytes = elementSizeInBytes *
object.size();
364 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
367 auto target =
reinterpret_cast<unsigned char*
>(payloadMessage->GetData());
370 target += elementSizeInBytes;
378 template <
typename T>
387 constexpr auto elementSizeInBytes =
sizeof(
typename T::value_type);
388 auto sizeInBytes = elementSizeInBytes *
object.size();
389 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);
392 auto target =
reinterpret_cast<unsigned char*
>(payloadMessage->GetData());
395 target += elementSizeInBytes;
400 template <
typename T>
401 requires(is_specialization_v<T, ROOTSerialized>)
407 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex);
408 payloadMessage->Rebuild(4096, {64});
409 const TClass* cl =
nullptr;
414 using WrappedType =
typename T::wrapped_type;
416 if (
object.getHint() ==
nullptr) {
418 cl = TClass::GetClass(
typeid(WrappedType));
419 }
else if (std::is_same<typename T::hint_type, TClass>::value) {
421 cl =
reinterpret_cast<const TClass*
>(
object.getHint());
422 }
else if (std::is_same<typename T::hint_type, const char>::value) {
424 cl = TClass::GetClass(
reinterpret_cast<const char*
>(
object.getHint()));
427 if (std::is_same<typename T::hint_type, const char>::value) {
428 throw runtime_error_f(
"ROOT serialization not supported, dictionary not found for type %s",
429 reinterpret_cast<const char*
>(
object.getHint()));
431 throw runtime_error_f(
"ROOT serialization not supported, dictionary not found for type %s",
432 typeid(WrappedType).
name());
439 template <
typename T>
446 fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex);
447 payloadMessage->Rebuild(4096, {64});
455 void snapshot(
const Output& spec,
const char* payload,
size_t payloadSize,
463 template <
typename T,
typename... Args>
466 return make<T>(getOutputByBind(std::move(
ref)), std::forward<Args>(args)...);
474 template <
typename T>
477 return adopt(getOutputByBind(std::move(
ref)), obj);
485 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
486 return *proxy.getOutputTransport(routeIndex);
490 template <
typename T,
typename... Args>
506 template <
typename ContainerT>
509 static_assert(always_static_assert_v<ContainerT>,
"Container cannot be moved. Please make sure it is backed by a o2::pmr::FairMQMemoryResource");
522 template <
typename ContainerT>
533 template <
typename... Args>
536 return snapshot(getOutputByBind(std::move(
ref)), std::forward<Args>(args)...);
571 fair::mq::MessagePtr headerMessageFromOutput(
Output const& spec,
577 void addPartToContext(
RouteIndex routeIndex, fair::mq::MessagePtr&& payload,
582template <
typename ContainerT>
588 auto routeIndex = matchDataHeader(spec, timingInfo.timeslice);
592 fair::mq::MessagePtr payloadMessage =
o2::pmr::getMessage(std::forward<ContainerT>(container), *transport);
593 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
595 payloadMessage->GetSize()
603 cacheId.
value = context.addToCache(payloadMessage);
std::shared_ptr< arrow::Schema > schema
Type wrappers for enfording a specific serialization method.
static constexpr ServiceKind service_kind
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec, std::integral auto nElements)
void adopt(const Output &spec, std::string *)
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
void snapshot(const Output &spec, T const &object)
o2::pmr::FairMQMemoryResource * getMemoryResource(const Output &spec)
decltype(auto) make(OutputRef &&ref, Args &&... args)
DataChunk & newChunk(const Output &, size_t)
o2::header::DataHeader::SubSpecificationType SubSpecificationType
o2::header::Stack * findMessageHeaderStack(const Output &spec)
decltype(auto) make(const Output &spec, Args... args)
::value &&!is_specialization_v< T, UninitializedVector > decltype(auto) make(const Output &spec)
void snapshot(const Output &spec, std::string_view const &object)
decltype(auto) make(const Output &spec, Args... args)
auto snapshot(OutputRef &&ref, Args &&... args)
void adopt(const Output &spec, std::shared_ptr< class arrow::Table >)
Adopt an Arrow table and send it to all consumers of spec.
int countDeviceOutputs(bool excludeDPLOrigin=false)
o2::header::DataHeader * findMessageHeader(OutputRef &&ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
decltype(auto) make(const Output &spec, Args... args)
std::vector< OutputRoute > AllowedOutputRoutes
decltype(auto) make(const Output &spec, Args... args)
void snapshot(const Output &spec, T const &object)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
void snapshot(const Output &spec, T const &object)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
o2::header::Stack * findMessageHeaderStack(OutputRef &&ref)
DataChunk & newChunk(OutputRef &&ref, size_t size)
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) make(const Output &spec, std::same_as< std::shared_ptr< arrow::Schema > > auto schema)
void adopt(OutputRef &&ref, T *obj)
decltype(auto) make(const Output &spec, Args... args)
o2::header::DataHeader * findMessageHeader(const Output &spec)
void snapshot(const Output &spec, T const &object)
void snapshot(const Output &spec, T const &object)
void snapshot(const Output &spec, T const &object)
TrivialObject handles a message object.
GLuint const GLchar * name
GLsizei const GLfloat * value
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
ServiceKind
The kind of service we are asking for.
RuntimeErrorRef runtime_error_f(const char *,...)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
fair::mq::MemoryResource FairMQMemoryResource
LifetimeHolder(const LifetimeHolder &)=delete
LifetimeHolder & operator=(LifetimeHolder &&other)
LifetimeHolder & operator=(const LifetimeHolder &)=delete
LifetimeHolder(LifetimeHolder &&other)
std::function< void(T &)> callback
VectorOfTObjectPtrs other