11#ifndef O2_FRAMEWORK_MESSAGECONTEXT_H_
12#define O2_FRAMEWORK_MESSAGECONTEXT_H_
27#include <fairmq/Message.h>
28#include <fairmq/Parts.h>
34#include <unordered_map>
37#include <fairmq/FwdDecls.h>
42template <
typename T,
typename =
void>
47template <
typename T,
typename =
void>
67 : mProxy{
proxy}, mDispatchControl{dispatcher}
73 mDispatchControl = dispatcher;
84 mParts.AddPart(std::move(headerMsg));
85 mParts.AddPart(std::move(payloadMsg));
91 mParts.AddPart(std::move(headerMsg));
101 fair::mq::Parts parts = std::move(
mParts);
102 assert(parts.Size() == 2);
103 auto*
header = o2::header::get<o2::header::DataHeader*>(parts.At(0)->GetData());
105 throw std::logic_error(
"No valid header message found");
122 return mParts.Size() == 0;
131 return o2::header::get<o2::header::DataHeader*>(
mParts.At(0)->GetData());
139 return o2::header::get<o2::framework::DataProcessingHeader*>(
mParts.At(0)->GetData());
148 return o2::header::get<o2::header::DataHeader*>(
mParts.At(0)->GetData()) ?
reinterpret_cast<o2::header::Stack*
>(
mParts.At(0)->GetData()) :
nullptr;
163 template <
typename ContextType>
164 TrivialObject(ContextType* context, fair::mq::MessagePtr&& headerMsg, fair::mq::MessagePtr&& payloadMsg,
RouteIndex routeIndex)
169 template <
typename ContextType,
typename... Args>
178 assert(
mParts.Size() == 2);
179 return mParts[1].GetData();
194 : mUpstream(
other.mUpstream)
200 return mUpstream !=
nullptr;
204 return mUpstream->getMessage(p);
209 return mUpstream->setMessage(std::move(fmm));
214 return mUpstream->getTransportFactory();
219 return mUpstream->getNumberOfMessages();
225 return mUpstream->allocate(bytes, alignment < 64 ? 64 : alignment);
230 return mUpstream->deallocate(p, bytes, alignment < 64 ? 64 : alignment);
235 return this == &
other;
239 fair::mq::MemoryResource* mUpstream =
nullptr;
247 template <
typename T>
254 static_assert(std::is_base_of<o2::pmr::polymorphic_allocator<value_type>,
typename T::allocator_type>
::value,
"container must have polymorphic allocator");
258 template <
typename ContextType,
typename... Args>
262 mFactory{context->
proxy().getOutputTransport(routeIndex)},
266 mData{
std::forward<Args>(args)..., pmr::polymorphic_allocator<
value_type>(&mResource)}
270 if (mFactory ==
nullptr) {
271 throw runtime_error_f(
"failed to get transport factory for route %d", routeIndex);
273 if (mResource.
isValid() ==
false) {
274 throw runtime_error_f(
"no memory resource for channel %d", routeIndex);
283 assert(
mParts.Size() == 1);
285 mParts.AddPart(std::move(payloadMsg));
308 fair::mq::TransportFactory* mFactory =
nullptr;
316 template <
typename T,
typename _BASE = ContainerRefObject<std::vector<T, o2::pmr::polymorphic_allocator<T>>>>
320 template <
typename... Args>
329 template <
typename T>
338 template <
typename ContextType>
344 auto payloadMsg = context->createMessage(routeIndex,
index, nElements *
sizeof(T));
345 mValue =
value_type(
reinterpret_cast<T*
>(payloadMsg->GetData()), nElements);
346 assert(
mParts.Size() == 1);
347 mParts.AddPart(std::move(payloadMsg));
365 using Messages = std::vector<std::unique_ptr<ContextObject>>;
372 template <
typename T,
typename BASE = std::default_delete<T>>
376 using base = std::default_delete<T>;
395 throw runtime_error(
"No context available to schedule the context object");
396 return base::operator()(
ptr);
406 template <
typename T>
414 template <
typename T,
typename... Args>
417 mMessages.push_back(std::move(make<T>(std::forward<Args>(args)...)));
419 return *
dynamic_cast<T*
>(mMessages.back().get());
426 template <
typename T,
typename... Args>
427 Messages::value_type
make(Args&&... args)
429 static_assert(std::is_base_of<ContextObject, T>::value ==
true,
"type must inherit ContextObject interface");
430 return std::make_unique<T>(
this, std::forward<Args>(args)...);
437 template <
typename T,
typename... Args>
452 for (
auto&
message : mScheduledMessages) {
453 mMessages.emplace_back(std::move(
message));
455 mScheduledMessages.clear();
456 return std::move(mMessages);
461 return mMessages.size();
478 [[nodiscard]] std::unique_ptr<fair::mq::Message>
cloneFromCache(int64_t
id)
const;
502 bool mDidDispatch =
false;
505 std::unordered_map<int64_t, std::unique_ptr<fair::mq::Message>> mMessageCache;
AlignedMemoryResource(fair::mq::MemoryResource *other)
bool do_is_equal(const pmr::memory_resource &other) const noexcept override
void * do_allocate(size_t bytes, size_t alignment) override
AlignedMemoryResource(AlignedMemoryResource const &other)
void do_deallocate(void *p, size_t bytes, size_t alignment) override
void * setMessage(fair::mq::MessagePtr fmm) override
fair::mq::MessagePtr getMessage(void *p) override
size_t getNumberOfMessages() const noexcept override
fair::mq::TransportFactory * getTransportFactory() noexcept override
ContainerRefObject()=delete
default constructor forbidden, object always has to control message instances
fair::mq::Parts finalize() final
Finalize object and return parts by move. This retrieves the actual message from the vector object and...
ContainerRefObject(ContextType *context, fair::mq::MessagePtr &&headerMsg, RouteIndex routeIndex, int index, Args &&... args)
constructor taking header message by move and creating the payload message
return_type & get()
return reference to the handled vector object
value_type * data()
return data pointer of the handled vector object
typename T::value_type value_type
~ContainerRefObject() override=default
virtual ~ContextObject()=default
ContextObject(fair::mq::MessagePtr &&headerMsg, fair::mq::MessagePtr &&payloadMsg, RouteIndex routeIndex)
o2::header::DataHeader const * header()
o2::framework::DataProcessingHeader const * dataProcessingHeader()
ContextObject(fair::mq::MessagePtr &&headerMsg, RouteIndex routeIndex)
virtual fair::mq::Parts finalize()
Finalize the object and return the parts by move. This is the default method and can be overloaded by ...
o2::header::Stack const * headerStack()
RouteIndex route() const
return the channel name
std::default_delete< T > base
ScopeHook(MessageContext *context)
self_type & operator=(const self_type &)=delete
void operator()(T *ptr) const
~SpanObject() override=default
gsl::span< T > value_type
SpanObject()=delete
default constructor forbidden, object always has to control messages
SpanObject(ContextType *context, fair::mq::MessagePtr &&headerMsg, RouteIndex routeIndex, int index, size_t nElements)
constructor taking header message by move and creating the payload message for the span
TrivialObject handles a message object.
~TrivialObject() override=default
TrivialObject(ContextType *context, fair::mq::MessagePtr &&headerMsg, RouteIndex routeIndex, int index, Args... args)
constructor taking header message by move and creating the payload message
TrivialObject()=delete
default constructor forbidden, object always has to control message instances
TrivialObject(ContextType *context, fair::mq::MessagePtr &&headerMsg, fair::mq::MessagePtr &&payloadMsg, RouteIndex routeIndex)
constructor consuming the header and payload messages for a given channel by move
VectorObject(Args &&... args)
void init(DispatchControl &&dispatcher)
o2::header::Stack * findMessageHeaderStack(const Output &spec)
Messages::value_type make(Args &&... args)
std::pair< o2::header::DataHeader *, o2::framework::DataProcessingHeader * > findMessageHeaders(const Output &spec)
static constexpr int DefaultChannelIndex
auto & add(Args &&... args)
std::unique_ptr< T, ScopeHook< T > > ContextObjectScope
void schedule(Messages::value_type &&message)
int countDeviceOutputs(bool excludeDPLOrigin=false) const
int64_t addToCache(std::unique_ptr< fair::mq::Message > &message)
MessageContext(FairMQDeviceProxy &proxy)
o2::header::DataHeader * findMessageHeader(const Output &spec)
return the headers of the 1st (from the end) matching message checking first in mMessages then in mSc...
MessageContext(FairMQDeviceProxy &proxy, DispatchControl &&dispatcher)
ContextObjectScope< T > make_scoped(Args &&... args)
std::unique_ptr< fair::mq::Message > cloneFromCache(int64_t id) const
fair::mq::MessagePtr createMessage(RouteIndex routeIndex, int index, size_t size)
Messages getMessagesForSending()
o2::framework::DataProcessingHeader * findMessageDataProcessingHeader(const Output &spec)
FairMQDeviceProxy & proxy()
void pruneFromCache(int64_t id)
static constexpr ServiceKind service_kind
std::vector< std::unique_ptr< ContextObject > > Messages
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * message
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
RuntimeErrorRef runtime_error_f(const char *,...)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
fair::mq::MemoryResource FairMQMemoryResource
Defining DataPointCompositeObject explicitly as copiable.
Control for the message dispatching within message context. Depending on dispatching policy,...
VectorOfTObjectPtrs other