Project
Loading...
Searching...
No Matches
MessageContext.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_MESSAGECONTEXT_H_
12#define O2_FRAMEWORK_MESSAGECONTEXT_H_
13
22
23#include "Headers/DataHeader.h"
24#include "Headers/Stack.h"
26
27#include <fairmq/Message.h>
28#include <fairmq/Parts.h>
29
30#include <cassert>
31#include <functional>
32#include <string>
33#include <type_traits>
34#include <unordered_map>
35#include <vector>
36
37#include <fairmq/FwdDecls.h>
38
39namespace o2::framework
40{
41
42template <typename T, typename = void>
43struct enable_root_serialization : std::false_type {
44 using debug_type = T;
45};
46
47template <typename T, typename = void>
48struct root_serializer : std::false_type {
49};
50
51struct Output;
52
54{
55 public:
57
58 // so far we are only using one instance per named channel
59 static constexpr int DefaultChannelIndex = 0;
60
62 : mProxy{proxy}
63 {
64 }
65
67 : mProxy{proxy}, mDispatchControl{dispatcher}
68 {
69 }
70
71 void init(DispatchControl&& dispatcher)
72 {
73 mDispatchControl = dispatcher;
74 }
75
76 // this is the virtual interface for context objects
78 {
79 public:
80 ContextObject() = delete;
81 ContextObject(fair::mq::MessagePtr&& headerMsg, fair::mq::MessagePtr&& payloadMsg, RouteIndex routeIndex)
82 : mParts{}, mRouteIndex{routeIndex}
83 {
84 mParts.AddPart(std::move(headerMsg));
85 mParts.AddPart(std::move(payloadMsg));
86 }
87
88 ContextObject(fair::mq::MessagePtr&& headerMsg, RouteIndex routeIndex)
89 : mParts{}, mRouteIndex{routeIndex}
90 {
91 mParts.AddPart(std::move(headerMsg));
92 }
93
94 virtual ~ContextObject() = default;
95
99 virtual fair::mq::Parts finalize()
100 {
101 fair::mq::Parts parts = std::move(mParts);
102 assert(parts.Size() == 2);
103 auto* header = o2::header::get<o2::header::DataHeader*>(parts.At(0)->GetData());
104 if (header == nullptr) {
105 throw std::logic_error("No valid header message found");
106 } else {
107 // o2::header::get returns const pointer, but here we can change the message
108 const_cast<o2::header::DataHeader*>(header)->payloadSize = parts.At(1)->GetSize();
109 }
110 // return value optimization returns by move
111 return parts;
112 }
113
115 [[nodiscard]] RouteIndex route() const
116 {
117 return mRouteIndex;
118 }
119
120 [[nodiscard]] bool empty() const
121 {
122 return mParts.Size() == 0;
123 }
124
126 {
127 // we would expect this function to be const but the fair::mq::Parts API does not allow this
128 if (empty() || mParts.At(0) == nullptr) {
129 return nullptr;
130 }
131 return o2::header::get<o2::header::DataHeader*>(mParts.At(0)->GetData());
132 }
133
135 {
136 if (empty() || mParts.At(0) == nullptr) {
137 return nullptr;
138 }
139 return o2::header::get<o2::framework::DataProcessingHeader*>(mParts.At(0)->GetData());
140 }
141
143 {
144 // we would expect this function to be const but the fair::mq::Parts API does not allow this
145 if (empty() || mParts.At(0) == nullptr) {
146 return nullptr;
147 }
148 return o2::header::get<o2::header::DataHeader*>(mParts.At(0)->GetData()) ? reinterpret_cast<o2::header::Stack*>(mParts.At(0)->GetData()) : nullptr;
149 }
150
151 protected:
152 fair::mq::Parts mParts;
154 };
155
158 {
159 public:
161 TrivialObject() = delete;
163 template <typename ContextType>
164 TrivialObject(ContextType* context, fair::mq::MessagePtr&& headerMsg, fair::mq::MessagePtr&& payloadMsg, RouteIndex routeIndex)
165 : ContextObject(std::forward<fair::mq::MessagePtr>(headerMsg), std::forward<fair::mq::MessagePtr>(payloadMsg), routeIndex)
166 {
167 }
169 template <typename ContextType, typename... Args>
170 TrivialObject(ContextType* context, fair::mq::MessagePtr&& headerMsg, RouteIndex routeIndex, int index, Args... args)
171 : ContextObject(std::forward<fair::mq::MessagePtr>(headerMsg), context->createMessage(routeIndex, index, std::forward<Args>(args)...), routeIndex)
172 {
173 }
174 ~TrivialObject() override = default;
175
176 auto* data()
177 {
178 assert(mParts.Size() == 2);
179 return mParts[1].GetData();
180 }
181 };
182
183 // A memory resource which can force a minimum alignment, so that
184 // the whole polymorphic allocator business is happy...
186 {
187 public:
188 AlignedMemoryResource(fair::mq::MemoryResource* other)
189 : mUpstream(other)
190 {
191 }
192
194 : mUpstream(other.mUpstream)
195 {
196 }
197
198 bool isValid()
199 {
200 return mUpstream != nullptr;
201 }
202 fair::mq::MessagePtr getMessage(void* p) override
203 {
204 return mUpstream->getMessage(p);
205 }
206
207 void* setMessage(fair::mq::MessagePtr fmm) override
208 {
209 return mUpstream->setMessage(std::move(fmm));
210 }
211
212 fair::mq::TransportFactory* getTransportFactory() noexcept override
213 {
214 return mUpstream->getTransportFactory();
215 }
216
217 [[nodiscard]] size_t getNumberOfMessages() const noexcept override
218 {
219 return mUpstream->getNumberOfMessages();
220 }
221
222 protected:
223 void* do_allocate(size_t bytes, size_t alignment) override
224 {
225 return mUpstream->allocate(bytes, alignment < 64 ? 64 : alignment);
226 }
227
228 void do_deallocate(void* p, size_t bytes, size_t alignment) override
229 {
230 return mUpstream->deallocate(p, bytes, alignment < 64 ? 64 : alignment);
231 }
232
233 [[nodiscard]] bool do_is_equal(const pmr::memory_resource& other) const noexcept override
234 {
235 return this == &other;
236 }
237
238 private:
239 fair::mq::MemoryResource* mUpstream = nullptr;
240 };
241
247 template <typename T>
249 {
250 public:
251 using value_type = typename T::value_type;
252 using return_type = T;
254 static_assert(std::is_base_of<o2::pmr::polymorphic_allocator<value_type>, typename T::allocator_type>::value, "container must have polymorphic allocator");
258 template <typename ContextType, typename... Args>
259 ContainerRefObject(ContextType* context, fair::mq::MessagePtr&& headerMsg, RouteIndex routeIndex, int index, Args&&... args)
260 : ContextObject(std::forward<fair::mq::MessagePtr>(headerMsg), routeIndex),
261 // the transport factory
262 mFactory{context->proxy().getOutputTransport(routeIndex)},
263 // the memory resource takes ownership of the message
264 mResource{mFactory ? AlignedMemoryResource(mFactory->GetMemoryResource()) : AlignedMemoryResource(nullptr)},
265 // create the vector with apropriate underlying memory resource for the message
266 mData{std::forward<Args>(args)..., pmr::polymorphic_allocator<value_type>(&mResource)}
267 {
268 // FIXME: drop this repeated check and make sure at initial setup of devices that everything is fine
269 // introduce error policy
270 if (mFactory == nullptr) {
271 throw runtime_error_f("failed to get transport factory for route %d", routeIndex);
272 }
273 if (mResource.isValid() == false) {
274 throw runtime_error_f("no memory resource for channel %d", routeIndex);
275 }
276 }
277 ~ContainerRefObject() override = default;
278
281 fair::mq::Parts finalize() final
282 {
283 assert(mParts.Size() == 1);
284 auto payloadMsg = o2::pmr::getMessage(std::move(mData));
285 mParts.AddPart(std::move(payloadMsg));
287 }
288
290 operator return_type&()
291 {
292 return mData;
293 }
294
297 {
298 return mData;
299 }
300
303 {
304 return mData.data();
305 }
306
307 private:
308 fair::mq::TransportFactory* mFactory = nullptr;
309 AlignedMemoryResource mResource;
310 buffer_type mData;
311 };
312
316 template <typename T, typename _BASE = ContainerRefObject<std::vector<T, o2::pmr::polymorphic_allocator<T>>>>
317 class VectorObject : public _BASE
318 {
319 public:
320 template <typename... Args>
321 VectorObject(Args&&... args) : _BASE(std::forward<Args>(args)...)
322 {
323 }
324 };
325
326 // SpanObject creates a trivial binary object for an array of elements of
327 // type T and holds a span over the elements
328 // FIXME: probably obsolete after introducing of vector with polymorphic_allocator
329 template <typename T>
331 {
332 public:
333 static_assert(is_messageable<T>::value == true, "unconsistent type");
334 using value_type = gsl::span<T>;
336 SpanObject() = delete;
338 template <typename ContextType>
339 SpanObject(ContextType* context, fair::mq::MessagePtr&& headerMsg, RouteIndex routeIndex, int index, size_t nElements)
340 : ContextObject(std::forward<fair::mq::MessagePtr>(headerMsg), routeIndex)
341 {
342 // create the span object for the memory of the payload message
343 // TODO: we probably also want to check consistency of the header message, i.e. payloadSize member
344 auto payloadMsg = context->createMessage(routeIndex, index, nElements * sizeof(T));
345 mValue = value_type(reinterpret_cast<T*>(payloadMsg->GetData()), nElements);
346 assert(mParts.Size() == 1);
347 mParts.AddPart(std::move(payloadMsg));
348 }
349 ~SpanObject() override = default;
350
351 operator value_type&()
352 {
353 return mValue;
354 }
355
357 {
358 return mValue;
359 }
360
361 private:
362 value_type mValue;
363 };
364
365 using Messages = std::vector<std::unique_ptr<ContextObject>>;
366
372 template <typename T, typename BASE = std::default_delete<T>>
373 class ScopeHook : public BASE
374 {
375 public:
376 using base = std::default_delete<T>;
378 ScopeHook() = default;
380 : mContext(context)
381 {
382 }
383 ~ScopeHook() = default;
384
385 // forbid assignment operator to prohibid changing the Deleter
386 // resource control property once used in the unique_ptr
387 self_type& operator=(const self_type&) = delete;
388
389 void operator()(T* ptr) const
390 {
391 if (!mContext) {
392 // TODO: decide whether this is an error or not
393 // can also check if the standard constructor can be dropped to make sure that
394 // the ScopeHook is always set up with a context
395 throw runtime_error("No context available to schedule the context object");
396 return base::operator()(ptr);
397 }
398 // keep the object alive and add to message list of the context
399 mContext->schedule(Messages::value_type(ptr));
400 }
401
402 private:
403 MessageContext* mContext = nullptr;
404 };
405
406 template <typename T>
407 using ContextObjectScope = std::unique_ptr<T, ScopeHook<T>>;
408
414 template <typename T, typename... Args>
415 auto& add(Args&&... args)
416 {
417 mMessages.push_back(std::move(make<T>(std::forward<Args>(args)...)));
418 // return a reference to the element in the vector of unique pointers
419 return *dynamic_cast<T*>(mMessages.back().get());
420 }
421
426 template <typename T, typename... Args>
427 Messages::value_type make(Args&&... args)
428 {
429 static_assert(std::is_base_of<ContextObject, T>::value == true, "type must inherit ContextObject interface");
430 return std::make_unique<T>(this, std::forward<Args>(args)...);
431 }
432
437 template <typename T, typename... Args>
439 {
440 ContextObjectScope<T> scope(dynamic_cast<T*>(make<T>(std::forward<Args>(args)...).release()), ScopeHook<T>(this));
441 return scope;
442 }
443
447 void schedule(Messages::value_type&& message);
448
450 {
451 // before starting iteration, message lists are merged
452 for (auto& message : mScheduledMessages) {
453 mMessages.emplace_back(std::move(message));
454 }
455 mScheduledMessages.clear();
456 return std::move(mMessages);
457 }
458
459 size_t size()
460 {
461 return mMessages.size();
462 }
463
467 void clear();
468
470 {
471 return mProxy;
472 }
473
474 // Add a message to cache and returns a unique identifier for
475 // such cached message.
476 int64_t addToCache(std::unique_ptr<fair::mq::Message>& message);
477 // Clone a message from cache so that it can be added to the context
478 [[nodiscard]] std::unique_ptr<fair::mq::Message> cloneFromCache(int64_t id) const;
479 // Prune a message from cache
480 void pruneFromCache(int64_t id);
481
485 // FIXME: can that be const?
486 fair::mq::MessagePtr createMessage(RouteIndex routeIndex, int index, size_t size);
487 fair::mq::MessagePtr createMessage(RouteIndex routeIndex, int index, void* data, size_t size, fair::mq::FreeFn* ffn, void* hint);
488
492 [[nodiscard]] int countDeviceOutputs(bool excludeDPLOrigin = false) const;
493 void fakeDispatch() { mDidDispatch = true; }
494 bool didDispatch() { return mDidDispatch; }
496 std::pair<o2::header::DataHeader*, o2::framework::DataProcessingHeader*> findMessageHeaders(const Output& spec);
497
498 private:
499 FairMQDeviceProxy& mProxy;
500 Messages mMessages;
501 Messages mScheduledMessages;
502 bool mDidDispatch = false;
503 DispatchControl mDispatchControl;
505 std::unordered_map<int64_t, std::unique_ptr<fair::mq::Message>> mMessageCache;
506};
507} // namespace o2::framework
508#endif // O2_FRAMEWORK_MESSAGECONTEXT_H_
TBranch * ptr
AlignedMemoryResource(fair::mq::MemoryResource *other)
bool do_is_equal(const pmr::memory_resource &other) const noexcept override
void * do_allocate(size_t bytes, size_t alignment) override
AlignedMemoryResource(AlignedMemoryResource const &other)
void do_deallocate(void *p, size_t bytes, size_t alignment) override
void * setMessage(fair::mq::MessagePtr fmm) override
fair::mq::MessagePtr getMessage(void *p) override
size_t getNumberOfMessages() const noexcept override
fair::mq::TransportFactory * getTransportFactory() noexcept override
ContainerRefObject()=delete
default contructor forbidden, object always has to control message instances
fair::mq::Parts finalize() final
Finalize object and return parts by move This retrieves the actual message from the vector object and...
ContainerRefObject(ContextType *context, fair::mq::MessagePtr &&headerMsg, RouteIndex routeIndex, int index, Args &&... args)
constructor taking header message by move and creating the paypload message
return_type & get()
return reference to the handled vector object
value_type * data()
return data pointer of the handled vector object
ContextObject(fair::mq::MessagePtr &&headerMsg, fair::mq::MessagePtr &&payloadMsg, RouteIndex routeIndex)
o2::header::DataHeader const * header()
o2::framework::DataProcessingHeader const * dataProcessingHeader()
ContextObject(fair::mq::MessagePtr &&headerMsg, RouteIndex routeIndex)
virtual fair::mq::Parts finalize()
Finalize the object and return the parts by move This is the default method and can be overloaded by ...
RouteIndex route() const
return the channel name
self_type & operator=(const self_type &)=delete
SpanObject()=delete
default constructor forbidden, object alwasy has to control messages
SpanObject(ContextType *context, fair::mq::MessagePtr &&headerMsg, RouteIndex routeIndex, int index, size_t nElements)
constructor taking header message by move and creating the payload message for the span
TrivialObject handles a message object.
TrivialObject(ContextType *context, fair::mq::MessagePtr &&headerMsg, RouteIndex routeIndex, int index, Args... args)
constructor taking header message by move and creating the paypload message
TrivialObject()=delete
default contructor forbidden, object always has to control message instances
TrivialObject(ContextType *context, fair::mq::MessagePtr &&headerMsg, fair::mq::MessagePtr &&payloadMsg, RouteIndex routeIndex)
constructor consuming the header and payload messages for a given channel by move
void init(DispatchControl &&dispatcher)
o2::header::Stack * findMessageHeaderStack(const Output &spec)
Messages::value_type make(Args &&... args)
std::pair< o2::header::DataHeader *, o2::framework::DataProcessingHeader * > findMessageHeaders(const Output &spec)
static constexpr int DefaultChannelIndex
auto & add(Args &&... args)
std::unique_ptr< T, ScopeHook< T > > ContextObjectScope
void schedule(Messages::value_type &&message)
int countDeviceOutputs(bool excludeDPLOrigin=false) const
int64_t addToCache(std::unique_ptr< fair::mq::Message > &message)
MessageContext(FairMQDeviceProxy &proxy)
o2::header::DataHeader * findMessageHeader(const Output &spec)
return the headers of the 1st (from the end) matching message checking first in mMessages then in mSc...
MessageContext(FairMQDeviceProxy &proxy, DispatchControl &&dispatcher)
ContextObjectScope< T > make_scoped(Args &&... args)
std::unique_ptr< fair::mq::Message > cloneFromCache(int64_t id) const
fair::mq::MessagePtr createMessage(RouteIndex routeIndex, int index, size_t size)
o2::framework::DataProcessingHeader * findMessageDataProcessingHeader(const Output &spec)
FairMQDeviceProxy & proxy()
static constexpr ServiceKind service_kind
std::vector< std::unique_ptr< ContextObject > > Messages
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
RuntimeErrorRef runtime_error_f(const char *,...)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
fair::mq::MemoryResource FairMQMemoryResource
Defining DataPointCompositeObject explicitly as copiable.
Control for the message dispatching within message context. Depending on dispatching policy,...
the main header struct
Definition DataHeader.h:618
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
uint32_t buffer_type
VectorOfTObjectPtrs other