232 std::vector<ForwardRoute>
const& forwards,
233 std::function<fair::mq::Channel&(std::string
const&)> bindChannelByName,
234 std::function<
bool(
void)> newStatePending)
237 mOutputRoutes.clear();
238 mOutputChannelInfos.clear();
239 mOutputChannelStates.clear();
241 mInputRoutes.clear();
242 mInputChannels.clear();
243 mInputChannelNames.clear();
245 mForwardRoutes.clear();
246 mForwardChannelInfos.clear();
247 mForwardChannelStates.clear();
250 mOutputRoutes.reserve(outputs.size());
252 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
253 for (
auto& route : outputs) {
256 auto channelPos = channelNameToChannel.find(route.channel);
259 if (channelPos == channelNameToChannel.end()) {
262 auto& channel = bindChannelByName(route.channel);
264 .
name = route.channel,
265 .channelType = dplChannel,
267 .policy = route.policy,
268 .index = channelIndex,
270 mOutputChannelInfos.push_back(info);
271 mOutputChannelStates.push_back({0});
272 channelNameToChannel[route.channel] = channelIndex;
273 LOGP(detail,
"Binding channel {} to channel index {}", route.channel, channelIndex.
value);
275 LOGP(detail,
"Using index {} for channel {}", channelPos->second.value, route.channel);
276 channelIndex = channelPos->second;
278 LOGP(detail,
"Binding route {}@{}%{} to index {} and channelIndex {}",
DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, ri, channelIndex.
value);
279 mOutputRoutes.emplace_back(
RouteState{channelIndex,
false});
283 for (
auto& route : mOutputRoutes) {
284 assert(route.channel.value != -1);
285 assert(route.channel.value < mOutputChannelInfos.size());
288 LOGP(detail,
"Total channels found {}, total routes {}", mOutputChannelInfos.size(), mOutputRoutes.size());
289 assert(mOutputRoutes.size() == outputs.size());
295 mInputRoutes.reserve(inputs.size());
297 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
298 for (
auto& route : inputs) {
301 auto channelPos = channelNameToChannel.find(route.sourceChannel);
304 if (channelPos == channelNameToChannel.end()) {
306 fair::mq::Channel& channel = bindChannelByName(route.sourceChannel);
308 mInputChannels.push_back(&channel);
309 mInputChannelNames.push_back(route.sourceChannel);
310 channelNameToChannel[route.sourceChannel] = channelIndex;
311 LOGP(detail,
"Binding channel {} to channel index {}", route.sourceChannel, channelIndex.
value);
313 LOGP(detail,
"Using index {} for channel {}", channelPos->second.value, route.sourceChannel);
314 channelIndex = channelPos->second;
316 LOGP(detail,
"Binding route {}@{}%{} to index {} and channelIndex {}",
DataSpecUtils::describe(route.matcher), route.timeslice, maxLanes, ri, channelIndex.
value);
317 mInputRoutes.emplace_back(
RouteState{channelIndex,
false});
320 assert(std::all_of(mInputRoutes.begin(), mInputRoutes.end(), [s = mInputChannels.size()](
RouteState const& route) { return route.channel.value != -1 && route.channel.value < s; }));
321 LOGP(detail,
"Total input channels found {}, total routes {}", mInputChannels.size(), mInputRoutes.size());
322 assert(mInputRoutes.size() == inputs.size());
326 mForwards = forwards;
327 mForwardRoutes.reserve(forwards.size());
328 LOGP(detail,
"Forwards.size(): {}", forwards.size());
330 std::unordered_map<std::string, ChannelIndex> channelNameToChannel;
332 for (
auto& route : forwards) {
335 auto channelPos = channelNameToChannel.find(route.channel);
338 if (channelPos == channelNameToChannel.end()) {
340 auto& channel = bindChannelByName(route.channel);
343 mForwardChannelInfos.push_back(
ForwardChannelInfo{.
name = route.channel, .channelType = dplChannel, .channel = channel, .policy = route.policy, .index = channelIndex});
345 channelNameToChannel[route.channel] = channelIndex;
346 LOGP(detail,
"Binding forward channel {} to channel index {}", route.channel, channelIndex.
value);
348 LOGP(detail,
"Using index {} for forward channel {}", channelPos->second.value, route.channel);
349 channelIndex = channelPos->second;
351 LOGP(detail,
"Binding forward route {}@{}%{} to index {} and channelIndex {}",
DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, ri, channelIndex.
value);
352 mForwardRoutes.emplace_back(
RouteState{channelIndex,
false});
355 LOGP(detail,
"Total forward channels found {}, total routes {}", mForwardChannelInfos.size(), mForwardRoutes.size());
356 assert(mForwardRoutes.size() == forwards.size());
357 for (
size_t fi = 0; fi < mForwards.size(); fi++) {
358 auto& route = mForwards[fi];
359 auto&
state = mForwardRoutes[fi];
360 assert(
state.channel.value != -1);
361 assert(
state.channel.value < mForwardChannelInfos.size());
362 LOGP(detail,
"Forward route {}@{}%{} to index {} and channelIndex {}",
DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, fi,
state.channel.value);
365 mStateChangeCallback = newStatePending;