Project
Loading...
Searching...
No Matches
DeviceSpecHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "DeviceSpecHelpers.h"
13#include <wordexp.h>
14#include <algorithm>
15#include <boost/program_options.hpp>
16#include <cstdio>
17#include <cstdlib>
18#include <cstring>
19#include <unordered_set>
20#include <vector>
29#include "Framework/Lifetime.h"
35#include "Framework/Signpost.h"
39
40#include "WorkflowHelpers.h"
41
42#include <uv.h>
43#include <iostream>
44#include <fmt/format.h>
45
46#include <sys/time.h>
47#include <sys/resource.h>
48#include <csignal>
49#include <fairmq/Device.h>
50
51#include <regex>
52
53O2_DECLARE_DYNAMIC_LOG(device_spec_helpers);
54
55namespace bpo = boost::program_options;
56
57using namespace o2::framework;
58
59namespace o2::framework
60{
61
62namespace detail
63{
65{
66 // We simply wake up the event loop. Nothing to be done here.
67 auto* state = (DeviceState*)handle->data;
68 state->loopReason |= DeviceState::TIMER_EXPIRED;
69 state->loopReason |= DeviceState::DATA_INCOMING;
70 if (std::find(state->firedTimers.begin(), state->firedTimers.end(), handle) == state->firedTimers.end()) {
71 state->firedTimers.push_back(handle);
72 }
73}
74
76{
77 return [timer]() -> bool {
78 auto* state = (DeviceState*)timer->data;
79 return std::find(state->firedTimers.begin(), state->firedTimers.end(), timer) != state->firedTimers.end();
80 };
81}
82
84{
85 return [timer](uint64_t timeout_ms, uint64_t repeat_ms) -> void {
86 uv_timer_start(timer, detail::timer_callback, timeout_ms, repeat_ms);
87 };
88}
89
90void signal_callback(uv_signal_t* handle, int)
91{
92 // We simply wake up the event loop. Nothing to be done here.
93 auto* state = (DeviceState*)handle->data;
94 if (!state) {
95 return;
96 }
98 state->loopReason |= DeviceState::DATA_INCOMING;
99}
100} // namespace detail
101
107
109 {
110 return [matcher](DeviceState& state, ServiceRegistryRef, ConfigParamRegistry const& options) {
111 // A vector of all the available timer periods
112 std::vector<std::chrono::microseconds> periods;
113 // How long a ginven period should be active
114 std::vector<std::chrono::seconds> durations;
115 auto prefix = std::string{"period-"};
116 for (auto& meta : matcher.metadata) {
117 if (strncmp(meta.name.c_str(), prefix.c_str(), prefix.size()) == 0) {
118 // Parse the number after the prefix and consider it the duration
119 std::string_view duration(meta.name.c_str() + prefix.size(), meta.name.size() - prefix.size());
120 durations.emplace_back(std::chrono::seconds(std::stoi(std::string(duration))));
121 periods.emplace_back(std::chrono::microseconds(meta.defaultValue.get<uint64_t>() / 1000));
122 }
123 }
124 if (periods.empty()) {
125 std::string defaultRateName = std::string{"period-"} + matcher.binding;
126 auto defaultRate = std::chrono::milliseconds(options.get<int>(defaultRateName.c_str()));
127 O2_SIGNPOST_ID_GENERATE(tid, device_spec_helpers);
128 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, tid, "timeDrivenCreation", "Using default rate of %" PRIi64 " ms as specified by option period-%{public}s", defaultRate.count(),
129 matcher.binding.c_str());
130 periods.emplace_back(defaultRate.count());
131 durations.emplace_back(std::chrono::seconds((std::size_t)-1));
132 } else {
133 // If we have multiple periods, the last one gets the remaining interval
134 durations.back() = std::chrono::seconds((std::size_t)-1);
135 }
136 // We create a timer to wake us up. Notice the actual
137 // timeslot creation and record expiration still happens
138 // in a synchronous way.
139 auto* timer = (uv_timer_t*)(malloc(sizeof(uv_timer_t)));
140 timer->data = &state;
141 uv_timer_init(state.loop, timer);
142 uv_timer_start(timer, detail::timer_callback, periods.front().count(), periods.front().count());
143 state.activeTimers.push_back(timer);
144
146 };
147 }
148
155
156 static RouteConfigurator::CreationConfigurator signalDrivenConfigurator(InputSpec const& matcher, size_t inputTimeslice, size_t maxInputTimeslices)
157 {
158 return [matcher, inputTimeslice, maxInputTimeslices](DeviceState& state, ServiceRegistryRef, ConfigParamRegistry const& options) {
159 std::string startName = std::string{"start-value-"} + matcher.binding;
160 std::string endName = std::string{"end-value-"} + matcher.binding;
161 std::string stepName = std::string{"step-value-"} + matcher.binding;
162 auto start = options.get<int64_t>(startName.c_str());
163 auto stop = options.get<int64_t>(endName.c_str());
164 auto step = options.get<int64_t>(stepName.c_str());
165 // We create a timer to wake us up. Notice the actual
166 // timeslot creation and record expiration still happens
167 // in a synchronous way.
168 auto* sh = (uv_signal_t*)(malloc(sizeof(uv_signal_t)));
169 uv_signal_init(state.loop, sh);
170 sh->data = &state;
171 uv_signal_start(sh, detail::signal_callback, SIGUSR1);
172 state.activeSignals.push_back(sh);
173
174 return LifetimeHelpers::enumDrivenCreation(start, stop, step, inputTimeslice, maxInputTimeslices, 1);
175 };
176 }
177
184
185 static RouteConfigurator::CreationConfigurator enumDrivenConfigurator(InputSpec const& matcher, size_t inputTimeslice, size_t maxInputTimeslices)
186 {
187 return [matcher, inputTimeslice, maxInputTimeslices](DeviceState&, ServiceRegistryRef, ConfigParamRegistry const& options) {
188 std::string startName = std::string{"start-value-"} + matcher.binding;
189 std::string endName = std::string{"end-value-"} + matcher.binding;
190 std::string stepName = std::string{"step-value-"} + matcher.binding;
191 int64_t defaultStart = 0;
192 int64_t defaultStop = std::numeric_limits<int64_t>::max();
193 int64_t defaultStep = 1;
194 int defaultRepetitions = 1;
195 for (auto& meta : matcher.metadata) {
196 if (meta.name == "repetitions") {
197 defaultRepetitions = meta.defaultValue.get<int64_t>();
198 } else if (meta.name == "start-value") {
199 defaultStart = meta.defaultValue.get<int64_t>();
200 } else if (meta.name == "end-value") {
201 defaultStop = meta.defaultValue.get<int64_t>();
202 } else if (meta.name == "step-value") {
203 defaultStep = meta.defaultValue.get<int64_t>();
204 }
205 }
206 auto start = options.hasOption(startName.c_str()) ? options.get<int64_t>(startName.c_str()) : defaultStart;
207 auto stop = options.hasOption(endName.c_str()) ? options.get<int64_t>(endName.c_str()) : defaultStop;
208 auto step = options.hasOption(stepName.c_str()) ? options.get<int64_t>(stepName.c_str()) : defaultStep;
209 auto repetitions = defaultRepetitions;
210 return LifetimeHelpers::enumDrivenCreation(start, stop, step, inputTimeslice, maxInputTimeslices, repetitions);
211 };
212 }
213
218
223
225 {
226 return [](DeviceState&, ConfigParamRegistry const& options) {
227 auto serverUrl = options.get<std::string>("condition-backend");
228 return LifetimeHelpers::expectCTP(serverUrl, true);
229 };
230 }
231
232 static RouteConfigurator::ExpirationConfigurator expiringConditionConfigurator(InputSpec const& spec, std::string const& sourceChannel)
233 {
234 return [spec, sourceChannel](DeviceState&, ConfigParamRegistry const& options) {
235 auto serverUrl = options.get<std::string>("condition-backend");
236 auto forceTimestamp = options.get<std::string>("condition-timestamp");
237 return LifetimeHelpers::fetchFromCCDBCache(spec, serverUrl, forceTimestamp, sourceChannel);
238 };
239 }
240
241 static RouteConfigurator::CreationConfigurator fairmqDrivenConfiguration(InputSpec const& spec, int inputTimeslice, int maxInputTimeslices)
242 {
243 return [spec, inputTimeslice, maxInputTimeslices](DeviceState& state, ServiceRegistryRef services, ConfigParamRegistry const&) {
244 // std::string channelNameOption = std::string{"out-of-band-channel-name-"} + spec.binding;
245 // auto channelName = options.get<std::string>(channelNameOption.c_str());
246 std::string channelName = "upstream";
247 for (auto& meta : spec.metadata) {
248 if (meta.name != "channel-name") {
249 continue;
250 }
251 channelName = meta.defaultValue.get<std::string>();
252 }
253
254 auto device = services.get<RawDeviceService>().device();
255 auto& channel = device->GetChannels()[channelName];
256
257 // We assume there is always a ZeroMQ socket behind.
258 int zmq_fd = 0;
259 size_t zmq_fd_len = sizeof(zmq_fd);
260 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
261 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
262 if (zmq_fd == 0) {
263 throw runtime_error_f("Cannot get file descriptor for channel %s", channelName.c_str());
264 }
265 LOG(debug) << "Polling socket for " << channel[0].GetName();
266
267 state.activeOutOfBandPollers.push_back(poller);
268
269 // We always create entries whenever we get invoked.
270 // Notice this works only if we are the only input.
271 // Otherwise we should check the channel for new data,
272 // before we create an entry.
273 return LifetimeHelpers::enumDrivenCreation(0, -1, 1, inputTimeslice, maxInputTimeslices, 1);
274 };
275 }
276
283
285 {
286 return [spec](DeviceState&, ConfigParamRegistry const& options) {
287 std::string channelNameOption = std::string{"out-of-band-channel-name-"} + spec.binding;
288 auto channelName = options.get<std::string>(channelNameOption.c_str());
289 return LifetimeHelpers::fetchFromFairMQ(spec, channelName);
290 };
291 }
292
294 {
295 // FIXME: this should really be expireAlways. However, since we do not have
296 // a proper backend for conditions yet, I keep it behaving like it was
297 // before.
298 return [](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::expireNever(); };
299 }
300
305
307 {
308 return [matcher](DeviceState&, ConfigParamRegistry const&) {
310 };
311 }
312
314 {
315 return [matcher](DeviceState&, ConfigParamRegistry const&) {
317 };
318 }
319
320 static RouteConfigurator::ExpirationConfigurator expiringTimerConfigurator(InputSpec const& spec, std::string const& sourceChannel)
321 {
322 auto m = std::get_if<ConcreteDataMatcher>(&spec.matcher);
323 if (m == nullptr) {
324 throw runtime_error("InputSpec for Timers must be fully qualified");
325 }
326 // We copy the matcher to avoid lifetime issues.
327 return [matcher = *m, sourceChannel](DeviceState&, ConfigParamRegistry const&) {
328 // Timers do not have any orbit associated to them
329 return LifetimeHelpers::enumerate(matcher, sourceChannel, 0, 0);
330 };
331 }
332
333 static RouteConfigurator::ExpirationConfigurator expiringOOBConfigurator(InputSpec const& spec, std::string const& sourceChannel)
334 {
335 auto m = std::get_if<ConcreteDataMatcher>(&spec.matcher);
336 if (m == nullptr) {
337 throw runtime_error("InputSpec for OOB must be fully qualified");
338 }
339 // We copy the matcher to avoid lifetime issues.
340 return [matcher = *m, sourceChannel](DeviceState&, ConfigParamRegistry const&) {
341 // Timers do not have any orbit associated to them
342 return LifetimeHelpers::enumerate(matcher, sourceChannel, 0, 0);
343 };
344 }
345
346 static RouteConfigurator::ExpirationConfigurator expiringEnumerationConfigurator(InputSpec const& spec, std::string const& sourceChannel)
347 {
348 auto m = std::get_if<ConcreteDataMatcher>(&spec.matcher);
349 if (m == nullptr) {
350 throw runtime_error("InputSpec for Enumeration must be fully qualified");
351 }
352 // We copy the matcher to avoid lifetime issues.
353 return [matcher = *m, &spec, sourceChannel](DeviceState&, ConfigParamRegistry const& config) {
354 int defaultOrbitOffset = 0;
355 int defaultOrbitMultiplier = 1;
356 for (auto& meta : spec.metadata) {
357 if (meta.name == "orbit-offset") {
358 defaultOrbitOffset = meta.defaultValue.get<int64_t>();
359 } else if (meta.name == "orbit-multiplier") {
360 defaultOrbitMultiplier = meta.defaultValue.get<int64_t>();
361 }
362 }
363 size_t orbitOffset = config.hasOption("orbit-offset-enumeration") ? config.get<int64_t>("orbit-offset-enumeration") : defaultOrbitOffset;
364 size_t orbitMultiplier = config.hasOption("orbit-multiplier-enumeration") ? config.get<int64_t>("orbit-multiplier-enumeration") : defaultOrbitMultiplier;
365 return LifetimeHelpers::enumerate(matcher, sourceChannel, orbitOffset, orbitMultiplier);
366 };
367 }
368
370 {
371 // FIXME: this should really be expireAlways. However, since we do not have
372 // a proper backend for conditions yet, I keep it behaving like it was
373 // before.
374 return [](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::expireNever(); };
375 }
376
381
387
389 static RouteConfigurator::DanglingConfigurator danglingOptionalConfigurator(std::vector<InputRoute> const& routes)
390 {
391 return [&routes](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::expireIfPresent(routes, ConcreteDataMatcher{"FLP", "DISTSUBTIMEFRAME", 0}); };
392 }
393
395 static RouteConfigurator::ExpirationConfigurator expiringOptionalConfigurator(InputSpec const& spec, std::string const& sourceChannel)
396 {
397 try {
399 return [concrete, sourceChannel](DeviceState&, ConfigParamRegistry const&) {
400 return LifetimeHelpers::dummy(concrete, sourceChannel);
401 };
402 } catch (...) {
404 ConcreteDataMatcher concrete{dataType.origin, dataType.description, 0xdeadbeef};
405 return [concrete, sourceChannel](DeviceState&, ConfigParamRegistry const&) {
406 return LifetimeHelpers::dummy(concrete, sourceChannel);
407 };
408 // We copy the matcher to avoid lifetime issues.
409 }
410 }
411};
412
416{
417 return fmt::format("{}type={},method={},address={},rateLogging={},rcvBufSize={},sndBufSize={}",
418 channel.name.empty() ? "" : "name=" + channel.name + ",",
422 channel.rateLogging,
423 channel.recvBufferSize,
424 channel.sendBufferSize);
425}
426
428{
429 return fmt::format("{}type={},method={},address={},rateLogging={},rcvBufSize={},sndBufSize={}",
430 channel.name.empty() ? "" : "name=" + channel.name + ",",
434 channel.rateLogging,
435 channel.recvBufferSize,
436 channel.sendBufferSize);
437}
438
439void DeviceSpecHelpers::validate(std::vector<DataProcessorSpec> const& workflow)
440{
441 // Iterate on all the DataProcessorSpecs in the altered_workflow
442 // and check for duplicates outputs among those who have lifetime == Timeframe
443 // Do so by:
444 //
445 // * Get the list of all Lifetime::Timeframe outputs for the workflow.
446 // Only those who are concrete matchers are considered for now, because
447 // it becomes to complicate to check for the wildcard case.
448 // * Sort the associated matchers by origin, description, subSpec
449 // * Check that the next element is not the same
450 std::vector<std::pair<int, std::string>> timeframeOutputs;
451 for (size_t i = 0; i < workflow.size(); ++i) {
452 auto& spec = workflow[i];
453 // We do not want to check for pipelining
454 if (spec.inputTimeSliceId != 0) {
455 continue;
456 }
457 for (auto& output : spec.outputs) {
458 if (output.lifetime != Lifetime::Timeframe) {
459 continue;
460 }
461 std::optional<ConcreteDataMatcher> matcher = DataSpecUtils::asOptionalConcreteDataMatcher(output);
462 if (!matcher) {
463 continue;
464 }
465 timeframeOutputs.emplace_back(i, DataSpecUtils::describe(*matcher));
466 }
467 }
468 std::stable_sort(timeframeOutputs.begin(), timeframeOutputs.end(), [](auto const& a, auto const& b) {
469 return a.second < b.second;
470 });
471
472 auto it = std::adjacent_find(timeframeOutputs.begin(), timeframeOutputs.end(), [](auto const& a, auto const& b) {
473 return a.second == b.second;
474 });
475 if (it != timeframeOutputs.end()) {
476 // Tell which are the two duplicates
477 auto device1 = workflow[it->first].name;
478 auto device2 = workflow[(it + 1)->first].name;
479 auto output1 = it->second;
480 auto output2 = (it + 1)->second;
481 throw std::runtime_error(fmt::format("Found duplicate outputs {} in device {} ({}) and {} in {} ({})",
482 output1, device1, it->first, output2, device2, (it + 1)->first));
483 }
484}
485
487 std::vector<DeviceSpec>& devices,
488 std::vector<DeviceId>& deviceIndex,
489 std::vector<DeviceConnectionId>& connections,
490 ResourceManager& resourceManager,
491 const std::vector<size_t>& outEdgeIndex,
492 const std::vector<DeviceConnectionEdge>& logicalEdges,
493 const std::vector<EdgeAction>& actions, const WorkflowSpec& workflow,
494 const std::vector<OutputSpec>& outputsMatchers,
495 const std::vector<ChannelConfigurationPolicy>& channelPolicies,
496 const std::vector<SendingPolicy>& sendingPolicies,
497 const std::vector<ForwardingPolicy>& forwardingPolicies,
498 std::string const& channelPrefix,
499 ComputingOffer const& defaultOffer,
500 OverrideServiceSpecs const& overrideServices)
501{
502 // The topology cannot be empty or not connected. If that is the case, than
503 // something before this went wrong.
504 // FIXME: is that really true???
505 assert(!workflow.empty());
506
507 // Edges are navigated in order for each device, so the device associaited to
508 // an edge is always the last one created.
509 auto deviceForEdge = [&actions, &workflow, &devices,
510 &logicalEdges, &resourceManager,
511 &defaultOffer, &channelPrefix, overrideServices](size_t ei, ComputingOffer& acceptedOffer) {
512 auto& edge = logicalEdges[ei];
513 auto& action = actions[ei];
514
515 if (action.requiresNewDevice == false) {
516 assert(devices.empty() == false);
517 return devices.size() - 1;
518 }
519 if (acceptedOffer.hostname != "") {
520 resourceManager.notifyAcceptedOffer(acceptedOffer);
521 }
522
523 auto& processor = workflow[edge.producer];
524
525 acceptedOffer.cpu = defaultOffer.cpu;
526 acceptedOffer.memory = defaultOffer.memory;
527 for (auto offer : resourceManager.getAvailableOffers()) {
528 if (offer.cpu < acceptedOffer.cpu) {
529 continue;
530 }
531 if (offer.memory < acceptedOffer.memory) {
532 continue;
533 }
534 acceptedOffer.hostname = offer.hostname;
535 acceptedOffer.startPort = offer.startPort;
536 acceptedOffer.rangeSize = 0;
537 break;
538 }
539
540 devices.emplace_back(DeviceSpec{
541 .name = processor.name,
542 .id = processor.maxInputTimeslices == 1 ? processor.name : processor.name + "_t" + std::to_string(edge.producerTimeIndex),
543 .channelPrefix = channelPrefix,
544 .inputChannels = {},
545 .options = processor.options,
546 .services = ServiceSpecHelpers::filterDisabled(processor.requiredServices, overrideServices),
547 .algorithm = processor.algorithm,
548 .rank = processor.rank,
549 .nSlots = processor.nSlots,
550 .inputTimesliceId = edge.producerTimeIndex,
551 .maxInputTimeslices = processor.maxInputTimeslices,
552 .resource = {acceptedOffer},
553 .labels = processor.labels,
554 .metadata = processor.metadata});
557 //
558 // for (auto& input : processor.inputs) {
559 // if (input.lifetime != Lifetime::OutOfBand) {
560 // continue;
561 // }
562 // InputChannelSpec extraInputChannelSpec{
563 // .name = "upstream",
564 // .type = ChannelType::Pair,
565 // .method = ChannelMethod::Bind,
566 // .hostname = "localhost",
567 // .port = 33000,
568 // .protocol = ChannelProtocol::IPC,
569 // };
570 // for (auto& meta : input.metadata) {
571 // if (meta.name == "name") {
572 // extraInputChannelSpec.name = meta.defaultValue.get<std::string>();
573 // }
574 // if (meta.name == "port") {
575 // extraInputChannelSpec.port = meta.defaultValue.get<int32_t>();
576 // }
577 // if (meta.name == "address") {
578 // extraInputChannelSpec.hostname = meta.defaultValue.get<std::string>();
579 // }
580 // }
581 // device.inputChannels.push_back(extraInputChannelSpec);
582 //}
583 for (auto& output : processor.outputs) {
584 if (output.lifetime != Lifetime::OutOfBand) {
585 continue;
586 }
587 OutputChannelSpec extraOutputChannelSpec{
588 .name = "downstream",
589 .type = ChannelType::Pair,
590 .method = ChannelMethod::Connect,
591 .hostname = "localhost",
592 .port = 33000,
593 .protocol = ChannelProtocol::IPC};
594 for (auto& meta : output.metadata) {
595 if (meta.name == "channel-name") {
596 extraOutputChannelSpec.name = meta.defaultValue.get<std::string>();
597 }
598 if (meta.name == "port") {
599 extraOutputChannelSpec.port = meta.defaultValue.get<int32_t>();
600 }
601 if (meta.name == "address") {
602 extraOutputChannelSpec.hostname = meta.defaultValue.get<std::string>();
603 }
604 }
605 devices.back().outputChannels.push_back(extraOutputChannelSpec);
606 }
607 return devices.size() - 1;
608 };
609
610 auto channelFromDeviceEdgeAndPort = [&connections, &workflow, &channelPolicies](const DeviceSpec& device,
611 ComputingResource& deviceResource,
612 ComputingOffer& acceptedOffer,
613 const DeviceConnectionEdge& edge) {
614 OutputChannelSpec channel;
615 auto& consumer = workflow[edge.consumer];
616 std::string consumerDeviceId = consumer.name;
617 if (consumer.maxInputTimeslices != 1) {
618 consumerDeviceId += "_t" + std::to_string(edge.timeIndex);
619 }
620 channel.name = device.channelPrefix + "from_" + device.id + "_to_" + consumerDeviceId;
621 channel.port = acceptedOffer.startPort + acceptedOffer.rangeSize;
622 channel.hostname = acceptedOffer.hostname;
623 deviceResource.usedPorts += 1;
624 acceptedOffer.rangeSize += 1;
625
626 for (auto& policy : channelPolicies) {
627 if (policy.match(device.id, consumerDeviceId)) {
628 policy.modifyOutput(channel);
629 break;
630 }
631 }
632 DeviceConnectionId id{edge.producer, edge.consumer, edge.timeIndex, edge.producerTimeIndex, channel.port};
633 connections.push_back(id);
634
635 auto& source = workflow[edge.producer];
636
637 O2_SIGNPOST_ID_GENERATE(sid, device_spec_helpers);
638 O2_SIGNPOST_START(device_spec_helpers, sid, "new channels", "Channel %{public}s has been created.", channel.name.c_str());
639 O2_SIGNPOST_ID_GENERATE(iid, device_spec_helpers);
640 O2_SIGNPOST_START(device_spec_helpers, iid, "producer outputs", "Producer %{public}s has the following outputs:", source.name.c_str());
641 for (auto& output : source.outputs) {
642 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, iid, "producer outputs", "%{public}s", DataSpecUtils::describe(output).c_str());
643 }
644 O2_SIGNPOST_END(device_spec_helpers, iid, "producer outputs", "");
645 O2_SIGNPOST_START(device_spec_helpers, iid, "producer forwards", "Producer %{public}s has the following forwards:", source.name.c_str());
646 for (auto& forwards : device.forwards) {
647 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, iid, "producer forwards", "%{public}s", DataSpecUtils::describe(forwards.matcher).c_str());
648 }
649 O2_SIGNPOST_END(device_spec_helpers, iid, "producer forwards", "");
650 O2_SIGNPOST_START(device_spec_helpers, iid, "consumer inputs", "Consumer %{public}s has the following inputs:", consumer.name.c_str());
651 for (auto& input : consumer.inputs) {
652 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, iid, "consumer inputs", "%{public}s", DataSpecUtils::describe(input).c_str());
653 }
654 O2_SIGNPOST_END(device_spec_helpers, iid, "consumer inputs", "");
655 O2_SIGNPOST_END(device_spec_helpers, sid, "new channels", "");
656 return channel;
657 };
658
659 auto isDifferentDestinationDeviceReferredBy = [&actions](size_t ei) { return actions[ei].requiresNewChannel; };
660
661 // This creates a new channel for a given edge, if needed. Notice that we
662 // navigate edges in a per device fashion (creating those if they are not
663 // alredy there) and create a new channel only if it connects two new
664 // devices. Whether or not this is the case was previously computed
665 // in the action.requiresNewChannel field.
666 auto createChannelForDeviceEdge = [&devices, &logicalEdges, &channelFromDeviceEdgeAndPort,
667 &deviceIndex](size_t di, size_t ei, ComputingOffer& offer) {
668 auto& device = devices[di];
669 auto& edge = logicalEdges[ei];
670
671 deviceIndex.emplace_back(DeviceId{edge.producer, edge.producerTimeIndex, di});
672
673 OutputChannelSpec channel = channelFromDeviceEdgeAndPort(device, device.resource, offer, edge);
674
675 device.outputChannels.push_back(channel);
676 return device.outputChannels.size() - 1;
677 };
678
679 // Notice how we need to behave in two different ways depending
680 // whether this is a real OutputRoute or if it's a forward from
681 // a previous consumer device.
682 // FIXME: where do I find the InputSpec for the forward?
683 auto appendOutputRouteToSourceDeviceChannel = [&outputsMatchers, &workflow, &devices, &logicalEdges, &sendingPolicies, &forwardingPolicies, &configContext](
684 size_t ei, size_t di, size_t ci) {
685 assert(ei < logicalEdges.size());
686 assert(di < devices.size());
687 assert(ci < devices[di].outputChannels.size());
688 auto& edge = logicalEdges[ei];
689 auto& device = devices[di];
690 assert(edge.consumer < workflow.size());
691 auto& consumer = workflow[edge.consumer];
692 auto& producer = workflow[edge.producer];
693 auto& channel = devices[di].outputChannels[ci];
694 assert(edge.outputGlobalIndex < outputsMatchers.size());
695 // Iterate over all the policies and apply the first one that matches.
696 SendingPolicy const* policyPtr = nullptr;
697 ForwardingPolicy const* forwardPolicyPtr = nullptr;
698 for (auto& policy : sendingPolicies) {
699 if (policy.matcher(producer, consumer, configContext)) {
700 policyPtr = &policy;
701 break;
702 }
703 }
704 assert(forwardingPolicies.empty() == false);
705 for (auto& policy : forwardingPolicies) {
706 if (policy.matcher(producer, consumer, configContext)) {
707 forwardPolicyPtr = &policy;
708 break;
709 }
710 }
711 assert(policyPtr != nullptr);
712 assert(forwardPolicyPtr != nullptr);
713
714 if (edge.isForward == false) {
715 OutputRoute route{
716 .timeslice = edge.timeIndex,
717 .maxTimeslices = consumer.maxInputTimeslices,
718 .matcher = outputsMatchers[edge.outputGlobalIndex],
719 .channel = channel.name,
720 .policy = policyPtr,
721 };
722 device.outputs.emplace_back(route);
723 } else {
724 ForwardRoute route{
725 .timeslice = edge.timeIndex,
726 .maxTimeslices = consumer.maxInputTimeslices,
727 .matcher = workflow[edge.consumer].inputs[edge.consumerInputIndex],
728 .channel = channel.name,
729 .policy = forwardPolicyPtr,
730 };
731 // In case we have a timer, the data it creates should be
732 // forwarded as a timeframe to the next device, so that
733 // we have synchronization.
734 if (route.matcher.lifetime == Lifetime::Timer) {
735 route.matcher.lifetime = Lifetime::Timeframe;
736 }
737 device.forwards.emplace_back(route);
738 }
739 };
740
741 auto sortDeviceIndex = [&deviceIndex]() { std::sort(deviceIndex.begin(), deviceIndex.end()); };
742
743 auto lastChannelFor = [&devices](size_t di) {
744 assert(di < devices.size());
745 assert(devices[di].outputChannels.empty() == false);
746 return devices[di].outputChannels.size() - 1;
747 };
748
749 //
750 // OUTER LOOP
751 //
752 // We need to create all the channels going out of a device, and associate
753 // routes to them for this reason
754 // we iterate over all the edges (which are per-datatype exchanged) and
755 // whenever we need to connect to a new device we create the channel. `device`
756 // here refers to the source device. This loop will therefore not create the
757 // devices which acts as sink, which are done in the preocessInEdgeActions
758 // function.
759 ComputingOffer acceptedOffer;
760 for (auto edge : outEdgeIndex) {
761 auto device = deviceForEdge(edge, acceptedOffer);
762 size_t channel = -1;
763 if (isDifferentDestinationDeviceReferredBy(edge)) {
764 channel = createChannelForDeviceEdge(device, edge, acceptedOffer);
765 } else {
766 channel = lastChannelFor(device);
767 }
768 appendOutputRouteToSourceDeviceChannel(edge, device, channel);
769 }
770 if (std::string(acceptedOffer.hostname) != "") {
771 resourceManager.notifyAcceptedOffer(acceptedOffer);
772 }
773 sortDeviceIndex();
774}
775
776void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
777 std::vector<DeviceId>& deviceIndex,
778 const std::vector<DeviceConnectionId>& connections,
779 ResourceManager& resourceManager,
780 const std::vector<size_t>& inEdgeIndex,
781 const std::vector<DeviceConnectionEdge>& logicalEdges,
782 const std::vector<EdgeAction>& actions, const WorkflowSpec& workflow,
783 std::vector<LogicalForwardInfo> const& availableForwardsInfo,
784 std::vector<ChannelConfigurationPolicy> const& channelPolicies,
785 std::string const& channelPrefix,
786 ComputingOffer const& defaultOffer,
787 OverrideServiceSpecs const& overrideServices)
788{
789 auto const& constDeviceIndex = deviceIndex;
790 if (!std::is_sorted(constDeviceIndex.cbegin(), constDeviceIndex.cend())) {
791 throw o2::framework::runtime_error("Needs a sorted vector to be correct");
792 }
793
794 auto findProducerForEdge = [&logicalEdges, &constDeviceIndex](size_t ei) {
795 auto& edge = logicalEdges[ei];
796
797 DeviceId pid{edge.producer, edge.producerTimeIndex, 0};
798 auto deviceIt = std::lower_bound(constDeviceIndex.cbegin(), constDeviceIndex.cend(), pid);
799 // By construction producer should always be there
800 assert(deviceIt != constDeviceIndex.end());
801 assert(deviceIt->processorIndex == pid.processorIndex && deviceIt->timeslice == pid.timeslice);
802 return deviceIt->deviceIndex;
803 };
804
805 auto findConsumerForEdge = [&logicalEdges, &constDeviceIndex](size_t ei) {
806 auto& edge = logicalEdges[ei];
807
808 DeviceId pid{edge.consumer, edge.timeIndex, 0};
809 auto deviceIt = std::lower_bound(constDeviceIndex.cbegin(), constDeviceIndex.cend(), pid);
810 // We search for a consumer only if we know it's is already there.
811 assert(deviceIt != constDeviceIndex.end());
812 assert(deviceIt->processorIndex == pid.processorIndex && deviceIt->timeslice == pid.timeslice);
813 return deviceIt->deviceIndex;
814 };
815
816 // Notice that to start with, consumer exists only if they also are
817 // producers, so we need to create one if it does not exist. Given this is
818 // stateful, we keep an eye on what edge was last searched to make sure we
819 // are not screwing up.
820 //
821 // Notice this is not thread safe.
822 decltype(deviceIndex.begin()) lastConsumerSearch;
823 size_t lastConsumerSearchEdge;
824 auto hasConsumerForEdge = [&lastConsumerSearch, &lastConsumerSearchEdge, &deviceIndex,
825 &logicalEdges](size_t ei) -> int {
826 auto& edge = logicalEdges[ei];
827 DeviceId cid{edge.consumer, edge.timeIndex, 0};
828 lastConsumerSearchEdge = ei; // This will invalidate the cache
829 lastConsumerSearch = std::lower_bound(deviceIndex.begin(), deviceIndex.end(), cid);
830 return lastConsumerSearch != deviceIndex.end() && cid.processorIndex == lastConsumerSearch->processorIndex &&
831 cid.timeslice == lastConsumerSearch->timeslice;
832 };
833
834 // The passed argument is there just to check. We do know that the last searched
835 // is the one we want.
836 auto getConsumerForEdge = [&lastConsumerSearch, &lastConsumerSearchEdge](size_t ei) {
837 assert(ei == lastConsumerSearchEdge);
838 return lastConsumerSearch->deviceIndex;
839 };
840
841 auto createNewDeviceForEdge = [&workflow, &logicalEdges, &devices,
842 &deviceIndex, &resourceManager, &defaultOffer,
843 &channelPrefix, &overrideServices](size_t ei, ComputingOffer& acceptedOffer) {
844 auto& edge = logicalEdges[ei];
845
846 if (acceptedOffer.hostname != "") {
847 resourceManager.notifyAcceptedOffer(acceptedOffer);
848 }
849
850 auto& processor = workflow[edge.consumer];
851
852 acceptedOffer.cpu = defaultOffer.cpu;
853 acceptedOffer.memory = defaultOffer.memory;
854 for (auto offer : resourceManager.getAvailableOffers()) {
855 if (offer.cpu < acceptedOffer.cpu) {
856 continue;
857 }
858 if (offer.memory < acceptedOffer.memory) {
859 continue;
860 }
861 acceptedOffer.hostname = offer.hostname;
862 acceptedOffer.startPort = offer.startPort;
863 acceptedOffer.rangeSize = 0;
864 break;
865 }
866
867 DeviceSpec device{
868 .name = processor.name,
869 .id = processor.name,
870 .channelPrefix = channelPrefix,
871 .options = processor.options,
872 .services = ServiceSpecHelpers::filterDisabled(processor.requiredServices, overrideServices),
873 .algorithm = processor.algorithm,
874 .rank = processor.rank,
875 .nSlots = processor.nSlots,
876 .inputTimesliceId = edge.timeIndex,
877 .maxInputTimeslices = processor.maxInputTimeslices,
878 .resource = {acceptedOffer},
879 .labels = processor.labels,
880 .metadata = processor.metadata};
881
882 if (processor.maxInputTimeslices != 1) {
883 device.id += "_t" + std::to_string(edge.timeIndex);
884 }
885
886 // FIXME: maybe I should use an std::map in the end
887 // but this is really not performance critical
888 auto id = DeviceId{edge.consumer, edge.timeIndex, devices.size()};
889 devices.emplace_back(std::move(device));
890 deviceIndex.push_back(id);
891 std::sort(deviceIndex.begin(), deviceIndex.end());
892 return devices.size() - 1;
893 };
894
895 // We search for a preexisting outgoing connection associated to this edge.
896 // This is to retrieve the port of the source.
897 // This has to exists, because we already created all the outgoing connections
898 // so it's just a matter of looking it up.
899 auto findMatchingOutgoingPortForEdge = [&logicalEdges, &connections](size_t ei) {
900 auto const& edge = logicalEdges[ei];
901 DeviceConnectionId connectionId{edge.producer, edge.consumer, edge.timeIndex, edge.producerTimeIndex, 0};
902
903 auto it = std::lower_bound(connections.begin(), connections.end(), connectionId);
904
905 assert(it != connections.end());
906 assert(it->producer == connectionId.producer);
907 assert(it->consumer == connectionId.consumer);
908 assert(it->timeIndex == connectionId.timeIndex);
909 assert(it->producerTimeIndex == connectionId.producerTimeIndex);
910 return it->port;
911 };
912
913 auto checkNoDuplicatesFor = [](std::vector<InputChannelSpec> const& channels, const std::string& name) {
914 for (auto const& channel : channels) {
915 if (channel.name == name) {
916 return false;
917 }
918 }
919 return true;
920 };
921 auto appendInputChannelForConsumerDevice = [&devices, &checkNoDuplicatesFor, &channelPolicies](
922 size_t pi, size_t ci, unsigned short port) {
923 auto const& producerDevice = devices[pi];
924 auto& consumerDevice = devices[ci];
925 InputChannelSpec channel;
926 channel.name = producerDevice.channelPrefix + "from_" + producerDevice.id + "_to_" + consumerDevice.id;
927 channel.hostname = producerDevice.resource.hostname;
928 channel.port = port;
929 for (auto& policy : channelPolicies) {
930 if (policy.match(producerDevice.id, consumerDevice.id)) {
931 policy.modifyInput(channel);
932 break;
933 }
934 }
935 assert(checkNoDuplicatesFor(consumerDevice.inputChannels, channel.name));
936 consumerDevice.inputChannels.push_back(channel);
937 return consumerDevice.inputChannels.size() - 1;
938 };
939
940 // I think this is trivial, since I think it should always be the last one,
941 // in case it's not actually the case, I should probably do an actual lookup
942 // here.
943 auto getChannelForEdge = [&devices](size_t pi, size_t ci) {
944 auto& consumerDevice = devices[ci];
945 return consumerDevice.inputChannels.size() - 1;
946 };
947
948 // This is always called when adding a new channel, so we can simply refer
949 // to back. Notice also that this is the place where it makes sense to
950 // assign the forwarding, given that the forwarded stuff comes from some
951 // input.
952 auto appendInputRouteToDestDeviceChannel = [&devices, &logicalEdges, &workflow](size_t ei, size_t di, size_t ci) {
953 auto const& edge = logicalEdges[ei];
954 auto const& consumer = workflow[edge.consumer];
955 auto const& producer = workflow[edge.producer];
956 auto& consumerDevice = devices[di];
957
958 auto const& inputSpec = consumer.inputs[edge.consumerInputIndex];
959 auto const& sourceChannel = consumerDevice.inputChannels[ci].name;
960
961 InputRoute route{
962 inputSpec,
963 edge.consumerInputIndex,
964 sourceChannel,
965 edge.producerTimeIndex,
966 std::nullopt};
967
968 // In case we have wildcards, we must make sure that some other edge
969 // produced the same route, i.e. has the same matcher. Without this,
970 // otherwise, we would end up with as many input routes as the outputs that
971 // can be matched by the wildcard.
972 for (size_t iri = 0; iri < consumerDevice.inputs.size(); ++iri) {
973 auto& existingRoute = consumerDevice.inputs[iri];
974 if (existingRoute.timeslice != edge.producerTimeIndex) {
975 continue;
976 }
977 if (existingRoute.inputSpecIndex == edge.consumerInputIndex) {
978 return;
979 }
980 }
981
982 // In case we add a new route to the device, we remap any
983 // Lifetime::Timer to Lifetime::Timeframe, so that we can
984 // synchronize the devices without creating a new timer.
985 if (edge.isForward && route.matcher.lifetime == Lifetime::Timer) {
986 LOGP(warn,
987 "Warning: Forwarding timer {} from {} to a {} as both requested it."
988 " If this is undesired, please make sure to use two different data matchers for their InputSpec.",
989 DataSpecUtils::describe(route.matcher).c_str(),
990 producer.name.c_str(),
991 consumer.name.c_str());
992 route.matcher.lifetime = Lifetime::Timeframe;
993 }
994
995 consumerDevice.inputs.push_back(route);
996 };
997
998 // Outer loop. A new device is needed for each
999 // of the sink data processors.
1000 // New InputChannels need to refer to preexisting OutputChannels we create
1001 // previously.
1002 ComputingOffer acceptedOffer;
1003 for (size_t edge : inEdgeIndex) {
1004 auto& action = actions[edge];
1005
1006 size_t consumerDevice = -1;
1007
1008 if (action.requiresNewDevice) {
1009 if (hasConsumerForEdge(edge)) {
1010 consumerDevice = getConsumerForEdge(edge);
1011 } else {
1012 consumerDevice = createNewDeviceForEdge(edge, acceptedOffer);
1013 }
1014 } else {
1015 consumerDevice = findConsumerForEdge(edge);
1016 }
1017 size_t producerDevice = findProducerForEdge(edge);
1018
1019 size_t channel = -1;
1020 if (action.requiresNewChannel) {
1021 int16_t port = findMatchingOutgoingPortForEdge(edge);
1022 channel = appendInputChannelForConsumerDevice(producerDevice, consumerDevice, port);
1023 } else {
1024 channel = getChannelForEdge(producerDevice, consumerDevice);
1025 }
1026 appendInputRouteToDestDeviceChannel(edge, consumerDevice, channel);
1027 }
1028
1029 // Bind the expiration mechanism to the input routes
1030 for (auto& device : devices) {
1031 for (auto& route : device.inputs) {
1032 switch (route.matcher.lifetime) {
1033 case Lifetime::OutOfBand:
1034 route.configurator = {
1035 .name = "oob",
1036 .creatorConfigurator = ExpirationHandlerHelpers::loopEventDrivenConfigurator(route.matcher),
1038 .expirationConfigurator = ExpirationHandlerHelpers::expiringOOBConfigurator(route.matcher, route.sourceChannel)};
1039 break;
1040 // case Lifetime::Condition:
1041 // route.configurator = {
1042 // ExpirationHandlerHelpers::dataDrivenConfigurator(),
1043 // ExpirationHandlerHelpers::danglingConditionConfigurator(),
1044 // ExpirationHandlerHelpers::expiringConditionConfigurator(inputSpec, sourceChannel)};
1045 // break;
1046 case Lifetime::QA:
1047 route.configurator = {
1048 .name = "qa",
1049 .creatorConfigurator = ExpirationHandlerHelpers::dataDrivenConfigurator(),
1050 .danglingConfigurator = ExpirationHandlerHelpers::danglingQAConfigurator(),
1051 .expirationConfigurator = ExpirationHandlerHelpers::expiringQAConfigurator()};
1052 break;
1053 case Lifetime::Timer:
1054 route.configurator = {
1055 .name = "timer",
1056 .creatorConfigurator = ExpirationHandlerHelpers::timeDrivenConfigurator(route.matcher),
1057 .danglingConfigurator = ExpirationHandlerHelpers::danglingTimerConfigurator(route.matcher),
1058 .expirationConfigurator = ExpirationHandlerHelpers::expiringTimerConfigurator(route.matcher, route.sourceChannel)};
1059 break;
1060 case Lifetime::Enumeration:
1061 route.configurator = {
1062 .name = "enumeration",
1063 .creatorConfigurator = ExpirationHandlerHelpers::enumDrivenConfigurator(route.matcher, device.inputTimesliceId, device.maxInputTimeslices),
1064 .danglingConfigurator = ExpirationHandlerHelpers::danglingEnumerationConfigurator(route.matcher),
1065 .expirationConfigurator = ExpirationHandlerHelpers::expiringEnumerationConfigurator(route.matcher, route.sourceChannel)};
1066 break;
1067 case Lifetime::Signal:
1068 route.configurator = {
1069 .name = "signal",
1070 .creatorConfigurator = ExpirationHandlerHelpers::signalDrivenConfigurator(route.matcher, device.inputTimesliceId, device.maxInputTimeslices),
1071 .danglingConfigurator = ExpirationHandlerHelpers::danglingEnumerationConfigurator(route.matcher),
1072 .expirationConfigurator = ExpirationHandlerHelpers::expiringEnumerationConfigurator(route.matcher, route.sourceChannel)};
1073 break;
1074 case Lifetime::Transient:
1075 route.configurator = {
1076 .name = "transient",
1077 .creatorConfigurator = ExpirationHandlerHelpers::dataDrivenConfigurator(),
1079 .expirationConfigurator = ExpirationHandlerHelpers::expiringTransientConfigurator(route.matcher)};
1080 break;
1081 case Lifetime::Optional:
1082 route.configurator = {
1083 .name = "optional",
1085 .danglingConfigurator = ExpirationHandlerHelpers::danglingOptionalConfigurator(device.inputs),
1086 .expirationConfigurator = ExpirationHandlerHelpers::expiringOptionalConfigurator(route.matcher, route.sourceChannel)};
1087 break;
1088 default:
1089 break;
1090 }
1091 }
1092 }
1093
1094 if (acceptedOffer.hostname != "") {
1095 resourceManager.notifyAcceptedOffer(acceptedOffer);
1096 }
1097}
1098
1099// Construct the list of actual devices we want, given a workflow.
1100//
1101// FIXME: make start port configurable?
1103 std::vector<ChannelConfigurationPolicy> const& channelPolicies,
1104 std::vector<CompletionPolicy> const& completionPolicies,
1105 std::vector<DispatchPolicy> const& dispatchPolicies,
1106 std::vector<ResourcePolicy> const& resourcePolicies,
1107 std::vector<CallbacksPolicy> const& callbacksPolicies,
1108 std::vector<SendingPolicy> const& sendingPolicies,
1109 std::vector<ForwardingPolicy> const& forwardingPolicies,
1110 std::vector<DeviceSpec>& devices,
1111 ResourceManager& resourceManager,
1112 std::string const& uniqueWorkflowId,
1113 ConfigContext const& configContext,
1114 bool optimizeTopology,
1115 unsigned short resourcesMonitoringInterval,
1116 std::string const& channelPrefix,
1117 OverrideServiceSpecs const& overrideServices)
1118{
1119 // Always check for validity of the workflow before instanciating it
1121 // In case the workflow is empty, we simply do not need to instanciate any device.
1122 if (workflow.empty()) {
1123 return;
1124 }
1125 std::vector<LogicalForwardInfo> availableForwardsInfo;
1126 std::vector<DeviceConnectionEdge> logicalEdges;
1127 std::vector<DeviceConnectionId> connections;
1128 std::vector<DeviceId> deviceIndex;
1129
1130 // This is a temporary store for inputs and outputs,
1131 // including forwarded channels, so that we can construct
1132 // them before assigning to a device.
1133 std::vector<OutputSpec> outputs;
1134
1135 WorkflowHelpers::constructGraph(workflow, logicalEdges, outputs, availableForwardsInfo);
1136
1137 // We need to instanciate one device per (me, timeIndex) in the
1138 // DeviceConnectionEdge. For each device we need one new binding
1139 // server per (me, other) -> port Moreover for each (me, other,
1140 // outputGlobalIndex) we need to insert either an output or a
1141 // forward.
1142 //
1143 // We then sort by other. For each (other, me) we need to connect to
1144 // port (me, other) and add an input.
1145
1146 // Fill an index to do the sorting
1147 std::vector<size_t> inEdgeIndex;
1148 std::vector<size_t> outEdgeIndex;
1149 WorkflowHelpers::sortEdges(inEdgeIndex, outEdgeIndex, logicalEdges);
1150
1151 std::vector<EdgeAction> outActions = WorkflowHelpers::computeOutEdgeActions(logicalEdges, outEdgeIndex);
1152 // Crete the connections on the inverse map for all of them
1153 // lookup for port and add as input of the current device.
1154 std::vector<EdgeAction> inActions = WorkflowHelpers::computeInEdgeActions(logicalEdges, inEdgeIndex);
1155 size_t deviceCount = 0;
1156 for (auto& action : outActions) {
1157 deviceCount += action.requiresNewDevice ? 1 : 0;
1158 }
1159 for (auto& action : inActions) {
1160 deviceCount += action.requiresNewDevice ? 1 : 0;
1161 }
1162
1163 ComputingOffer defaultOffer;
1164 for (auto& offer : resourceManager.getAvailableOffers()) {
1165 defaultOffer.cpu += offer.cpu;
1166 defaultOffer.memory += offer.memory;
1167 }
1168
1170 defaultOffer.cpu /= deviceCount + 1;
1171 defaultOffer.memory /= deviceCount + 1;
1172
1173 processOutEdgeActions(configContext, devices, deviceIndex, connections, resourceManager, outEdgeIndex, logicalEdges,
1174 outActions, workflow, outputs, channelPolicies, sendingPolicies, forwardingPolicies, channelPrefix, defaultOffer, overrideServices);
1175
1176 // FIXME: is this not the case???
1177 std::sort(connections.begin(), connections.end());
1178
1179 processInEdgeActions(devices, deviceIndex, connections, resourceManager, inEdgeIndex, logicalEdges,
1180 inActions, workflow, availableForwardsInfo, channelPolicies, channelPrefix, defaultOffer, overrideServices);
1181 // We apply the completion policies here since this is where we have all the
1182 // devices resolved.
1183 std::map<std::string, DataProcessorPoliciesInfo> policies;
1184 for (DeviceSpec& device : devices) {
1185 bool hasPolicy = false;
1186 policies[device.name].completionPolicyName = "unknown";
1187 for (auto& policy : completionPolicies) {
1188 if (policy.matcher(device) == true) {
1189 policies[policy.name].completionPolicyName = policy.name;
1190 device.completionPolicy = policy;
1191 hasPolicy = true;
1192 break;
1193 }
1194 }
1195 if (hasPolicy == false) {
1196 throw runtime_error_f("Unable to find a completion policy for %s", device.id.c_str());
1197 }
1198 for (auto& policy : dispatchPolicies) {
1199 if (policy.deviceMatcher(device) == true) {
1200 device.dispatchPolicy = policy;
1201 break;
1202 }
1203 }
1204 for (auto& policy : callbacksPolicies) {
1205 if (policy.matcher(device, configContext) == true) {
1206 device.callbacksPolicy = policy;
1207 break;
1208 }
1209 }
1210 hasPolicy = false;
1211 for (auto& policy : resourcePolicies) {
1212 if (policy.matcher(device) == true) {
1213 device.resourcePolicy = policy;
1214 hasPolicy = true;
1215 break;
1216 }
1217 }
1218 if (hasPolicy == false) {
1219 throw runtime_error_f("Unable to find a resource policy for %s", device.id.c_str());
1220 }
1221 }
1222 // Iterate of the workflow and create a consistent vector of DataProcessorPoliciesInfo
1223 std::vector<DataProcessorPoliciesInfo> policiesVector;
1224 for (size_t wi = 0; wi < workflow.size(); ++wi) {
1225 auto& processor = workflow[wi];
1226 auto& info = policies[processor.name];
1227 policiesVector.push_back(info);
1228 }
1229
1230 WorkflowHelpers::validateEdges(workflow, policiesVector, logicalEdges, outputs);
1231
1232 for (auto& device : devices) {
1233 device.resourceMonitoringInterval = resourcesMonitoringInterval;
1234 }
1235
1236 auto findDeviceIndex = [&deviceIndex](size_t processorIndex, size_t timeslice) {
1237 for (auto& deviceEdge : deviceIndex) {
1238 if (deviceEdge.processorIndex != processorIndex) {
1239 continue;
1240 }
1241 if (deviceEdge.timeslice != timeslice) {
1242 continue;
1243 }
1244 return deviceEdge.deviceIndex;
1245 }
1246 throw runtime_error("Unable to find device.");
1247 };
1248
1249 // Optimize the topology when two devices are
1250 // running on the same node.
1251 if (optimizeTopology) {
1252 for (auto& connection : connections) {
1253 auto& device1 = devices[findDeviceIndex(connection.consumer, connection.timeIndex)];
1254 auto& device2 = devices[findDeviceIndex(connection.producer, connection.producerTimeIndex)];
1255 // No need to do anything if they are not on the same host
1256 if (device1.resource.hostname != device2.resource.hostname) {
1257 continue;
1258 }
1259 for (auto& input : device1.inputChannels) {
1260 for (auto& output : device2.outputChannels) {
1261 if (input.hostname == output.hostname && input.port == output.port) {
1262 input.protocol = ChannelProtocol::IPC;
1263 output.protocol = ChannelProtocol::IPC;
1264 input.hostname += uniqueWorkflowId;
1265 output.hostname += uniqueWorkflowId;
1266 }
1267 }
1268 }
1269 }
1270 }
1271}
1272
1273void DeviceSpecHelpers::reworkHomogeneousOption(std::vector<DataProcessorInfo>& infos, char const* name, char const* defaultValue)
1274{
1275 std::string finalValue;
1276 for (auto& info : infos) {
1277 auto it = std::find(info.cmdLineArgs.begin(), info.cmdLineArgs.end(), name);
1278 if (it == info.cmdLineArgs.end()) {
1279 continue;
1280 }
1281 auto value = it + 1;
1282 if (value == info.cmdLineArgs.end()) {
1283 throw runtime_error_f("%s requires an argument", name);
1284 }
1285 if (!finalValue.empty() && finalValue != *value) {
1286 throw runtime_error_f("Found incompatible %s values: %s amd %s", name, finalValue.c_str(), value->c_str());
1287 }
1288 finalValue = *value;
1289 info.cmdLineArgs.erase(it, it + 2);
1290 }
1291 if (finalValue.empty() && defaultValue == nullptr) {
1292 return;
1293 }
1294 if (finalValue.empty()) {
1295 finalValue = defaultValue;
1296 }
1297 for (auto& info : infos) {
1298 info.cmdLineArgs.emplace_back(name);
1299 info.cmdLineArgs.push_back(finalValue);
1300 }
1301}
1302
1303void DeviceSpecHelpers::reworkIntegerOption(std::vector<DataProcessorInfo>& infos, char const* name, std::function<long long()> defaultValueCallback, long long startValue, std::function<long long(long long, long long)> bestValue)
1304{
1305 int64_t finalValue = startValue;
1306 bool wasModified = false;
1307 for (auto& info : infos) {
1308 auto it = std::find(info.cmdLineArgs.begin(), info.cmdLineArgs.end(), name);
1309 if (it == info.cmdLineArgs.end()) {
1310 continue;
1311 }
1312 auto valueS = it + 1;
1313 if (valueS == info.cmdLineArgs.end()) {
1314 throw runtime_error_f("%s requires an integer argument", name);
1315 }
1316 char* err = nullptr;
1317 long long value = strtoll(valueS->c_str(), &err, 10);
1318 finalValue = bestValue(value, finalValue);
1319 wasModified = true;
1320 info.cmdLineArgs.erase(it, it + 2);
1321 }
1322 if (!wasModified && defaultValueCallback == nullptr) {
1323 return;
1324 }
1325 if (!wasModified) {
1326 finalValue = defaultValueCallback();
1327 }
1328 for (auto& info : infos) {
1329 info.cmdLineArgs.emplace_back(name);
1330 info.cmdLineArgs.push_back(std::to_string(finalValue));
1331 }
1332}
1333
1334void DeviceSpecHelpers::reworkShmSegmentSize(std::vector<DataProcessorInfo>& infos)
1335{
1336 int64_t segmentSize = 0;
1337 for (auto& info : infos) {
1338 auto it = std::find(info.cmdLineArgs.begin(), info.cmdLineArgs.end(), "--shm-segment-size");
1339 if (it == info.cmdLineArgs.end()) {
1340 continue;
1341 }
1342 auto value = it + 1;
1343 if (value == info.cmdLineArgs.end()) {
1344 throw runtime_error("--shm-segment-size requires an argument");
1345 }
1346 char* err = nullptr;
1347 int64_t size = strtoll(value->c_str(), &err, 10);
1348 if (size > segmentSize) {
1349 segmentSize = size;
1350 }
1351 info.cmdLineArgs.erase(it, it + 2);
1352 }
1354 if (segmentSize == 0) {
1355 struct rlimit limits;
1356 getrlimit(RLIMIT_AS, &limits);
1357 if (limits.rlim_cur != RLIM_INFINITY) {
1358 segmentSize = std::min(limits.rlim_cur - 1000000000LL, (limits.rlim_cur * 90LL) / 100LL);
1359 }
1360 }
1361 if (segmentSize == 0) {
1362 segmentSize = 2000000000LL;
1363 }
1364 for (auto& info : infos) {
1365 info.cmdLineArgs.emplace_back("--shm-segment-size");
1366 info.cmdLineArgs.push_back(std::to_string(segmentSize));
1367 }
1368}
1369
namespace
{
/// Breaks @a str into whitespace-separated words and appends each of them, in
/// order, to @a cont through push_back. Elements already present in @a cont are
/// kept, and runs of whitespace never produce empty words.
template <class Container>
void split(const std::string& str, Container& cont)
{
  std::istringstream stream(str);
  std::string word;
  while (stream >> word) {
    cont.push_back(word);
  }
}
} // namespace
1381
1382void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, bool interactive,
1383 unsigned short driverPort,
1384 o2::framework::DriverConfig const& driverConfig,
1385 std::vector<DataProcessorInfo> const& processorInfos,
1386 std::vector<DeviceSpec> const& deviceSpecs,
1387 std::vector<DeviceExecution>& deviceExecutions,
1388 std::vector<DeviceControl>& deviceControls,
1389 std::vector<ConfigParamSpec> const& detectedOptions,
1390 std::string const& uniqueWorkflowId)
1391{
1392 assert(deviceSpecs.size() == deviceExecutions.size());
1393 assert(deviceControls.size() == deviceExecutions.size());
1394 for (size_t si = 0; si < deviceSpecs.size(); ++si) {
1395 auto& spec = deviceSpecs[si];
1396 O2_SIGNPOST_ID_GENERATE(poid, device_spec_helpers);
1397 O2_SIGNPOST_START(device_spec_helpers, poid, "prepareArguments", "Preparing options for %{public}s", spec.id.c_str());
1398 auto& control = deviceControls[si];
1399 auto& execution = deviceExecutions[si];
1400
1401 control.quiet = defaultQuiet;
1402 control.stopped = defaultStopped;
1403
1404 int argc;
1405 char** argv;
1406 // We need to start with the detected options, so that they are not lost.
1407 // Notice how detected options can be detected at any moment in the chain,
1408 // so it's important that if you rely on them, they get passed on
1409 // always.
1410 std::vector<ConfigParamSpec> workflowOptions = detectedOptions;
1411 for (auto& opt : detectedOptions) {
1412 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, poid, "prepareArguments", "Processor option %{public}s passed as previously detected", opt.name.c_str());
1413 }
1417 auto pi = std::find_if(processorInfos.begin(), processorInfos.end(), [&](auto const& x) { return x.name == spec.id; });
1418 argc = pi->cmdLineArgs.size() + 1;
1419 argv = (char**)malloc(sizeof(char**) * (argc + 1));
1420 argv[0] = strdup(pi->executable.data());
1421 for (size_t ai = 0; ai < pi->cmdLineArgs.size(); ++ai) {
1422 auto const& arg = pi->cmdLineArgs[ai];
1423 argv[ai + 1] = strdup(arg.data());
1424 }
1425 argv[argc] = nullptr;
1426 for (auto& opt : pi->workflowOptions) {
1427 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, poid, "prepareArguments", "Processor option %{public}s found in process description", opt.name.c_str());
1428 workflowOptions.push_back(opt);
1429 }
1430 std::sort(workflowOptions.begin(), workflowOptions.end(), [](ConfigParamSpec const& a, ConfigParamSpec const& b) { return a.name < b.name; });
1431 auto last = std::unique(workflowOptions.begin(), workflowOptions.end());
1432 workflowOptions.erase(last, workflowOptions.end());
1433
1434 for (auto& opt : workflowOptions) {
1435 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, poid, "prepareArguments", "Final unique option %{public}s added to list of workflowOptions", opt.name.c_str());
1436 }
1437 // We duplicate the list of options, filtering only those
1438 // which are actually relevant for the given device. The additional
1439 // four are to add
1440 // * name of the executable
1441 // * --framework-id <id> so that we can use the workflow
1442 // executable also in other context where we do not fork, e.g. DDS.
1443 // * final NULL required by execvp
1444 //
1445 // We do it here because we are still in the parent and we can therefore
1446 // capture them to be displayed in the GUI or to populate the DDS configuration
1447 // to dump
1448
1449 // Set up options for the device running underneath
1450 // FIXME: add some checksum in framework id. We could use this
1451 // to avoid redeploys when only a portion of the workflow is changed.
1452 // FIXME: this should probably be done in one go with char *, but I am lazy.
1453 std::vector<std::string> tmpArgs = {argv[0],
1454 "--id", spec.id.c_str(),
1455 "--control", interactive ? "gui" : "static",
1456 "--shm-monitor", "false",
1457 "--log-color", "false",
1458 driverConfig.batch ? "--batch" : "--no-batch",
1459 "--color", "false"};
1460
1461 // we maintain options in a map so that later occurrences of the same
1462 // option will overwrite the value. To make unit tests work on all platforms,
1463 // we need to make the sequence deterministic and store it in a separate vector
1464 std::vector<std::string> deviceOptionsSequence;
1465 std::unordered_map<std::string, std::string> uniqueDeviceArgs;
1466 auto updateDeviceArguments = [&deviceOptionsSequence, &uniqueDeviceArgs](auto key, auto value) {
1467 if (uniqueDeviceArgs.find(key) == uniqueDeviceArgs.end()) {
1468 // not yet existing, we add the key to the sequence
1469 deviceOptionsSequence.emplace_back(key);
1470 }
1471 uniqueDeviceArgs[key] = value;
1472 };
1473 std::vector<std::string> tmpEnv;
1474 if (defaultStopped) {
1475 tmpArgs.emplace_back("-s");
1476 }
1477
1478 // do the filtering of options:
1479 // 1) forward options belonging to this specific DeviceSpec
1480 // 2) global options defined in getForwardedDeviceOptions and workflow option are
1481 // always forwarded and need to be handled separately
1482 const char* name = spec.name.c_str();
1483 bpo::options_description od; // option descriptions per process
1484 bpo::options_description foDesc; // forwarded options for all processes
1485 ConfigParamsHelper::dpl2BoostOptions(spec.options, od);
1486 od.add_options()(name, bpo::value<std::string>());
1487 ConfigParamsHelper::dpl2BoostOptions(workflowOptions, foDesc);
1488 auto forwardedOptions = getForwardedDeviceOptions();
1490 foDesc.add(forwardedOptions);
1491
1492 // has option --session been specified on the command line?
1493 bool haveSessionArg = false;
1494 using FilterFunctionT = std::function<void(decltype(argc), decltype(argv), decltype(od))>;
1495 bool useDefaultWS = true;
1496
1497 // the filter function will forward command line arguments based on the option
1498 // definition passed to it. All options of the program option definition will be forwarded
1499 // if found in the argument list. If not found they will be added with the default value
1500 FilterFunctionT filterArgsFct = [&](int largc, char** largv, const bpo::options_description& odesc) {
1501 // spec contains options
1502 using namespace bpo::command_line_style;
1503 auto style = (allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
1504
1505 bpo::command_line_parser parser{largc, largv};
1506 parser.options(odesc).allow_unregistered();
1507 parser.style(style);
1508 bpo::parsed_options parsed_options = parser.run();
1509
1510 bpo::variables_map varmap;
1511 bpo::store(parsed_options, varmap);
1512 if (varmap.count("environment")) {
1513 auto environment = varmap["environment"].as<std::string>();
1514 split(environment, tmpEnv);
1515 }
1516
1518 if (varmap.count("stacktrace-on-signal") && varmap["stacktrace-on-signal"].as<std::string>() != "none" && varmap["stacktrace-on-signal"].as<std::string>() != "simple") {
1519 char const* preload = getenv("LD_PRELOAD");
1520 if (preload == nullptr || strcmp(preload, "libSegFault.so") == 0) {
1521 tmpEnv.emplace_back("LD_PRELOAD=libSegFault.so");
1522 } else {
1523 tmpEnv.push_back(fmt::format("LD_PRELOAD={}:libSegFault.so", preload));
1524 }
1525 tmpEnv.push_back(fmt::format("SEGFAULT_SIGNALS={}", varmap["stacktrace-on-signal"].as<std::string>()));
1526 }
1527
1528 // options can be grouped per processor spec, the group is entered by
1529 // the option created from the actual processor spec name
1530 // if specified, the following string is interpreted as a sequence
1531 // of arguments
1532 if (varmap.count(name) > 0) {
1533 // strangely enough, the first argument of the group argument string
1534 // is marked as defaulted by the parser and is thus ignored. not fully
1535 // understood but adding a dummy argument in front cures this
1536 auto arguments = "--unused " + varmap[name].as<std::string>();
1537 wordexp_t expansions;
1538 wordexp(arguments.c_str(), &expansions, 0);
1539 bpo::options_description realOdesc = odesc;
1540 realOdesc.add_options()("severity", bpo::value<std::string>());
1541 realOdesc.add_options()("child-driver", bpo::value<std::string>());
1542 realOdesc.add_options()("rate", bpo::value<std::string>());
1543 realOdesc.add_options()("exit-transition-timeout", bpo::value<std::string>());
1544 realOdesc.add_options()("data-processing-timeout", bpo::value<std::string>());
1545 realOdesc.add_options()("expected-region-callbacks", bpo::value<std::string>());
1546 realOdesc.add_options()("timeframes-rate-limit", bpo::value<std::string>());
1547 realOdesc.add_options()("environment", bpo::value<std::string>());
1548 realOdesc.add_options()("stacktrace-on-signal", bpo::value<std::string>());
1549 realOdesc.add_options()("post-fork-command", bpo::value<std::string>());
1550 realOdesc.add_options()("bad-alloc-max-attempts", bpo::value<std::string>());
1551 realOdesc.add_options()("bad-alloc-attempt-interval", bpo::value<std::string>());
1552 realOdesc.add_options()("io-threads", bpo::value<std::string>());
1553 realOdesc.add_options()("shm-segment-size", bpo::value<std::string>());
1554 realOdesc.add_options()("shm-mlock-segment", bpo::value<std::string>());
1555 realOdesc.add_options()("shm-mlock-segment-on-creation", bpo::value<std::string>());
1556 realOdesc.add_options()("shm-zero-segment", bpo::value<std::string>());
1557 realOdesc.add_options()("shm-throw-bad-alloc", bpo::value<std::string>());
1558 realOdesc.add_options()("shm-segment-id", bpo::value<std::string>());
1559 realOdesc.add_options()("shm-allocation", bpo::value<std::string>());
1560 realOdesc.add_options()("shm-no-cleanup", bpo::value<std::string>());
1561 realOdesc.add_options()("shmid", bpo::value<std::string>());
1562 realOdesc.add_options()("shm-metadata-msg-size", bpo::value<std::string>()->default_value("0"));
1563 realOdesc.add_options()("shm-monitor", bpo::value<std::string>());
1564 realOdesc.add_options()("channel-prefix", bpo::value<std::string>());
1565 realOdesc.add_options()("network-interface", bpo::value<std::string>());
1566 realOdesc.add_options()("early-forward-policy", bpo::value<std::string>());
1567 realOdesc.add_options()("session", bpo::value<std::string>());
1568 realOdesc.add_options()("signposts", bpo::value<std::string>());
1569 filterArgsFct(expansions.we_wordc, expansions.we_wordv, realOdesc);
1570 wordfree(&expansions);
1571 return;
1572 }
1573
1574 const char* child_driver_key = "child-driver";
1575 if (varmap.count(child_driver_key) > 0) {
1576 auto arguments = varmap[child_driver_key].as<std::string>();
1577 wordexp_t expansions;
1578 wordexp(arguments.c_str(), &expansions, 0);
1579 tmpArgs.insert(tmpArgs.begin(), expansions.we_wordv, expansions.we_wordv + expansions.we_wordc);
1580 }
1581
1582 haveSessionArg = haveSessionArg || varmap.count("session") != 0;
1583 useDefaultWS = useDefaultWS && ((varmap.count("driver-client-backend") == 0) || varmap["driver-client-backend"].as<std::string>() == "ws://");
1584
1585 auto processRawChannelConfig = [&tmpArgs, &spec](const std::string& conf) {
1586 std::stringstream ss(reworkTimeslicePlaceholder(conf, spec));
1587 std::string token;
1588 while (std::getline(ss, token, ';')) { // split to tokens, trim spaces and add each non-empty one with channel-config options
1589 token.erase(token.begin(), std::find_if(token.begin(), token.end(), [](int ch) { return !std::isspace(ch); }));
1590 token.erase(std::find_if(token.rbegin(), token.rend(), [](int ch) { return !std::isspace(ch); }).base(), token.end());
1591 if (!token.empty()) {
1592 tmpArgs.emplace_back("--channel-config");
1593 tmpArgs.emplace_back(token);
1594 }
1595 }
1596 };
1597
1598 for (const auto varit : varmap) {
1599 // find the option belonging to key, add if the option has been parsed
1600 // and is not defaulted
1601 const auto* description = odesc.find_nothrow(varit.first, false);
1602 if (description && varmap.count(varit.first)) {
1603 // check the semantics of the value
1604 auto semantic = description->semantic();
1605 const char* optarg = "";
1606 if (semantic) {
1607 // the value semantics allows different properties like
1608 // multitoken, zero_token and composing
1609 // currently only the simple case is supported
1610 assert(semantic->min_tokens() <= 1);
1611 // assert(semantic->max_tokens() && semantic->min_tokens());
1612 if (semantic->min_tokens() > 0) {
1613 std::string stringRep;
1614 if (auto v = boost::any_cast<std::string>(&varit.second.value())) {
1615 stringRep = *v;
1616 } else if (auto v = boost::any_cast<EarlyForwardPolicy>(&varit.second.value())) {
1617 std::stringstream tmp;
1618 tmp << *v;
1619 stringRep = fmt::format("{}", tmp.str());
1620 }
1621 if (varit.first == "channel-config") {
1622 // FIXME: the parameter to channel-config can be a list of configurations separated
1623 // by semicolon. The individual configurations will be separated and added individually.
1624 // The device arguments can then contaoin multiple channel-config entries, but only
1625 // one for the last configuration is added to control.options
1626 processRawChannelConfig(stringRep);
1627 optarg = tmpArgs.back().c_str();
1628 } else {
1629 std::string key(fmt::format("--{}", varit.first));
1630 if (stringRep.length() == 0) {
1631 // in order to identify options without parameter we add a string
1632 // with one blank for the 'blank' parameter, it is filtered out
1633 // further down and a zero-length string is added to argument list
1634 stringRep = " ";
1635 }
1636 updateDeviceArguments(key, stringRep);
1637 optarg = uniqueDeviceArgs[key].c_str();
1638 }
1639 } else if (semantic->min_tokens() == 0 && varit.second.as<bool>()) {
1640 updateDeviceArguments(fmt::format("--{}", varit.first), "");
1641 }
1642 }
1643 control.options.insert(std::make_pair(varit.first, optarg));
1644 }
1645 }
1646 };
1647
1648 // filter global options and workflow options independent of option groups
1649 filterArgsFct(argc, argv, foDesc);
1650 // filter device options, and handle option groups
1651 filterArgsFct(argc, argv, od);
1652
1653 // Add the channel configuration
1654 for (auto& channel : spec.outputChannels) {
1655 tmpArgs.emplace_back(std::string("--channel-config"));
1656 tmpArgs.emplace_back(outputChannel2String(channel));
1657 }
1658 for (auto& channel : spec.inputChannels) {
1659 tmpArgs.emplace_back(std::string("--channel-config"));
1660 tmpArgs.emplace_back(inputChannel2String(channel));
1661 }
1662
1663 // add the session id if not already specified on command line
1664 if (!haveSessionArg) {
1665 updateDeviceArguments(std::string("--session"), "dpl_" + uniqueWorkflowId);
1666 }
1667 // In case we use only ws://, we need to expand the address
1668 // with the correct port.
1669 if (useDefaultWS) {
1670 updateDeviceArguments(std::string("--driver-client-backend"), "ws://0.0.0.0:" + std::to_string(driverPort));
1671 }
1672
1673 if (spec.resourceMonitoringInterval > 0) {
1674 updateDeviceArguments(std::string("--resources-monitoring"), std::to_string(spec.resourceMonitoringInterval));
1675 }
1676
1677 // We create the final option list, depending on the channels
1678 // which are present in a device.
1679 for (auto& arg : tmpArgs) {
1680 execution.args.emplace_back(strdup(arg.c_str()));
1681 }
1682 for (auto& key : deviceOptionsSequence) {
1683 execution.args.emplace_back(strdup(key.c_str()));
1684 std::string const& value = uniqueDeviceArgs[key];
1685 if (value.empty()) {
1686 // this option does not have a parameter
1687 continue;
1688 } else if (value == " ") {
1689 // this was a placeholder for zero-length parameter string in order
1690 // to separate this from options without parameter
1691 execution.args.emplace_back(strdup(""));
1692 } else {
1693 execution.args.emplace_back(strdup(value.c_str()));
1694 }
1695 }
1696 // execvp wants a NULL terminated list.
1697 execution.args.push_back(nullptr);
1698
1699 for (auto& env : tmpEnv) {
1700 execution.environ.emplace_back(strdup(env.c_str()));
1701 }
1702
1703 // FIXME: this should probably be reflected in the GUI
1704 std::ostringstream str;
1705 for (size_t ai = 0; ai < execution.args.size() - 1; ai++) {
1706 if (execution.args[ai] == nullptr) {
1707 LOG(error) << "Bad argument for " << execution.args[ai - 1];
1708 }
1709 assert(execution.args[ai]);
1710 str << " " << execution.args[ai];
1711 }
1712 O2_SIGNPOST_END(device_spec_helpers, poid, "prepareArguments", "The following options are being forwarded to %{public}s: %{public}s",
1713 spec.id.c_str(), str.str().c_str());
1714 }
1715}
1716
1718boost::program_options::options_description DeviceSpecHelpers::getForwardedDeviceOptions()
1719{
1720 // - rate is an option of FairMQ device for ConditionalRun
1721 // - child-driver is not a FairMQ device option but used per device to start to process
1722 bpo::options_description forwardedDeviceOptions;
1723 char const* defaultSignposts = getenv("DPL_SIGNPOSTS") ? getenv("DPL_SIGNPOSTS") : "";
1724 forwardedDeviceOptions.add_options() //
1725 ("severity", bpo::value<std::string>()->default_value("info"), "severity level of the log") //
1726 ("plugin,P", bpo::value<std::string>(), "FairMQ plugin list") //
1727 ("plugin-search-path,S", bpo::value<std::string>(), "FairMQ plugins search path") //
1728 ("control-port", bpo::value<std::string>(), "Utility port to be used by O2 Control") //
1729 ("rate", bpo::value<std::string>(), "rate for a data source device (Hz)") //
1730 ("exit-transition-timeout", bpo::value<std::string>(), "timeout before switching to READY state") //
1731 ("data-processing-timeout", bpo::value<std::string>(), "timeout after which only calibration can happen") //
1732 ("expected-region-callbacks", bpo::value<std::string>(), "region callbacks to expect before starting") //
1733 ("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in fly") //
1734 ("shm-monitor", bpo::value<std::string>(), "whether to use the shared memory monitor") //
1735 ("channel-prefix", bpo::value<std::string>()->default_value(""), "prefix to use for multiplexing multiple workflows in the same session") //
1736 ("bad-alloc-max-attempts", bpo::value<std::string>()->default_value("1"), "throw after n attempts to alloc shm") //
1737 ("bad-alloc-attempt-interval", bpo::value<std::string>()->default_value("50"), "interval between shm alloc attempts in ms") //
1738 ("io-threads", bpo::value<std::string>()->default_value("1"), "number of FMQ io threads") //
1739 ("shm-segment-size", bpo::value<std::string>(), "size of the shared memory segment in bytes") //
1740 ("shm-mlock-segment", bpo::value<std::string>()->default_value("false"), "mlock shared memory segment") //
1741 ("shm-mlock-segment-on-creation", bpo::value<std::string>()->default_value("false"), "mlock shared memory segment once on creation") //
1742 ("shm-zero-segment", bpo::value<std::string>()->default_value("false"), "zero shared memory segment") //
1743 ("shm-throw-bad-alloc", bpo::value<std::string>()->default_value("true"), "throw if insufficient shm memory") //
1744 ("shm-segment-id", bpo::value<std::string>()->default_value("0"), "shm segment id") //
1745 ("shm-allocation", bpo::value<std::string>()->default_value("rbtree_best_fit"), "shm allocation method") //
1746 ("shm-no-cleanup", bpo::value<std::string>()->default_value("false"), "no shm cleanup") //
1747 ("shmid", bpo::value<std::string>(), "shmid") //
1748 ("shm-metadata-msg-size", bpo::value<std::string>()->default_value("0"), "numeric value in B used for padding FairMQ header, see FairMQ v.1.6.0") //
1749 ("environment", bpo::value<std::string>(), "comma separated list of environment variables to set for the device") //
1750 ("stacktrace-on-signal", bpo::value<std::string>()->default_value("simple"), //
1751 "dump stacktrace on specified signal(s) (any of `all`, `segv`, `bus`, `ill`, `abrt`, `fpe`, `sys`.)" //
1752 "Use `simple` to dump only the main thread in a reliable way") //
1753 ("post-fork-command", bpo::value<std::string>(), "post fork command to execute (e.g. numactl {pid}") //
1754 ("session", bpo::value<std::string>(), "unique label for the shared memory session") //
1755 ("network-interface", bpo::value<std::string>(), "network interface to which to bind tpc fmq ports without specified address") //
1756 ("early-forward-policy", bpo::value<EarlyForwardPolicy>()->default_value(EarlyForwardPolicy::NEVER), "when to forward early the messages: never, noraw, always") //
1757 ("configuration,cfg", bpo::value<std::string>(), "configuration connection string") //
1758 ("driver-client-backend", bpo::value<std::string>(), "driver connection string") //
1759 ("monitoring-backend", bpo::value<std::string>(), "monitoring connection string") //
1760 ("dpl-stats-min-online-publishing-interval", bpo::value<std::string>(), "minimum flushing interval for online metrics (in s)") //
1761 ("infologger-mode", bpo::value<std::string>(), "O2_INFOLOGGER_MODE override") //
1762 ("infologger-severity", bpo::value<std::string>(), "minimun FairLogger severity which goes to info logger") //
1763 ("dpl-tracing-flags", bpo::value<std::string>(), "pipe separated list of events to trace") //
1764 ("signposts", bpo::value<std::string>()->default_value(defaultSignposts), //
1765 "comma separated list of signposts to enable (any of `completion`, `data_processor_context`, `stream_context`, `device`, `monitoring_service`)") //
1766 ("child-driver", bpo::value<std::string>(), "external driver to start childs with (e.g. valgrind)"); //
1767
1768 return forwardedDeviceOptions;
1769}
1770
1771bool DeviceSpecHelpers::hasLabel(DeviceSpec const& spec, char const* label)
1772{
1773 auto sameLabel = [other = DataProcessorLabel{{label}}](DataProcessorLabel const& label) { return label == other; };
1774 return std::find_if(spec.labels.begin(), spec.labels.end(), sameLabel) != spec.labels.end();
1775}
1776
1777std::string DeviceSpecHelpers::reworkTimeslicePlaceholder(std::string const& str, DeviceSpec const& spec)
1778{
1779 // find all the possible timeslice variables, extract N and replace
1780 // the variable with the value of spec.inputTimesliceId + N.
1781 std::regex re("\\{timeslice([0-9]+)\\}");
1782 std::smatch match;
1783 std::string fmt = str;
1784 while (std::regex_search(fmt, match, re)) {
1785 auto timeslice = std::stoi(match[1]);
1786 auto replacement = std::to_string(spec.inputTimesliceId + timeslice);
1787 fmt = match.prefix().str() + replacement + match.suffix().str();
1788 }
1789 return fmt;
1790}
1791
1792} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_poll_s uv_poll_t
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint16_t pid
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:540
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:490
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
std::ostringstream debug
StringRef key
virtual void notifyAcceptedOffer(ComputingOffer const &)=0
virtual std::vector< ComputingOffer > getAvailableOffers()=0
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint start
Definition glcorearb.h:469
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
bpo::variables_map arguments
auto timer_fired(uv_timer_t *timer)
void timer_callback(uv_timer_t *handle)
void signal_callback(uv_signal_t *handle, int)
auto timer_set_period(uv_timer_t *timer)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
std::vector< OverrideServiceSpec > OverrideServiceSpecs
std::vector< DataProcessorSpec > WorkflowSpec
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::vector< std::string > split(const std::string &str, char delimiter=',')
static char const * typeAsString(enum ChannelType type)
return a ChannelType as a lowercase string
static std::string channelUrl(InputChannelSpec const &)
static char const * methodAsString(enum ChannelMethod method)
return a ChannelMethod as a lowercase string
A computing resource which can be offered to run a device.
static bool dpl2BoostOptions(const std::vector< ConfigParamSpec > &spec, options_description &options, boost::program_options::options_description vetos=options_description())
A label that can be associated to a DataProcessorSpec.
static std::optional< ConcreteDataMatcher > asOptionalConcreteDataMatcher(OutputSpec const &spec)
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static void processOutEdgeActions(ConfigContext const &configContext, std::vector< DeviceSpec > &devices, std::vector< DeviceId > &deviceIndex, std::vector< DeviceConnectionId > &connections, ResourceManager &resourceManager, const std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &logicalEdges, const std::vector< EdgeAction > &actions, const WorkflowSpec &workflow, const std::vector< OutputSpec > &outputs, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< SendingPolicy > const &sendingPolicies, std::vector< ForwardingPolicy > const &forwardingPolicies, std::string const &channelPrefix, ComputingOffer const &defaultOffer, OverrideServiceSpecs const &overrideServices={})
static void processInEdgeActions(std::vector< DeviceSpec > &devices, std::vector< DeviceId > &deviceIndex, const std::vector< DeviceConnectionId > &connections, ResourceManager &resourceManager, const std::vector< size_t > &inEdgeIndex, const std::vector< DeviceConnectionEdge > &logicalEdges, const std::vector< EdgeAction > &actions, const WorkflowSpec &workflow, const std::vector< LogicalForwardInfo > &availableForwardsInfo, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::string const &channelPrefix, ComputingOffer const &defaultOffer, OverrideServiceSpecs const &overrideServices={})
static void validate(WorkflowSpec const &workflow)
static boost::program_options::options_description getForwardedDeviceOptions()
define the options which are forwarded to every child
static std::string inputChannel2String(const InputChannelSpec &channel)
Helper to provide the channel configuration string for an input channel.
static std::string reworkTimeslicePlaceholder(std::string const &str, DeviceSpec const &spec)
static void prepareArguments(bool defaultQuiet, bool defaultStopped, bool intereactive, unsigned short driverPort, DriverConfig const &driverConfig, std::vector< DataProcessorInfo > const &processorInfos, std::vector< DeviceSpec > const &deviceSpecs, std::vector< DeviceExecution > &deviceExecutions, std::vector< DeviceControl > &deviceControls, std::vector< ConfigParamSpec > const &detectedOptions, std::string const &uniqueWorkflowId)
static void reworkShmSegmentSize(std::vector< DataProcessorInfo > &infos)
static void reworkHomogeneousOption(std::vector< DataProcessorInfo > &infos, char const *name, char const *defaultValue)
static void reworkIntegerOption(std::vector< DataProcessorInfo > &infos, char const *name, std::function< long long()> defaultValueCallback, long long startValue, std::function< long long(long long, long long)> bestValue)
static bool hasLabel(DeviceSpec const &spec, char const *label)
static std::string outputChannel2String(const OutputChannelSpec &channel)
Helper to provide the channel configuration string for an output channel.
static void dataProcessorSpecs2DeviceSpecs(const WorkflowSpec &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicy, std::vector< ForwardingPolicy > const &forwardingPolicies, std::vector< DeviceSpec > &devices, ResourceManager &resourceManager, std::string const &uniqueWorkflowId, ConfigContext const &configContext, bool optimizeTopology=false, unsigned short resourcesMonitoringInterval=0, std::string const &channelPrefix="", OverrideServiceSpecs const &overrideServices={})
std::vector< DataProcessorLabel > labels
Definition DeviceSpec.h:81
std::string name
The name of the associated DataProcessorSpec.
Definition DeviceSpec.h:50
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
Running state information of a given device.
Definition DeviceState.h:34
bool batch
Whether the driver was started in batch mode or not.
static RouteConfigurator::DanglingConfigurator danglingTimerConfigurator(InputSpec const &matcher)
static RouteConfigurator::DanglingConfigurator danglingConditionConfigurator()
static RouteConfigurator::ExpirationConfigurator expiringConditionConfigurator(InputSpec const &spec, std::string const &sourceChannel)
static RouteConfigurator::ExpirationConfigurator expiringOutOfBandConfigurator(InputSpec const &spec)
static RouteConfigurator::CreationConfigurator enumDrivenConfigurator(InputSpec const &matcher, size_t inputTimeslice, size_t maxInputTimeslices)
static RouteConfigurator::ExpirationConfigurator expiringTimeframeConfigurator()
static RouteConfigurator::CreationConfigurator signalDrivenConfigurator(InputSpec const &matcher, size_t inputTimeslice, size_t maxInputTimeslices)
static RouteConfigurator::ExpirationConfigurator expiringOptionalConfigurator(InputSpec const &spec, std::string const &sourceChannel)
When the record expires, simply create a dummy entry.
static RouteConfigurator::DanglingConfigurator danglingEnumerationConfigurator(InputSpec const &matcher)
static RouteConfigurator::DanglingConfigurator danglingQAConfigurator()
static RouteConfigurator::CreationConfigurator createOptionalConfigurator()
This behaves as data. I.e. we never create it unless data arrives.
static RouteConfigurator::ExpirationConfigurator expiringOOBConfigurator(InputSpec const &spec, std::string const &sourceChannel)
static RouteConfigurator::CreationConfigurator loopEventDrivenConfigurator(InputSpec const &matcher)
static RouteConfigurator::ExpirationConfigurator expiringTimerConfigurator(InputSpec const &spec, std::string const &sourceChannel)
static RouteConfigurator::DanglingConfigurator danglingTimeframeConfigurator()
static RouteConfigurator::ExpirationConfigurator expiringTransientConfigurator(InputSpec const &)
static RouteConfigurator::CreationConfigurator oobDrivenConfigurator()
static RouteConfigurator::CreationConfigurator timeDrivenConfigurator(InputSpec const &matcher)
static RouteConfigurator::DanglingConfigurator danglingOptionalConfigurator(std::vector< InputRoute > const &routes)
This will always exipire an optional record when no data is received.
static RouteConfigurator::DanglingConfigurator danglingOutOfBandConfigurator()
static RouteConfigurator::CreationConfigurator dataDrivenConfigurator()
static RouteConfigurator::ExpirationConfigurator expiringQAConfigurator()
static RouteConfigurator::ExpirationConfigurator expiringEnumerationConfigurator(InputSpec const &spec, std::string const &sourceChannel)
static RouteConfigurator::DanglingConfigurator danglingTransientConfigurator()
static RouteConfigurator::CreationConfigurator fairmqDrivenConfiguration(InputSpec const &spec, int inputTimeslice, int maxInputTimeslices)
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
std::vector< ConfigParamSpec > metadata
A set of configurables which can be used to customise the InputSpec.
Definition InputSpec.h:76
std::variant< ConcreteDataMatcher, data_matcher::DataDescriptorMatcher > matcher
The actual matcher for the input spec.
Definition InputSpec.h:70
static ExpirationHandler::Handler fetchFromCCDBCache(InputSpec const &spec, std::string const &prefix, std::string const &overrideTimestamp, std::string const &sourceChannel)
static ExpirationHandler::Handler dummy(ConcreteDataMatcher const &spec, std::string const &sourceChannel)
Create a dummy message with the provided ConcreteDataMatcher.
static ExpirationHandler::Handler fetchFromQARegistry()
static ExpirationHandler::Creator enumDrivenCreation(size_t first, size_t last, size_t step, size_t inputTimeslice, size_t maxTimeSliceId, size_t repetitions)
static ExpirationHandler::Handler enumerate(ConcreteDataMatcher const &spec, std::string const &sourceChannel, int64_t orbitOffset, int64_t orbitMultiplier)
Enumerate entries on every invokation.
static ExpirationHandler::Creator timeDrivenCreation(std::vector< std::chrono::microseconds > periods, std::vector< std::chrono::seconds > intervals, std::function< bool(void)> hasTimerFired, std::function< void(uint64_t, uint64_t)> updateTimerPeriod)
static ExpirationHandler::Creator dataDrivenCreation()
Callback which does nothing, waiting for data to arrive.
static ExpirationHandler::Checker expireAlways()
static ExpirationHandler::Handler fetchFromObjectRegistry()
static ExpirationHandler::Checker expectCTP(std::string const &serverUrl, bool waitForCTP)
static ExpirationHandler::Checker expireNever()
static ExpirationHandler::Creator uvDrivenCreation(int loopReason, DeviceState &state)
Callback which creates a new timeslice whenever some libuv event happens.
static ExpirationHandler::Handler fetchFromFairMQ(InputSpec const &spec, std::string const &channelName)
static ExpirationHandler::Handler doNothing()
static ExpirationHandler::Checker expireIfPresent(std::vector< InputRoute > const &schema, ConcreteDataMatcher matcher)
std::function< ExpirationHandler::Creator(DeviceState &, ServiceRegistryRef, ConfigParamRegistry const &)> CreationConfigurator
Definition InputRoute.h:31
std::function< ExpirationHandler::Handler(DeviceState &, ConfigParamRegistry const &)> ExpirationConfigurator
Definition InputRoute.h:33
std::function< ExpirationHandler::Checker(DeviceState &, ConfigParamRegistry const &)> DanglingConfigurator
Definition InputRoute.h:32
static ServiceSpecs filterDisabled(ServiceSpecs originals, OverrideServiceSpecs const &overrides)
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels
const std::string str