Project
Loading...
Searching...
No Matches
MessageContext.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Framework/Output.h"
15#include <fairmq/Device.h>
16
17namespace o2::framework
18{
19
20fair::mq::MessagePtr MessageContext::createMessage(RouteIndex routeIndex, int index, size_t size)
21{
22 auto* transport = mProxy.getOutputTransport(routeIndex);
23 return transport->CreateMessage(size, fair::mq::Alignment{64});
24}
25
26fair::mq::MessagePtr MessageContext::createMessage(RouteIndex routeIndex, int index, void* data, size_t size, fair::mq::FreeFn* ffn, void* hint)
27{
28 auto* transport = mProxy.getOutputTransport(routeIndex);
29 return transport->CreateMessage(data, size, ffn, hint);
30}
31
33{
34 for (auto it = mMessages.rbegin(); it != mMessages.rend(); ++it) {
35 const auto* hd = (*it)->header();
36 if (hd->dataOrigin == spec.origin && hd->dataDescription == spec.description && hd->subSpecification == spec.subSpec) {
37 return const_cast<o2::header::DataHeader*>(hd); // o2::header::get returns const pointer, but the caller may need non-const
38 }
39 }
40 for (auto it = mScheduledMessages.rbegin(); it != mScheduledMessages.rend(); ++it) {
41 const auto* hd = (*it)->header();
42 if (hd->dataOrigin == spec.origin && hd->dataDescription == spec.description && hd->subSpecification == spec.subSpec) {
43 return const_cast<o2::header::DataHeader*>(hd); // o2::header::get returns const pointer, but the caller may need non-const
44 }
45 }
46 return nullptr;
47}
48
50{
51 for (auto it = mMessages.rbegin(); it != mMessages.rend(); ++it) {
52 const auto* hd = (*it)->header();
53 if (hd->dataOrigin == spec.origin && hd->dataDescription == spec.description && hd->subSpecification == spec.subSpec) {
54 return const_cast<o2::framework::DataProcessingHeader*>((*it)->dataProcessingHeader());
55 }
56 }
57 for (auto it = mScheduledMessages.rbegin(); it != mScheduledMessages.rend(); ++it) {
58 const auto* hd = (*it)->header();
59 if (hd->dataOrigin == spec.origin && hd->dataDescription == spec.description && hd->subSpecification == spec.subSpec) {
60 return const_cast<o2::framework::DataProcessingHeader*>((*it)->dataProcessingHeader()); // o2::header::get returns const pointer, but the caller may need non-const
61 }
62 }
63 return nullptr;
64}
65
67{
68 for (auto it = mMessages.rbegin(); it != mMessages.rend(); ++it) {
69 const auto* hd = (*it)->header();
70 if (hd->dataOrigin == spec.origin && hd->dataDescription == spec.description && hd->subSpecification == spec.subSpec) {
71 return const_cast<o2::header::Stack*>((*it)->headerStack());
72 }
73 }
74 for (auto it = mScheduledMessages.rbegin(); it != mScheduledMessages.rend(); ++it) {
75 const auto* hd = (*it)->header();
76 if (hd->dataOrigin == spec.origin && hd->dataDescription == spec.description && hd->subSpecification == spec.subSpec) {
77 return const_cast<o2::header::Stack*>((*it)->headerStack());
78 }
79 }
80 return nullptr;
81}
82
83int MessageContext::countDeviceOutputs(bool excludeDPLOrigin) const
84{
85 // If we dispatched some messages before the end of the callback
86 // we need to account for them as well.
87 int noutputs = mDidDispatch ? 1 : 0;
88 constexpr o2::header::DataOrigin DataOriginDPL{"DPL"};
89 for (auto it = mMessages.rbegin(); it != mMessages.rend(); ++it) {
90 if (!excludeDPLOrigin || (*it)->header()->dataOrigin != DataOriginDPL) {
91 noutputs++;
92 }
93 }
94 for (auto it = mScheduledMessages.rbegin(); it != mScheduledMessages.rend(); ++it) {
95 if (!excludeDPLOrigin || (*it)->header()->dataOrigin != DataOriginDPL) {
96 noutputs++;
97 }
98 }
99 return noutputs;
100}
101
103{
104 // Verify that everything has been sent on clear.
105 assert(std::all_of(mMessages.begin(), mMessages.end(), [](auto& m) { return m->empty(); }));
106 mDidDispatch = false;
107 mMessages.clear();
108}
109
110int64_t MessageContext::addToCache(std::unique_ptr<fair::mq::Message>& toCache)
111{
112 auto&& cached = toCache->GetTransport()->CreateMessage();
113 cached->Copy(*toCache);
114 // The pointer is immutable!
115 auto cacheId = (int64_t)toCache->GetData();
116 mMessageCache.insert({cacheId, std::move(cached)});
117 return cacheId;
118}
119
120std::unique_ptr<fair::mq::Message> MessageContext::cloneFromCache(int64_t id) const
121{
122 auto& inCache = mMessageCache.at(id);
123 auto&& cloned = inCache->GetTransport()->CreateMessage();
124 cloned->Copy(*inCache);
125 return std::move(cloned);
126}
127
129{
130 mMessageCache.erase(id);
131}
132
133void MessageContext::schedule(Messages::value_type&& message)
134{
135 auto const* header = message->header();
136 if (header == nullptr) {
137 throw std::logic_error("No valid header message found");
138 }
139 mScheduledMessages.emplace_back(std::move(message));
140 if (mDispatchControl.dispatch != nullptr) {
141 // send all scheduled messages if there is no trigger callback or its result is true
142 if (mDispatchControl.trigger == nullptr || mDispatchControl.trigger(*header)) {
143 std::vector<fair::mq::Parts> outputsPerChannel;
144 outputsPerChannel.resize(mProxy.getNumOutputChannels());
145 for (auto& message : mScheduledMessages) {
146 fair::mq::Parts parts = message->finalize();
147 assert(message->empty());
148 assert(parts.Size() == 2);
149 for (auto& part : parts) {
150 outputsPerChannel[mProxy.getOutputChannelIndex(message->route()).value].AddPart(std::move(part));
151 }
152 }
153 for (int ci = 0; ci < mProxy.getNumOutputChannels(); ++ci) {
154 auto& parts = outputsPerChannel[ci];
155 if (parts.Size() == 0) {
156 continue;
157 }
158 mDispatchControl.dispatch(std::move(parts), ChannelIndex{ci}, DefaultChannelIndex);
159 }
160 mDidDispatch = mScheduledMessages.empty() == false;
161 mScheduledMessages.clear();
162 }
163 }
164}
165
166} // namespace o2::framework
fair::mq::TransportFactory * getOutputTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
ChannelIndex getOutputChannelIndex(OutputSpec const &spec, size_t timeslice) const
Retrieve the channel index from a given OutputSpec and the associated timeslice.
o2::header::Stack * findMessageHeaderStack(const Output &spec)
static constexpr int DefaultChannelIndex
void schedule(Messages::value_type &&message)
int countDeviceOutputs(bool excludeDPLOrigin=false) const
int64_t addToCache(std::unique_ptr< fair::mq::Message > &message)
o2::header::DataHeader * findMessageHeader(const Output &spec)
return the headers of the 1st (from the end) matching message checking first in mMessages then in mScheduledMessages
std::unique_ptr< fair::mq::Message > cloneFromCache(int64_t id) const
fair::mq::MessagePtr createMessage(RouteIndex routeIndex, int index, size_t size)
o2::framework::DataProcessingHeader * findMessageDataProcessingHeader(const Output &spec)
const GLfloat * m
Definition glcorearb.h:4066
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
header::DataHeader::SubSpecificationType subSpec
Definition Output.h:30
header::DataDescription description
Definition Output.h:29
header::DataOrigin origin
Definition Output.h:28
the main header struct
Definition DataHeader.h:618
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36