Project
Loading...
Searching...
No Matches
FairMQDeviceProxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "InputRouteHelpers.h"
16#include "Headers/DataHeader.h"
18
19#include <fairmq/Channel.h>
20#include <fairmq/Device.h>
21#include <fairmq/Message.h>
22#include <fairmq/TransportFactory.h>
23
24#include <unordered_set>
25
26namespace o2::framework
27{
28
30{
31 assert(mOutputRoutes.size());
32 assert(index.value < mOutputRoutes.size());
33 assert(mOutputRoutes[index.value].channel.value != -1);
34 assert(mOutputChannelInfos.size());
35 assert(mOutputRoutes[index.value].channel.value < mOutputChannelInfos.size());
36 return mOutputRoutes[index.value].channel;
37}
38
40{
41 assert(mInputRoutes.size());
42 assert(index.value < mInputRoutes.size());
43 assert(mInputRoutes[index.value].channel.value != -1);
44 assert(mInputChannels.size());
45 assert(mInputRoutes[index.value].channel.value < mInputChannels.size());
46 return mInputRoutes[index.value].channel;
47}
48
50{
51 assert(mForwardRoutes.size());
52 assert(index.value < mForwardRoutes.size());
53 assert(mForwardRoutes[index.value].channel.value != -1);
54 assert(mForwardChannelInfos.size());
55 assert(mForwardRoutes[index.value].channel.value < mForwardChannelInfos.size());
56 return mForwardRoutes[index.value].channel;
57}
58
60{
61 assert(mOutputChannelInfos.size());
62 assert(index.value < mOutputChannelInfos.size());
63 return &mOutputChannelInfos[index.value].channel;
64}
65
67{
68 assert(mOutputChannelInfos.size());
69 assert(index.value < mOutputChannelInfos.size());
70 return mOutputChannelInfos[index.value];
71}
72
74{
75 assert(mOutputChannelInfos.size());
76 assert(index.value < mOutputChannelInfos.size());
77 return mOutputChannelStates[index.value];
78}
79
81{
82 assert(mInputChannels.size());
83 assert(index.value < mInputChannels.size());
84 return mInputChannels[index.value];
85}
86
88{
89 assert(mForwardChannelInfos.size());
90 assert(index.value < mForwardChannelInfos.size());
91 return &mForwardChannelInfos[index.value].channel;
92}
93
95{
96 assert(mForwardChannelInfos.size());
97 assert(index.value < mForwardChannelInfos.size());
98 return mForwardChannelInfos[index.value];
99}
100
102{
103 assert(mForwardChannelInfos.size());
104 assert(index.value < mForwardChannelStates.size());
105 return mForwardChannelStates[index.value];
106}
107
109{
110 assert(mOutputRoutes.size() == mOutputs.size());
111 for (size_t ri = 0; ri < mOutputs.size(); ++ri) {
112 auto& route = mOutputs[ri];
113
114 LOG(debug) << "matching: " << DataSpecUtils::describe(query) << " to route " << DataSpecUtils::describe(route.matcher);
115 if (DataSpecUtils::match(route.matcher, query) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
116 return mOutputRoutes[ri].channel;
117 }
118 }
120}
121
122void FairMQDeviceProxy::getMatchingForwardChannelIndexes(std::vector<ChannelIndex>& result, header::DataHeader const& dh, size_t timeslice) const
123{
124 assert(mForwardRoutes.size() == mForwards.size());
125 // Notice we need to match against a data header and not against
126 // the InputMatcher, because an input might match something which
127 // is then rerouted to two different output routes, depending on the content.
128 // Also notice that we need to match against all the routes, because we
129 // might have multiple outputs routes (e.g. in the output proxy) with the same matcher.
130 bool dplChannelMatched = false;
131 for (size_t ri = 0; ri < mForwards.size(); ++ri) {
132 auto& route = mForwards[ri];
133
134 LOGP(debug, "matching: {} to route {}", dh, DataSpecUtils::describe(route.matcher));
135 if (DataSpecUtils::match(route.matcher, dh.dataOrigin, dh.dataDescription, dh.subSpecification) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
136 auto channelInfoIndex = mForwardRoutes[ri].channel;
137 auto& info = mForwardChannelInfos[channelInfoIndex.value];
138 // We need to make sure that we forward the same payload only once per channel.
139 if (info.channelType == ChannelAccountingType::DPL) {
140 if (dplChannelMatched) {
141 continue;
142 }
143 dplChannelMatched = true;
144 }
145 result.emplace_back(channelInfoIndex);
146 }
147 }
148 // Remove duplicates, keeping the order of the channels.
149 std::unordered_set<int> numSet;
150 auto iter = std::stable_partition(result.begin(), result.end(),
151 [&](ChannelIndex n) { bool ret = !numSet.count(n.value); numSet.insert(n.value); return ret; }); // returns true if the item has not been "seen"
152 result.erase(iter, result.end());
153}
154
156{
157 for (int i = 0; i < mOutputChannelInfos.size(); i++) {
158 if (mOutputChannelInfos[i].name == name) {
159 return {i};
160 }
161 }
162 return {ChannelIndex::INVALID};
163}
164
166{
167 for (int i = 0; i < mInputChannelNames.size(); i++) {
168 if (mInputChannelNames[i] == name) {
169 return {i};
170 }
171 }
172 return {ChannelIndex::INVALID};
173}
174
176{
177 for (int i = 0; i < mForwardChannelInfos.size(); i++) {
178 if (mForwardChannelInfos[i].name == name) {
179 return {i};
180 }
181 }
182 return {ChannelIndex::INVALID};
183}
184
186{
187 auto transport = getOutputChannel(getOutputChannelIndex(index))->Transport();
188 assert(transport);
189 return transport;
190}
191
192fair::mq::TransportFactory* FairMQDeviceProxy::getInputTransport(RouteIndex index) const
193{
194 auto transport = getInputChannel(getInputChannelIndex(index))->Transport();
195 assert(transport);
196 return transport;
197}
198
200{
201 auto transport = getInputChannel(getInputChannelIndex(index))->Transport();
202 assert(transport);
203 return transport;
204}
205
206std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createOutputMessage(RouteIndex routeIndex) const
207{
208 return getOutputTransport(routeIndex)->CreateMessage(fair::mq::Alignment{64});
209}
210
211std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createOutputMessage(RouteIndex routeIndex, const size_t size) const
212{
213 return getOutputTransport(routeIndex)->CreateMessage(size, fair::mq::Alignment{64});
214}
215
216std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createInputMessage(RouteIndex routeIndex) const
217{
218 return getInputTransport(routeIndex)->CreateMessage(fair::mq::Alignment{64});
219}
220
221std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createInputMessage(RouteIndex routeIndex, const size_t size) const
222{
223 return getInputTransport(routeIndex)->CreateMessage(size, fair::mq::Alignment{64});
224}
225
226std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createForwardMessage(RouteIndex routeIndex) const
227{
228 return getForwardTransport(routeIndex)->CreateMessage(fair::mq::Alignment{64});
229}
230
231void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vector<InputRoute> const& inputs,
232 std::vector<ForwardRoute> const& forwards,
233 std::function<fair::mq::Channel&(std::string const&)> bindChannelByName,
234 std::function<bool(void)> newStatePending)
235{
236 mOutputs.clear();
237 mOutputRoutes.clear();
238 mOutputChannelInfos.clear();
239 mOutputChannelStates.clear();
240 mInputs.clear();
241 mInputRoutes.clear();
242 mInputChannels.clear();
243 mInputChannelNames.clear();
244 mForwards.clear();
245 mForwardRoutes.clear();
246 mForwardChannelInfos.clear();
247 mForwardChannelStates.clear();
248 {
249 mOutputs = outputs;
250 mOutputRoutes.reserve(outputs.size());
251 size_t ri = 0;
252 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
253 for (auto& route : outputs) {
254 // If the channel is not yet registered, register it.
255 // If the channel is already registered, use the existing index.
256 auto channelPos = channelNameToChannel.find(route.channel);
257 ChannelIndex channelIndex;
258
259 if (channelPos == channelNameToChannel.end()) {
260 channelIndex = ChannelIndex{(int)mOutputChannelInfos.size()};
261 ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
262 auto& channel = bindChannelByName(route.channel);
264 .name = route.channel,
265 .channelType = dplChannel,
266 .channel = channel,
267 .policy = route.policy,
268 .index = channelIndex,
269 };
270 mOutputChannelInfos.push_back(info);
271 mOutputChannelStates.push_back({0});
272 channelNameToChannel[route.channel] = channelIndex;
273 LOGP(detail, "Binding channel {} to channel index {}", route.channel, channelIndex.value);
274 } else {
275 LOGP(detail, "Using index {} for channel {}", channelPos->second.value, route.channel);
276 channelIndex = channelPos->second;
277 }
278 LOGP(detail, "Binding route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, ri, channelIndex.value);
279 mOutputRoutes.emplace_back(RouteState{channelIndex, false});
280 ri++;
281 }
282#ifndef NDEBUG
283 for (auto& route : mOutputRoutes) {
284 assert(route.channel.value != -1);
285 assert(route.channel.value < mOutputChannelInfos.size());
286 }
287#endif
288 LOGP(detail, "Total channels found {}, total routes {}", mOutputChannelInfos.size(), mOutputRoutes.size());
289 assert(mOutputRoutes.size() == outputs.size());
290 }
291
292 {
293 auto maxLanes = InputRouteHelpers::maxLanes(inputs);
294 mInputs = inputs;
295 mInputRoutes.reserve(inputs.size());
296 size_t ri = 0;
297 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
298 for (auto& route : inputs) {
299 // If the channel is not yet registered, register it.
300 // If the channel is already registered, use the existing index.
301 auto channelPos = channelNameToChannel.find(route.sourceChannel);
302 ChannelIndex channelIndex;
303
304 if (channelPos == channelNameToChannel.end()) {
305 channelIndex = ChannelIndex{(int)mInputChannels.size()};
306 fair::mq::Channel& channel = bindChannelByName(route.sourceChannel);
307
308 mInputChannels.push_back(&channel);
309 mInputChannelNames.push_back(route.sourceChannel);
310 channelNameToChannel[route.sourceChannel] = channelIndex;
311 LOGP(detail, "Binding channel {} to channel index {}", route.sourceChannel, channelIndex.value);
312 } else {
313 LOGP(detail, "Using index {} for channel {}", channelPos->second.value, route.sourceChannel);
314 channelIndex = channelPos->second;
315 }
316 LOGP(detail, "Binding route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, maxLanes, ri, channelIndex.value);
317 mInputRoutes.emplace_back(RouteState{channelIndex, false});
318 ri++;
319 }
320 assert(std::all_of(mInputRoutes.begin(), mInputRoutes.end(), [s = mInputChannels.size()](RouteState const& route) { return route.channel.value != -1 && route.channel.value < s; }));
321 LOGP(detail, "Total input channels found {}, total routes {}", mInputChannels.size(), mInputRoutes.size());
322 assert(mInputRoutes.size() == inputs.size());
323 }
324
325 {
326 mForwards = forwards;
327 mForwardRoutes.reserve(forwards.size());
328 LOGP(detail, "Forwards.size(): {}", forwards.size());
329 size_t ri = 0;
330 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
331
332 for (auto& route : forwards) {
333 // If the channel is not yet registered, register it.
334 // If the channel is already registered, use the existing index.
335 auto channelPos = channelNameToChannel.find(route.channel);
336 ChannelIndex channelIndex;
337
338 if (channelPos == channelNameToChannel.end()) {
339 channelIndex = ChannelIndex{(int)mForwardChannelInfos.size()};
340 auto& channel = bindChannelByName(route.channel);
341
342 ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
343 mForwardChannelInfos.push_back(ForwardChannelInfo{.name = route.channel, .channelType = dplChannel, .channel = channel, .policy = route.policy, .index = channelIndex});
344 mForwardChannelStates.push_back(ForwardChannelState{0});
345 channelNameToChannel[route.channel] = channelIndex;
346 LOGP(detail, "Binding forward channel {} to channel index {}", route.channel, channelIndex.value);
347 } else {
348 LOGP(detail, "Using index {} for forward channel {}", channelPos->second.value, route.channel);
349 channelIndex = channelPos->second;
350 }
351 LOGP(detail, "Binding forward route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, ri, channelIndex.value);
352 mForwardRoutes.emplace_back(RouteState{channelIndex, false});
353 ri++;
354 }
355 LOGP(detail, "Total forward channels found {}, total routes {}", mForwardChannelInfos.size(), mForwardRoutes.size());
356 assert(mForwardRoutes.size() == forwards.size());
357 for (size_t fi = 0; fi < mForwards.size(); fi++) {
358 auto& route = mForwards[fi];
359 auto& state = mForwardRoutes[fi];
360 assert(state.channel.value != -1);
361 assert(state.channel.value < mForwardChannelInfos.size());
362 LOGP(detail, "Forward route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, fi, state.channel.value);
363 }
364 }
365 mStateChangeCallback = newStatePending;
366}
367} // namespace o2::framework
benchmark::State & state
std::ostringstream debug
int32_t i
std::unique_ptr< fair::mq::Message > createOutputMessage(RouteIndex routeIndex) const
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
ForwardChannelState & getForwardChannelState(ChannelIndex channelIndex)
Retrieve information associated to a given forward by ChannelIndex.
fair::mq::TransportFactory * getForwardTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
OutputChannelState & getOutputChannelState(ChannelIndex channelIndex)
Retrieve information associated to a given forward by ChannelIndex.
ChannelIndex getInputChannelIndexByName(std::string const &channelName) const
ChannelIndex from a given channel name.
ChannelIndex getForwardChannelIndexByName(std::string const &channelName) const
ChannelIndex from a given channel name.
ChannelIndex getForwardChannelIndex(RouteIndex routeIndex) const
void getMatchingForwardChannelIndexes(std::vector< ChannelIndex > &result, header::DataHeader const &header, size_t timeslice) const
Retrieve the channel index from a given OutputSpec and the associated timeslice.
ForwardChannelInfo const & getForwardChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
ChannelIndex getOutputChannelIndexByName(std::string const &channelName) const
ChannelIndex from a given channel name.
fair::mq::Channel * getForwardChannel(ChannelIndex channelIndex) const
fair::mq::TransportFactory * getOutputTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
ChannelIndex getOutputChannelIndex(OutputSpec const &spec, size_t timeslice) const
Retrieve the channel index from a given OutputSpec and the associated timeslice.
fair::mq::Channel * getOutputChannel(ChannelIndex channelIndex) const
std::unique_ptr< fair::mq::Message > createForwardMessage(RouteIndex routeIndex) const
fair::mq::Channel * getInputChannel(ChannelIndex channelIndex) const
Retrieve the channel associated to a given output route.
ChannelIndex getInputChannelIndex(RouteIndex routeIndex) const
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
fair::mq::TransportFactory * getInputTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
std::unique_ptr< fair::mq::Message > createInputMessage(RouteIndex routeIndex) const
GLdouble n
Definition glcorearb.h:1982
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
@ RAWFMQ
A raw FairMQ channel which is not accounted by the framework.
@ DPL
The channel is a normal input channel.
static constexpr int INVALID
static std::string describe(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
Forward channel information.
Definition ChannelInfo.h:88
std::string name
The name of the channel.
Definition ChannelInfo.h:90
static size_t maxLanes(std::vector< InputRoute > const &routes)
Output channel information.
Definition ChannelInfo.h:73
Keeps current state of a given route.
Definition RouteState.h:19
the main header struct
Definition DataHeader.h:619
DataDescription dataDescription
Definition DataHeader.h:637
SubSpecificationType subSpecification
Definition DataHeader.h:657
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"