Project
Loading...
Searching...
No Matches
FairMQDeviceProxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "InputRouteHelpers.h"
16#include "Headers/DataHeader.h"
18
19#include <fairmq/Channel.h>
20#include <fairmq/Device.h>
21#include <fairmq/Message.h>
22#include <fairmq/TransportFactory.h>
23
24#include <unordered_set>
25
26namespace o2::framework
27{
28
30{
31 assert(mOutputRoutes.size());
32 assert(index.value < mOutputRoutes.size());
33 assert(mOutputRoutes[index.value].channel.value != -1);
34 assert(mOutputChannelInfos.size());
35 assert(mOutputRoutes[index.value].channel.value < mOutputChannelInfos.size());
36 return mOutputRoutes[index.value].channel;
37}
38
40{
41 assert(mInputRoutes.size());
42 assert(index.value < mInputRoutes.size());
43 assert(mInputRoutes[index.value].channel.value != -1);
44 assert(mInputChannels.size());
45 assert(mInputRoutes[index.value].channel.value < mInputChannels.size());
46 return mInputRoutes[index.value].channel;
47}
48
50{
51 assert(mForwardRoutes.size());
52 assert(index.value < mForwardRoutes.size());
53 assert(mForwardRoutes[index.value].channel.value != -1);
54 assert(mForwardChannelInfos.size());
55 assert(mForwardRoutes[index.value].channel.value < mForwardChannelInfos.size());
56 return mForwardRoutes[index.value].channel;
57}
58
60{
61 assert(mOutputChannelInfos.size());
62 assert(index.value < mOutputChannelInfos.size());
63 return &mOutputChannelInfos[index.value].channel;
64}
65
67{
68 assert(mOutputChannelInfos.size());
69 assert(index.value < mOutputChannelInfos.size());
70 return mOutputChannelInfos[index.value];
71}
72
74{
75 assert(mOutputChannelInfos.size());
76 assert(index.value < mOutputChannelInfos.size());
77 return mOutputChannelStates[index.value];
78}
79
81{
82 assert(mInputChannels.size());
83 assert(index.value < mInputChannels.size());
84 return mInputChannels[index.value];
85}
86
88{
89 assert(mForwardChannelInfos.size());
90 assert(index.value < mForwardChannelInfos.size());
91 return &mForwardChannelInfos[index.value].channel;
92}
93
95{
96 assert(mForwardChannelInfos.size());
97 assert(index.value < mForwardChannelInfos.size());
98 return mForwardChannelInfos[index.value];
99}
100
102{
103 assert(mForwardChannelInfos.size());
104 assert(index.value < mForwardChannelStates.size());
105 return mForwardChannelStates[index.value];
106}
107
109{
110 assert(mOutputRoutes.size() == mOutputs.size());
111 for (size_t ri = 0; ri < mOutputs.size(); ++ri) {
112 auto& route = mOutputs[ri];
113
114 LOG(debug) << "matching: " << DataSpecUtils::describe(query) << " to route " << DataSpecUtils::describe(route.matcher);
115 if (DataSpecUtils::match(route.matcher, query) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
116 return mOutputRoutes[ri].channel;
117 }
118 }
120}
121
122void FairMQDeviceProxy::getMatchingForwardChannelIndexes(std::vector<ChannelIndex>& result, header::DataHeader const& dh, size_t timeslice) const
123{
124 assert(mForwardRoutes.size() == mForwards.size());
125 // Notice we need to match against a data header and not against
126 // the InputMatcher, because an input might match something which
127 // is then rerouted to two different output routes, depending on the content.
128 // Also notice that we need to match against all the routes, because we
129 // might have multiple outputs routes (e.g. in the output proxy) with the same matcher.
130 bool dplChannelMatched = false;
131 for (size_t ri = 0; ri < mForwards.size(); ++ri) {
132 auto& route = mForwards[ri];
133
134 LOGP(debug, "matching: {} to route {}", dh, DataSpecUtils::describe(route.matcher));
135 if (DataSpecUtils::match(route.matcher, dh.dataOrigin, dh.dataDescription, dh.subSpecification) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
136 auto channelInfoIndex = mForwardRoutes[ri].channel;
137 auto& info = mForwardChannelInfos[channelInfoIndex.value];
138 // We need to make sure that we forward the same payload only once per channel.
139 if (info.channelType == ChannelAccountingType::DPL) {
140 if (dplChannelMatched) {
141 continue;
142 }
143 dplChannelMatched = true;
144 }
145 result.emplace_back(channelInfoIndex);
146 }
147 }
148 // Remove duplicates, keeping the order of the channels.
149 std::unordered_set<int> numSet;
150 auto iter = std::stable_partition(result.begin(), result.end(),
151 [&](ChannelIndex n) { bool ret = !numSet.count(n.value); numSet.insert(n.value); return ret; }); // returns true if the item has not been "seen"
152 result.erase(iter, result.end());
153}
154
156{
157 for (int i = 0; i < mOutputChannelInfos.size(); i++) {
158 if (mOutputChannelInfos[i].name == name) {
159 return {i};
160 }
161 }
162 return {ChannelIndex::INVALID};
163}
164
166{
167 for (int i = 0; i < mInputChannelNames.size(); i++) {
168 if (mInputChannelNames[i] == name) {
169 return {i};
170 }
171 }
172 return {ChannelIndex::INVALID};
173}
174
176{
177 for (int i = 0; i < mForwardChannelInfos.size(); i++) {
178 if (mForwardChannelInfos[i].name == name) {
179 return {i};
180 }
181 }
182 return {ChannelIndex::INVALID};
183}
184
186{
187 auto transport = getOutputChannel(getOutputChannelIndex(index))->Transport();
188 assert(transport);
189 return transport;
190}
191
192fair::mq::TransportFactory* FairMQDeviceProxy::getInputTransport(RouteIndex index) const
193{
194 auto transport = getInputChannel(getInputChannelIndex(index))->Transport();
195 assert(transport);
196 return transport;
197}
198
200{
201 auto transport = getInputChannel(getInputChannelIndex(index))->Transport();
202 assert(transport);
203 return transport;
204}
205
206std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createOutputMessage(RouteIndex routeIndex) const
207{
208 return getOutputTransport(routeIndex)->CreateMessage(fair::mq::Alignment{64});
209}
210
211std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createOutputMessage(RouteIndex routeIndex, const size_t size) const
212{
213 return getOutputTransport(routeIndex)->CreateMessage(size, fair::mq::Alignment{64});
214}
215
216std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createInputMessage(RouteIndex routeIndex) const
217{
218 return getInputTransport(routeIndex)->CreateMessage(fair::mq::Alignment{64});
219}
220
221std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createInputMessage(RouteIndex routeIndex, const size_t size) const
222{
223 return getInputTransport(routeIndex)->CreateMessage(size, fair::mq::Alignment{64});
224}
225
226std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createForwardMessage(RouteIndex routeIndex) const
227{
228 return getForwardTransport(routeIndex)->CreateMessage(fair::mq::Alignment{64});
229}
230
/// Rebuild all routing tables of the proxy from the given routes.
///
/// Every table (outputs, inputs, forwards and their per-channel info/state) is
/// cleared first, then refilled in three independent passes. In each pass a
/// channel is registered the first time a route names it and later routes with
/// the same channel name reuse that index, so there is one RouteState per route
/// and one channel entry per distinct channel name.
///
/// @param outputs  output routes; channels are looked up by `route.channel`
/// @param inputs   input routes; channels are looked up by `route.sourceChannel`
/// @param forwards forward routes; channels are looked up by `route.channel`
/// @param device   device whose configured channels (`GetChannels()`) are bound;
///                 it is also captured by reference in the state-change callback
231void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vector<InputRoute> const& inputs,
232 std::vector<ForwardRoute> const& forwards,
233 fair::mq::Device& device)
234{
// Drop whatever a previous bind() left behind.
235 mOutputs.clear();
236 mOutputRoutes.clear();
237 mOutputChannelInfos.clear();
238 mOutputChannelStates.clear();
239 mInputs.clear();
240 mInputRoutes.clear();
241 mInputChannels.clear();
242 mInputChannelNames.clear();
243 mForwards.clear();
244 mForwardRoutes.clear();
245 mForwardChannelInfos.clear();
246 mForwardChannelStates.clear();
// --- Pass 1: output routes -------------------------------------------------
247 {
248 mOutputs = outputs;
249 mOutputRoutes.reserve(outputs.size());
// ri only counts routes for the log messages below.
250 size_t ri = 0;
// Channel name -> index into mOutputChannelInfos, local to this pass.
251 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
252 for (auto& route : outputs) {
253 // If the channel is not yet registered, register it.
254 // If the channel is already registered, use the existing index.
255 auto channelPos = channelNameToChannel.find(route.channel);
256 ChannelIndex channelIndex;
257
258 if (channelPos == channelNameToChannel.end()) {
// A new channel takes the next free slot in mOutputChannelInfos.
259 channelIndex = ChannelIndex{(int)mOutputChannelInfos.size()};
// Channel names starting with "from_" are accounted as DPL, all others as RAWFMQ.
260 ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
261 auto channel = device.GetChannels().find(route.channel);
262 if (channel == device.GetChannels().end()) {
// NOTE(review): `channel` is dereferenced unconditionally below, so this
// relies on the fatal log not returning - confirm.
263 LOGP(fatal, "Expected channel {} not configured.", route.channel);
264 }
// NOTE(review): the line that opens this designated initializer (the `info`
// object pushed below) appears to be missing from this listing - restore it
// from the upstream source.
266 .name = route.channel,
267 .channelType = dplChannel,
// Only the first sub-channel (index 0) of the named channel is bound.
268 .channel = channel->second.at(0),
269 .policy = route.policy,
270 .index = channelIndex,
271 };
// Info and state vectors grow together, so they share the same ChannelIndex.
272 mOutputChannelInfos.push_back(info);
273 mOutputChannelStates.push_back({0});
274 channelNameToChannel[route.channel] = channelIndex;
275 LOGP(detail, "Binding channel {} to channel index {}", route.channel, channelIndex.value);
276 } else {
277 LOGP(detail, "Using index {} for channel {}", channelPos->second.value, route.channel);
278 channelIndex = channelPos->second;
279 }
280 LOGP(detail, "Binding route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, ri, channelIndex.value);
// One RouteState per route, at the same position as the route in mOutputs.
281 mOutputRoutes.emplace_back(RouteState{channelIndex, false});
282 ri++;
283 }
// Debug-only sanity check: every route points to a registered channel.
284#ifndef NDEBUG
285 for (auto& route : mOutputRoutes) {
286 assert(route.channel.value != -1);
287 assert(route.channel.value < mOutputChannelInfos.size());
288 }
289#endif
290 LOGP(detail, "Total channels found {}, total routes {}", mOutputChannelInfos.size(), mOutputRoutes.size());
291 assert(mOutputRoutes.size() == outputs.size());
292 }
293
// --- Pass 2: input routes --------------------------------------------------
294 {
// maxLanes is only used in the per-route log message of this pass.
295 auto maxLanes = InputRouteHelpers::maxLanes(inputs);
296 mInputs = inputs;
297 mInputRoutes.reserve(inputs.size());
298 size_t ri = 0;
// Channel name -> index into mInputChannels, local to this pass.
299 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
300 for (auto& route : inputs) {
301 // If the channel is not yet registered, register it.
302 // If the channel is already registered, use the existing index.
303 auto channelPos = channelNameToChannel.find(route.sourceChannel);
304 ChannelIndex channelIndex;
305
306 if (channelPos == channelNameToChannel.end()) {
307 channelIndex = ChannelIndex{(int)mInputChannels.size()};
308 auto channel = device.GetChannels().find(route.sourceChannel);
309 if (channel == device.GetChannels().end()) {
// NOTE(review): as above, the dereference below relies on the fatal log not
// returning - confirm.
310 LOGP(fatal, "Expected channel {} not configured.", route.sourceChannel);
311 }
// Inputs store a pointer to the device's first sub-channel and, in a parallel
// vector, the channel name.
312 mInputChannels.push_back(&channel->second.at(0));
313 mInputChannelNames.push_back(route.sourceChannel);
314 channelNameToChannel[route.sourceChannel] = channelIndex;
315 LOGP(detail, "Binding channel {} to channel index {}", route.sourceChannel, channelIndex.value);
316 } else {
317 LOGP(detail, "Using index {} for channel {}", channelPos->second.value, route.sourceChannel);
318 channelIndex = channelPos->second;
319 }
320 LOGP(detail, "Binding route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, maxLanes, ri, channelIndex.value);
321 mInputRoutes.emplace_back(RouteState{channelIndex, false});
322 ri++;
323 }
// Sanity check: every input route points to a registered input channel.
324 assert(std::all_of(mInputRoutes.begin(), mInputRoutes.end(), [s = mInputChannels.size()](RouteState const& route) { return route.channel.value != -1 && route.channel.value < s; }));
325 LOGP(detail, "Total input channels found {}, total routes {}", mInputChannels.size(), mInputRoutes.size());
326 assert(mInputRoutes.size() == inputs.size());
327 }
328
// --- Pass 3: forward routes ------------------------------------------------
329 {
330 mForwards = forwards;
331 mForwardRoutes.reserve(forwards.size());
332 LOGP(detail, "Forwards.size(): {}", forwards.size());
333 size_t ri = 0;
// Channel name -> index into mForwardChannelInfos, local to this pass.
334 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
335
336 for (auto& route : forwards) {
337 // If the channel is not yet registered, register it.
338 // If the channel is already registered, use the existing index.
339 auto channelPos = channelNameToChannel.find(route.channel);
340 ChannelIndex channelIndex;
341
342 if (channelPos == channelNameToChannel.end()) {
343 channelIndex = ChannelIndex{(int)mForwardChannelInfos.size()};
344 auto channel = device.GetChannels().find(route.channel);
345 if (channel == device.GetChannels().end()) {
// NOTE(review): as above, the dereference below relies on the fatal log not
// returning - confirm.
346 LOGP(fatal, "Expected channel {} not configured.", route.channel);
347 }
// Same "from_" prefix rule as for output channels.
348 ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
349 mForwardChannelInfos.push_back(ForwardChannelInfo{.name = route.channel, .channelType = dplChannel, .channel = channel->second.at(0), .policy = route.policy, .index = channelIndex});
350 mForwardChannelStates.push_back(ForwardChannelState{0});
351 channelNameToChannel[route.channel] = channelIndex;
352 LOGP(detail, "Binding forward channel {} to channel index {}", route.channel, channelIndex.value);
353 } else {
354 LOGP(detail, "Using index {} for forward channel {}", channelPos->second.value, route.channel);
355 channelIndex = channelPos->second;
356 }
357 LOGP(detail, "Binding forward route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, ri, channelIndex.value);
358 mForwardRoutes.emplace_back(RouteState{channelIndex, false});
359 ri++;
360 }
361 LOGP(detail, "Total forward channels found {}, total routes {}", mForwardChannelInfos.size(), mForwardRoutes.size());
362 assert(mForwardRoutes.size() == forwards.size());
// Second walk over the forwards: checks each route's channel index and logs
// the final route -> channel association.
363 for (size_t fi = 0; fi < mForwards.size(); fi++) {
364 auto& route = mForwards[fi];
365 auto& state = mForwardRoutes[fi];
366 assert(state.channel.value != -1);
367 assert(state.channel.value < mForwardChannelInfos.size());
368 LOGP(detail, "Forward route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, fi, state.channel.value);
369 }
370 }
// The callback keeps a reference to `device` and reports device.NewStatePending().
// NOTE(review): `device` must stay alive for as long as the callback can be invoked.
371 mStateChangeCallback = [&device]() -> bool { return device.NewStatePending(); };
372}
373} // namespace o2::framework
benchmark::State & state
int32_t i
std::ostringstream debug
std::unique_ptr< fair::mq::Message > createOutputMessage(RouteIndex routeIndex) const
ForwardChannelState & getForwardChannelState(ChannelIndex channelIndex)
Retrieve information associated to a given forward by ChannelIndex.
fair::mq::TransportFactory * getForwardTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
OutputChannelState & getOutputChannelState(ChannelIndex channelIndex)
Retrieve the state associated to a given output channel by ChannelIndex.
ChannelIndex getInputChannelIndexByName(std::string const &channelName) const
ChannelIndex from a given channel name.
ChannelIndex getForwardChannelIndexByName(std::string const &channelName) const
ChannelIndex from a given channel name.
ChannelIndex getForwardChannelIndex(RouteIndex routeIndex) const
void getMatchingForwardChannelIndexes(std::vector< ChannelIndex > &result, header::DataHeader const &header, size_t timeslice) const
Append to result the indexes of the forward channels whose route matches the given DataHeader and the associated timeslice, without duplicates.
ForwardChannelInfo const & getForwardChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
ChannelIndex getOutputChannelIndexByName(std::string const &channelName) const
ChannelIndex from a given channel name.
fair::mq::Channel * getForwardChannel(ChannelIndex channelIndex) const
fair::mq::TransportFactory * getOutputTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
ChannelIndex getOutputChannelIndex(OutputSpec const &spec, size_t timeslice) const
Retrieve the channel index from a given OutputSpec and the associated timeslice.
fair::mq::Channel * getOutputChannel(ChannelIndex channelIndex) const
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, fair::mq::Device &device)
std::unique_ptr< fair::mq::Message > createForwardMessage(RouteIndex routeIndex) const
fair::mq::Channel * getInputChannel(ChannelIndex channelIndex) const
Retrieve the channel associated to a given output route.
ChannelIndex getInputChannelIndex(RouteIndex routeIndex) const
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given output channel by ChannelIndex.
fair::mq::TransportFactory * getInputTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
std::unique_ptr< fair::mq::Message > createInputMessage(RouteIndex routeIndex) const
GLdouble n
Definition glcorearb.h:1982
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
@ RAWFMQ
A raw FairMQ channel which is not accounted by the framework.
@ DPL
The channel is a normal input channel.
static constexpr int INVALID
static std::string describe(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
Forward channel information.
Definition ChannelInfo.h:88
std::string name
The name of the channel.
Definition ChannelInfo.h:90
static size_t maxLanes(std::vector< InputRoute > const &routes)
Output channel information.
Definition ChannelInfo.h:73
Keeps current state of a given route.
Definition RouteState.h:19
the main header struct
Definition DataHeader.h:618
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"