15#include <boost/program_options.hpp>
19#include <unordered_set>
44#include <fmt/format.h>
47#include <sys/resource.h>
49#include <fairmq/Device.h>
70 if (std::find(
state->firedTimers.begin(),
state->firedTimers.end(), handle) ==
state->firedTimers.end()) {
71 state->firedTimers.push_back(handle);
77 return [timer]() ->
bool {
79 return std::find(
state->firedTimers.begin(),
state->firedTimers.end(), timer) !=
state->firedTimers.end();
85 return [timer](uint64_t timeout_ms, uint64_t repeat_ms) ->
void {
112 std::vector<std::chrono::microseconds> periods;
114 std::vector<std::chrono::seconds> durations;
115 auto prefix = std::string{
"period-"};
116 for (
auto& meta : matcher.
metadata) {
117 if (strncmp(meta.name.c_str(), prefix.c_str(), prefix.size()) == 0) {
119 std::string_view duration(meta.name.c_str() + prefix.size(), meta.name.size() - prefix.size());
120 durations.emplace_back(std::chrono::seconds(std::stoi(std::string(duration))));
121 periods.emplace_back(std::chrono::microseconds(meta.defaultValue.get<uint64_t>() / 1000));
124 if (periods.empty()) {
125 std::string defaultRateName = std::string{
"period-"} + matcher.
binding;
126 auto defaultRate = std::chrono::milliseconds(options.get<
int>(defaultRateName.c_str()));
128 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, tid,
"timeDrivenCreation",
"Using default rate of %" PRIi64
" ms as specified by option period-%{public}s", defaultRate.count(),
130 periods.emplace_back(defaultRate.count());
131 durations.emplace_back(std::chrono::seconds((std::size_t)-1));
134 durations.back() = std::chrono::seconds((std::size_t)-1);
140 timer->data = &
state;
141 uv_timer_init(
state.loop, timer);
143 state.activeTimers.push_back(timer);
159 std::string startName = std::string{
"start-value-"} + matcher.
binding;
160 std::string endName = std::string{
"end-value-"} + matcher.
binding;
161 std::string stepName = std::string{
"step-value-"} + matcher.
binding;
162 auto start = options.get<int64_t>(startName.c_str());
163 auto stop = options.get<int64_t>(endName.c_str());
164 auto step = options.get<int64_t>(stepName.c_str());
169 uv_signal_init(
state.loop, sh);
172 state.activeSignals.push_back(sh);
188 std::string startName = std::string{
"start-value-"} + matcher.
binding;
189 std::string endName = std::string{
"end-value-"} + matcher.
binding;
190 std::string stepName = std::string{
"step-value-"} + matcher.
binding;
191 int64_t defaultStart = 0;
192 int64_t defaultStop = std::numeric_limits<int64_t>::max();
193 int64_t defaultStep = 1;
194 int defaultRepetitions = 1;
195 for (
auto& meta : matcher.
metadata) {
196 if (meta.name ==
"repetitions") {
197 defaultRepetitions = meta.defaultValue.get<int64_t>();
198 }
else if (meta.name ==
"start-value") {
199 defaultStart = meta.defaultValue.get<int64_t>();
200 }
else if (meta.name ==
"end-value") {
201 defaultStop = meta.defaultValue.get<int64_t>();
202 }
else if (meta.name ==
"step-value") {
203 defaultStep = meta.defaultValue.get<int64_t>();
206 auto start = options.hasOption(startName.c_str()) ? options.get<int64_t>(startName.c_str()) : defaultStart;
207 auto stop = options.hasOption(endName.c_str()) ? options.get<int64_t>(endName.c_str()) : defaultStop;
208 auto step = options.hasOption(stepName.c_str()) ? options.get<int64_t>(stepName.c_str()) : defaultStep;
209 auto repetitions = defaultRepetitions;
227 auto serverUrl = options.get<std::string>(
"condition-backend");
235 auto serverUrl = options.get<std::string>(
"condition-backend");
236 auto forceTimestamp = options.get<std::string>(
"condition-timestamp");
246 std::string channelName =
"upstream";
248 if (meta.name !=
"channel-name") {
251 channelName = meta.defaultValue.get<std::string>();
255 auto& channel = device->GetChannels()[channelName];
259 size_t zmq_fd_len =
sizeof(zmq_fd);
261 channel[0].GetSocket().GetOption(
"fd", &zmq_fd, &zmq_fd_len);
263 throw runtime_error_f(
"Cannot get file descriptor for channel %s", channelName.c_str());
265 LOG(
debug) <<
"Polling socket for " << channel[0].GetName();
267 state.activeOutOfBandPollers.push_back(poller);
287 std::string channelNameOption = std::string{
"out-of-band-channel-name-"} + spec.
binding;
288 auto channelName = options.get<std::string>(channelNameOption.c_str());
322 auto m = std::get_if<ConcreteDataMatcher>(&spec.
matcher);
324 throw runtime_error(
"InputSpec for Timers must be fully qualified");
335 auto m = std::get_if<ConcreteDataMatcher>(&spec.
matcher);
337 throw runtime_error(
"InputSpec for OOB must be fully qualified");
348 auto m = std::get_if<ConcreteDataMatcher>(&spec.
matcher);
350 throw runtime_error(
"InputSpec for Enumeration must be fully qualified");
354 int defaultOrbitOffset = 0;
355 int defaultOrbitMultiplier = 1;
357 if (meta.name ==
"orbit-offset") {
358 defaultOrbitOffset = meta.defaultValue.get<int64_t>();
359 }
else if (meta.name ==
"orbit-multiplier") {
360 defaultOrbitMultiplier = meta.defaultValue.get<int64_t>();
363 size_t orbitOffset = config.hasOption(
"orbit-offset-enumeration") ? config.get<int64_t>(
"orbit-offset-enumeration") : defaultOrbitOffset;
364 size_t orbitMultiplier = config.hasOption(
"orbit-multiplier-enumeration") ? config.get<int64_t>(
"orbit-multiplier-enumeration") : defaultOrbitMultiplier;
417 return fmt::format(
"{}type={},method={},address={},rateLogging={},rcvBufSize={},sndBufSize={}",
418 channel.
name.empty() ?
"" :
"name=" + channel.
name +
",",
429 return fmt::format(
"{}type={},method={},address={},rateLogging={},rcvBufSize={},sndBufSize={}",
430 channel.
name.empty() ?
"" :
"name=" + channel.
name +
",",
450 std::vector<std::pair<int, std::string>> timeframeOutputs;
451 for (
size_t i = 0;
i < workflow.size(); ++
i) {
452 auto& spec = workflow[
i];
454 if (spec.inputTimeSliceId != 0) {
457 for (
auto&
output : spec.outputs) {
458 if (
output.lifetime != Lifetime::Timeframe) {
468 std::stable_sort(timeframeOutputs.begin(), timeframeOutputs.end(), [](
auto const&
a,
auto const&
b) {
469 return a.second < b.second;
472 auto it = std::adjacent_find(timeframeOutputs.begin(), timeframeOutputs.end(), [](
auto const&
a,
auto const&
b) {
473 return a.second == b.second;
475 if (it != timeframeOutputs.end()) {
477 auto device1 = workflow[it->first].name;
478 auto device2 = workflow[(it + 1)->
first].
name;
479 auto output1 = it->second;
480 auto output2 = (it + 1)->second;
481 throw std::runtime_error(fmt::format(
"Found duplicate outputs {} in device {} ({}) and {} in {} ({})",
482 output1, device1, it->first, output2, device2, (it + 1)->first));
487 std::vector<DeviceSpec>& devices,
488 std::vector<DeviceId>& deviceIndex,
489 std::vector<DeviceConnectionId>& connections,
491 const std::vector<size_t>& outEdgeIndex,
492 const std::vector<DeviceConnectionEdge>& logicalEdges,
493 const std::vector<EdgeAction>& actions,
const WorkflowSpec& workflow,
494 const std::vector<OutputSpec>& outputsMatchers,
495 const std::vector<ChannelConfigurationPolicy>& channelPolicies,
496 const std::vector<SendingPolicy>& sendingPolicies,
497 const std::vector<ForwardingPolicy>& forwardingPolicies,
498 std::string
const& channelPrefix,
505 assert(!workflow.empty());
509 auto deviceForEdge = [&actions, &workflow, &devices,
510 &logicalEdges, &resourceManager,
511 &defaultOffer, &channelPrefix, overrideServices](
size_t ei,
ComputingOffer& acceptedOffer) {
512 auto& edge = logicalEdges[ei];
513 auto& action = actions[ei];
515 if (action.requiresNewDevice ==
false) {
516 assert(devices.empty() ==
false);
517 return devices.size() - 1;
519 if (acceptedOffer.hostname !=
"") {
523 auto& processor = workflow[edge.producer];
525 acceptedOffer.cpu = defaultOffer.
cpu;
526 acceptedOffer.memory = defaultOffer.
memory;
528 if (offer.cpu < acceptedOffer.cpu) {
531 if (offer.memory < acceptedOffer.memory) {
534 acceptedOffer.hostname = offer.hostname;
535 acceptedOffer.startPort = offer.startPort;
536 acceptedOffer.rangeSize = 0;
541 .
name = processor.name,
542 .id = processor.maxInputTimeslices == 1 ? processor.name : processor.name +
"_t" +
std::to_string(edge.producerTimeIndex),
543 .channelPrefix = channelPrefix,
545 .options = processor.options,
547 .algorithm = processor.algorithm,
548 .rank = processor.rank,
549 .nSlots = processor.nSlots,
550 .inputTimesliceId = edge.producerTimeIndex,
551 .maxInputTimeslices = processor.maxInputTimeslices,
552 .resource = {acceptedOffer},
553 .labels = processor.labels,
554 .metadata = processor.metadata});
583 for (
auto&
output : processor.outputs) {
584 if (
output.lifetime != Lifetime::OutOfBand) {
588 .
name =
"downstream",
591 .hostname =
"localhost",
594 for (
auto& meta :
output.metadata) {
595 if (meta.name ==
"channel-name") {
596 extraOutputChannelSpec.name = meta.defaultValue.get<std::string>();
598 if (meta.name ==
"port") {
599 extraOutputChannelSpec.port = meta.defaultValue.get<int32_t>();
601 if (meta.name ==
"address") {
602 extraOutputChannelSpec.hostname = meta.defaultValue.get<std::string>();
605 devices.back().outputChannels.push_back(extraOutputChannelSpec);
607 return devices.size() - 1;
610 auto channelFromDeviceEdgeAndPort = [&connections, &workflow, &channelPolicies](
const DeviceSpec& device,
615 auto& consumer = workflow[edge.consumer];
616 std::string consumerDeviceId = consumer.
name;
617 if (consumer.maxInputTimeslices != 1) {
620 channel.
name = device.channelPrefix +
"from_" + device.id +
"_to_" + consumerDeviceId;
621 channel.
port = acceptedOffer.startPort + acceptedOffer.rangeSize;
622 channel.
hostname = acceptedOffer.hostname;
623 deviceResource.usedPorts += 1;
624 acceptedOffer.rangeSize += 1;
626 for (
auto& policy : channelPolicies) {
627 if (policy.match(device.id, consumerDeviceId)) {
628 policy.modifyOutput(channel);
633 connections.push_back(
id);
635 auto&
source = workflow[edge.producer];
638 O2_SIGNPOST_START(device_spec_helpers, sid,
"new channels",
"Channel %{public}s has been created.", channel.
name.c_str());
640 O2_SIGNPOST_START(device_spec_helpers, iid,
"producer outputs",
"Producer %{public}s has the following outputs:",
source.name.c_str());
645 O2_SIGNPOST_START(device_spec_helpers, iid,
"producer forwards",
"Producer %{public}s has the following forwards:",
source.name.c_str());
646 for (
auto& forwards : device.forwards) {
650 O2_SIGNPOST_START(device_spec_helpers, iid,
"consumer inputs",
"Consumer %{public}s has the following inputs:", consumer.name.c_str());
651 for (
auto& input : consumer.inputs) {
659 auto isDifferentDestinationDeviceReferredBy = [&actions](
size_t ei) {
return actions[ei].requiresNewChannel; };
666 auto createChannelForDeviceEdge = [&devices, &logicalEdges, &channelFromDeviceEdgeAndPort,
668 auto& device = devices[
di];
669 auto& edge = logicalEdges[ei];
671 deviceIndex.emplace_back(
DeviceId{edge.producer, edge.producerTimeIndex,
di});
673 OutputChannelSpec channel = channelFromDeviceEdgeAndPort(device, device.resource, offer, edge);
675 device.outputChannels.push_back(channel);
676 return device.outputChannels.size() - 1;
683 auto appendOutputRouteToSourceDeviceChannel = [&outputsMatchers, &workflow, &devices, &logicalEdges, &sendingPolicies, &forwardingPolicies, &configContext](
684 size_t ei,
size_t di,
size_t ci) {
685 assert(ei < logicalEdges.size());
686 assert(
di < devices.size());
687 assert(ci < devices[
di].outputChannels.size());
688 auto& edge = logicalEdges[ei];
689 auto& device = devices[
di];
690 assert(edge.consumer < workflow.size());
691 auto& consumer = workflow[edge.consumer];
692 auto& producer = workflow[edge.producer];
693 auto& channel = devices[
di].outputChannels[ci];
694 assert(edge.outputGlobalIndex < outputsMatchers.size());
698 for (
auto& policy : sendingPolicies) {
699 if (policy.matcher(producer, consumer, configContext)) {
704 assert(forwardingPolicies.empty() ==
false);
705 for (
auto& policy : forwardingPolicies) {
706 if (policy.matcher(producer, consumer, configContext)) {
707 forwardPolicyPtr = &policy;
711 assert(policyPtr !=
nullptr);
712 assert(forwardPolicyPtr !=
nullptr);
714 if (edge.isForward ==
false) {
717 .maxTimeslices = consumer.maxInputTimeslices,
718 .matcher = outputsMatchers[edge.outputGlobalIndex],
719 .channel = channel.name,
722 device.outputs.emplace_back(route);
726 .maxTimeslices = consumer.maxInputTimeslices,
727 .matcher = workflow[edge.consumer].inputs[edge.consumerInputIndex],
728 .channel = channel.name,
729 .policy = forwardPolicyPtr,
734 if (route.matcher.lifetime == Lifetime::Timer) {
735 route.
matcher.lifetime = Lifetime::Timeframe;
737 device.forwards.emplace_back(route);
741 auto sortDeviceIndex = [&deviceIndex]() { std::sort(deviceIndex.begin(), deviceIndex.end()); };
743 auto lastChannelFor = [&devices](
size_t di) {
744 assert(
di < devices.size());
745 assert(devices[
di].outputChannels.empty() ==
false);
746 return devices[
di].outputChannels.size() - 1;
760 for (
auto edge : outEdgeIndex) {
761 auto device = deviceForEdge(edge, acceptedOffer);
763 if (isDifferentDestinationDeviceReferredBy(edge)) {
764 channel = createChannelForDeviceEdge(device, edge, acceptedOffer);
766 channel = lastChannelFor(device);
768 appendOutputRouteToSourceDeviceChannel(edge, device, channel);
770 if (std::string(acceptedOffer.hostname) !=
"") {
777 std::vector<DeviceId>& deviceIndex,
778 const std::vector<DeviceConnectionId>& connections,
780 const std::vector<size_t>& inEdgeIndex,
781 const std::vector<DeviceConnectionEdge>& logicalEdges,
782 const std::vector<EdgeAction>& actions,
const WorkflowSpec& workflow,
783 std::vector<LogicalForwardInfo>
const& availableForwardsInfo,
784 std::vector<ChannelConfigurationPolicy>
const& channelPolicies,
785 std::string
const& channelPrefix,
789 auto const& constDeviceIndex = deviceIndex;
790 if (!std::is_sorted(constDeviceIndex.cbegin(), constDeviceIndex.cend())) {
794 auto findProducerForEdge = [&logicalEdges, &constDeviceIndex](
size_t ei) {
795 auto& edge = logicalEdges[ei];
797 DeviceId pid{edge.producer, edge.producerTimeIndex, 0};
798 auto deviceIt = std::lower_bound(constDeviceIndex.cbegin(), constDeviceIndex.cend(),
pid);
800 assert(deviceIt != constDeviceIndex.end());
801 assert(deviceIt->processorIndex ==
pid.processorIndex && deviceIt->timeslice ==
pid.timeslice);
802 return deviceIt->deviceIndex;
805 auto findConsumerForEdge = [&logicalEdges, &constDeviceIndex](
size_t ei) {
806 auto& edge = logicalEdges[ei];
809 auto deviceIt = std::lower_bound(constDeviceIndex.cbegin(), constDeviceIndex.cend(),
pid);
811 assert(deviceIt != constDeviceIndex.end());
812 assert(deviceIt->processorIndex ==
pid.processorIndex && deviceIt->timeslice ==
pid.timeslice);
813 return deviceIt->deviceIndex;
822 decltype(deviceIndex.begin()) lastConsumerSearch;
823 size_t lastConsumerSearchEdge;
824 auto hasConsumerForEdge = [&lastConsumerSearch, &lastConsumerSearchEdge, &deviceIndex,
825 &logicalEdges](
size_t ei) ->
int {
826 auto& edge = logicalEdges[ei];
827 DeviceId cid{edge.consumer, edge.timeIndex, 0};
828 lastConsumerSearchEdge = ei;
829 lastConsumerSearch = std::lower_bound(deviceIndex.begin(), deviceIndex.end(), cid);
830 return lastConsumerSearch != deviceIndex.end() && cid.processorIndex == lastConsumerSearch->processorIndex &&
831 cid.timeslice == lastConsumerSearch->timeslice;
836 auto getConsumerForEdge = [&lastConsumerSearch, &lastConsumerSearchEdge](
size_t ei) {
837 assert(ei == lastConsumerSearchEdge);
838 return lastConsumerSearch->deviceIndex;
841 auto createNewDeviceForEdge = [&workflow, &logicalEdges, &devices,
842 &deviceIndex, &resourceManager, &defaultOffer,
843 &channelPrefix, &overrideServices](
size_t ei,
ComputingOffer& acceptedOffer) {
844 auto& edge = logicalEdges[ei];
846 if (acceptedOffer.hostname !=
"") {
850 auto& processor = workflow[edge.consumer];
852 acceptedOffer.cpu = defaultOffer.
cpu;
853 acceptedOffer.memory = defaultOffer.
memory;
855 if (offer.cpu < acceptedOffer.cpu) {
858 if (offer.memory < acceptedOffer.memory) {
861 acceptedOffer.hostname = offer.hostname;
862 acceptedOffer.startPort = offer.startPort;
863 acceptedOffer.rangeSize = 0;
868 .
name = processor.name,
869 .id = processor.name,
870 .channelPrefix = channelPrefix,
871 .options = processor.options,
873 .algorithm = processor.algorithm,
874 .rank = processor.rank,
875 .nSlots = processor.nSlots,
876 .inputTimesliceId = edge.timeIndex,
877 .maxInputTimeslices = processor.maxInputTimeslices,
878 .resource = {acceptedOffer},
879 .labels = processor.labels,
880 .metadata = processor.metadata};
882 if (processor.maxInputTimeslices != 1) {
888 auto id =
DeviceId{edge.consumer, edge.timeIndex, devices.size()};
889 devices.emplace_back(std::move(device));
890 deviceIndex.push_back(
id);
891 std::sort(deviceIndex.begin(), deviceIndex.end());
892 return devices.size() - 1;
899 auto findMatchingOutgoingPortForEdge = [&logicalEdges, &connections](
size_t ei) {
900 auto const& edge = logicalEdges[ei];
903 auto it = std::lower_bound(connections.begin(), connections.end(), connectionId);
905 assert(it != connections.end());
906 assert(it->producer == connectionId.producer);
907 assert(it->consumer == connectionId.consumer);
908 assert(it->timeIndex == connectionId.timeIndex);
909 assert(it->producerTimeIndex == connectionId.producerTimeIndex);
913 auto checkNoDuplicatesFor = [](std::vector<InputChannelSpec>
const&
channels,
const std::string&
name) {
914 for (
auto const& channel :
channels) {
915 if (channel.name ==
name) {
921 auto appendInputChannelForConsumerDevice = [&devices, &checkNoDuplicatesFor, &channelPolicies](
922 size_t pi,
size_t ci,
unsigned short port) {
923 auto const& producerDevice = devices[pi];
924 auto& consumerDevice = devices[ci];
926 channel.
name = producerDevice.channelPrefix +
"from_" + producerDevice.id +
"_to_" + consumerDevice.id;
927 channel.
hostname = producerDevice.resource.hostname;
929 for (
auto& policy : channelPolicies) {
930 if (policy.match(producerDevice.id, consumerDevice.id)) {
931 policy.modifyInput(channel);
935 assert(checkNoDuplicatesFor(consumerDevice.inputChannels, channel.
name));
936 consumerDevice.inputChannels.push_back(channel);
937 return consumerDevice.inputChannels.size() - 1;
943 auto getChannelForEdge = [&devices](
size_t pi,
size_t ci) {
944 auto& consumerDevice = devices[ci];
945 return consumerDevice.inputChannels.size() - 1;
952 auto appendInputRouteToDestDeviceChannel = [&devices, &logicalEdges, &workflow](
size_t ei,
size_t di,
size_t ci) {
953 auto const& edge = logicalEdges[ei];
954 auto const& consumer = workflow[edge.consumer];
955 auto const& producer = workflow[edge.producer];
956 auto& consumerDevice = devices[
di];
958 auto const& inputSpec = consumer.inputs[edge.consumerInputIndex];
959 auto const& sourceChannel = consumerDevice.inputChannels[ci].name;
963 edge.consumerInputIndex,
965 edge.producerTimeIndex,
972 for (
size_t iri = 0; iri < consumerDevice.inputs.size(); ++iri) {
973 auto& existingRoute = consumerDevice.inputs[iri];
974 if (existingRoute.timeslice != edge.producerTimeIndex) {
977 if (existingRoute.inputSpecIndex == edge.consumerInputIndex) {
985 if (edge.isForward && route.matcher.lifetime == Lifetime::Timer) {
987 "Warning: Forwarding timer {} from {} to a {} as both requested it."
988 " If this is undesired, please make sure to use two different data matchers for their InputSpec.",
990 producer.name.c_str(),
991 consumer.name.c_str());
992 route.matcher.lifetime = Lifetime::Timeframe;
995 consumerDevice.inputs.push_back(route);
1003 for (
size_t edge : inEdgeIndex) {
1004 auto& action = actions[edge];
1006 size_t consumerDevice = -1;
1008 if (action.requiresNewDevice) {
1009 if (hasConsumerForEdge(edge)) {
1010 consumerDevice = getConsumerForEdge(edge);
1012 consumerDevice = createNewDeviceForEdge(edge, acceptedOffer);
1015 consumerDevice = findConsumerForEdge(edge);
1017 size_t producerDevice = findProducerForEdge(edge);
1019 size_t channel = -1;
1020 if (action.requiresNewChannel) {
1021 int16_t port = findMatchingOutgoingPortForEdge(edge);
1022 channel = appendInputChannelForConsumerDevice(producerDevice, consumerDevice, port);
1024 channel = getChannelForEdge(producerDevice, consumerDevice);
1026 appendInputRouteToDestDeviceChannel(edge, consumerDevice, channel);
1030 for (
auto& device : devices) {
1031 for (
auto& route : device.inputs) {
1032 switch (route.matcher.lifetime) {
1033 case Lifetime::OutOfBand:
1034 route.configurator = {
1047 route.configurator = {
1053 case Lifetime::Timer:
1054 route.configurator = {
1060 case Lifetime::Enumeration:
1061 route.configurator = {
1062 .name =
"enumeration",
1067 case Lifetime::Signal:
1068 route.configurator = {
1074 case Lifetime::Transient:
1075 route.configurator = {
1076 .name =
"transient",
1081 case Lifetime::Optional:
1082 route.configurator = {
1094 if (acceptedOffer.
hostname !=
"") {
1103 std::vector<ChannelConfigurationPolicy>
const& channelPolicies,
1104 std::vector<CompletionPolicy>
const& completionPolicies,
1105 std::vector<DispatchPolicy>
const& dispatchPolicies,
1106 std::vector<ResourcePolicy>
const& resourcePolicies,
1107 std::vector<CallbacksPolicy>
const& callbacksPolicies,
1108 std::vector<SendingPolicy>
const& sendingPolicies,
1109 std::vector<ForwardingPolicy>
const& forwardingPolicies,
1110 std::vector<DeviceSpec>& devices,
1112 std::string
const& uniqueWorkflowId,
1114 bool optimizeTopology,
1115 unsigned short resourcesMonitoringInterval,
1116 std::string
const& channelPrefix,
1122 if (workflow.empty()) {
1125 std::vector<LogicalForwardInfo> availableForwardsInfo;
1126 std::vector<DeviceConnectionEdge> logicalEdges;
1127 std::vector<DeviceConnectionId> connections;
1128 std::vector<DeviceId> deviceIndex;
1133 std::vector<OutputSpec> outputs;
1147 std::vector<size_t> inEdgeIndex;
1148 std::vector<size_t> outEdgeIndex;
1155 size_t deviceCount = 0;
1156 for (
auto& action : outActions) {
1157 deviceCount += action.requiresNewDevice ? 1 : 0;
1159 for (
auto& action : inActions) {
1160 deviceCount += action.requiresNewDevice ? 1 : 0;
1165 defaultOffer.
cpu += offer.cpu;
1166 defaultOffer.
memory += offer.memory;
1170 defaultOffer.
cpu /= deviceCount + 1;
1171 defaultOffer.
memory /= deviceCount + 1;
1173 processOutEdgeActions(configContext, devices, deviceIndex, connections, resourceManager, outEdgeIndex, logicalEdges,
1174 outActions, workflow, outputs, channelPolicies, sendingPolicies, forwardingPolicies, channelPrefix, defaultOffer, overrideServices);
1177 std::sort(connections.begin(), connections.end());
1179 processInEdgeActions(devices, deviceIndex, connections, resourceManager, inEdgeIndex, logicalEdges,
1180 inActions, workflow, availableForwardsInfo, channelPolicies, channelPrefix, defaultOffer, overrideServices);
1183 std::map<std::string, DataProcessorPoliciesInfo> policies;
1185 bool hasPolicy =
false;
1186 policies[device.name].completionPolicyName =
"unknown";
1187 for (
auto& policy : completionPolicies) {
1188 if (policy.matcher(device) ==
true) {
1189 policies[policy.name].completionPolicyName = policy.name;
1190 device.completionPolicy = policy;
1195 if (hasPolicy ==
false) {
1196 throw runtime_error_f(
"Unable to find a completion policy for %s", device.id.c_str());
1198 for (
auto& policy : dispatchPolicies) {
1199 if (policy.deviceMatcher(device) ==
true) {
1200 device.dispatchPolicy = policy;
1204 for (
auto& policy : callbacksPolicies) {
1205 if (policy.matcher(device, configContext) ==
true) {
1206 device.callbacksPolicy = policy;
1211 for (
auto& policy : resourcePolicies) {
1212 if (policy.matcher(device) ==
true) {
1213 device.resourcePolicy = policy;
1218 if (hasPolicy ==
false) {
1219 throw runtime_error_f(
"Unable to find a resource policy for %s", device.id.c_str());
1223 std::vector<DataProcessorPoliciesInfo> policiesVector;
1224 for (
size_t wi = 0; wi < workflow.size(); ++wi) {
1225 auto& processor = workflow[wi];
1226 auto& info = policies[processor.name];
1227 policiesVector.push_back(info);
1232 for (
auto& device : devices) {
1233 device.resourceMonitoringInterval = resourcesMonitoringInterval;
1236 auto findDeviceIndex = [&deviceIndex](
size_t processorIndex,
size_t timeslice) {
1237 for (
auto& deviceEdge : deviceIndex) {
1238 if (deviceEdge.processorIndex != processorIndex) {
1241 if (deviceEdge.timeslice != timeslice) {
1244 return deviceEdge.deviceIndex;
1251 if (optimizeTopology) {
1252 for (
auto& connection : connections) {
1253 auto& device1 = devices[findDeviceIndex(connection.consumer, connection.timeIndex)];
1254 auto& device2 = devices[findDeviceIndex(connection.producer, connection.producerTimeIndex)];
1256 if (device1.resource.hostname != device2.resource.hostname) {
1259 for (
auto& input : device1.inputChannels) {
1260 for (
auto&
output : device2.outputChannels) {
1261 if (input.hostname ==
output.hostname && input.port ==
output.port) {
1264 input.hostname += uniqueWorkflowId;
1265 output.hostname += uniqueWorkflowId;
1275 std::string finalValue;
1276 for (
auto& info : infos) {
1277 auto it = std::find(info.cmdLineArgs.begin(), info.cmdLineArgs.end(),
name);
1278 if (it == info.cmdLineArgs.end()) {
1281 auto value = it + 1;
1282 if (
value == info.cmdLineArgs.end()) {
1285 if (!finalValue.empty() && finalValue != *
value) {
1288 finalValue = *
value;
1289 info.cmdLineArgs.erase(it, it + 2);
1291 if (finalValue.empty() && defaultValue ==
nullptr) {
1294 if (finalValue.empty()) {
1295 finalValue = defaultValue;
1297 for (
auto& info : infos) {
1298 info.cmdLineArgs.emplace_back(
name);
1299 info.cmdLineArgs.push_back(finalValue);
1303void DeviceSpecHelpers::reworkIntegerOption(std::vector<DataProcessorInfo>& infos,
char const*
name, std::function<
long long()> defaultValueCallback,
long long startValue, std::function<
long long(
long long,
long long)> bestValue)
1305 int64_t finalValue = startValue;
1306 bool wasModified =
false;
1307 for (
auto& info : infos) {
1308 auto it = std::find(info.cmdLineArgs.begin(), info.cmdLineArgs.end(),
name);
1309 if (it == info.cmdLineArgs.end()) {
1312 auto valueS = it + 1;
1313 if (valueS == info.cmdLineArgs.end()) {
1316 char* err =
nullptr;
1317 long long value = strtoll(valueS->c_str(), &err, 10);
1318 finalValue = bestValue(
value, finalValue);
1320 info.cmdLineArgs.erase(it, it + 2);
1322 if (!wasModified && defaultValueCallback ==
nullptr) {
1326 finalValue = defaultValueCallback();
1328 for (
auto& info : infos) {
1329 info.cmdLineArgs.emplace_back(
name);
1336 int64_t segmentSize = 0;
1337 for (
auto& info : infos) {
1338 auto it = std::find(info.cmdLineArgs.begin(), info.cmdLineArgs.end(),
"--shm-segment-size");
1339 if (it == info.cmdLineArgs.end()) {
1342 auto value = it + 1;
1343 if (
value == info.cmdLineArgs.end()) {
1344 throw runtime_error(
"--shm-segment-size requires an argument");
1346 char* err =
nullptr;
1347 int64_t
size = strtoll(
value->c_str(), &err, 10);
1348 if (
size > segmentSize) {
1351 info.cmdLineArgs.erase(it, it + 2);
1354 if (segmentSize == 0) {
1355 struct rlimit limits;
1356 getrlimit(RLIMIT_AS, &limits);
1357 if (limits.rlim_cur != RLIM_INFINITY) {
1358 segmentSize = std::min(limits.rlim_cur - 1000000000LL, (limits.rlim_cur * 90LL) / 100LL);
1361 if (segmentSize == 0) {
1362 segmentSize = 2000000000LL;
1364 for (
auto& info : infos) {
1365 info.cmdLineArgs.emplace_back(
"--shm-segment-size");
1372template <
class Container>
1373void split(
const std::string&
str, Container& cont)
1375 std::istringstream iss(
str);
1376 std::copy(std::istream_iterator<std::string>(iss),
1377 std::istream_iterator<std::string>(),
1378 std::back_inserter(cont));
1383 unsigned short driverPort,
1385 std::vector<DataProcessorInfo>
const& processorInfos,
1386 std::vector<DeviceSpec>
const& deviceSpecs,
1387 std::vector<DeviceExecution>& deviceExecutions,
1388 std::vector<DeviceControl>& deviceControls,
1389 std::vector<ConfigParamSpec>
const& detectedOptions,
1390 std::string
const& uniqueWorkflowId)
1392 assert(deviceSpecs.size() == deviceExecutions.size());
1393 assert(deviceControls.size() == deviceExecutions.size());
1394 for (
size_t si = 0; si < deviceSpecs.size(); ++si) {
1395 auto& spec = deviceSpecs[si];
1397 O2_SIGNPOST_START(device_spec_helpers, poid,
"prepareArguments",
"Preparing options for %{public}s", spec.id.c_str());
1398 auto& control = deviceControls[si];
1399 auto& execution = deviceExecutions[si];
1401 control.quiet = defaultQuiet;
1402 control.stopped = defaultStopped;
1410 std::vector<ConfigParamSpec> workflowOptions = detectedOptions;
1411 for (
auto& opt : detectedOptions) {
1412 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, poid,
"prepareArguments",
"Processor option %{public}s passed as previously detected", opt.name.c_str());
1417 auto pi = std::find_if(processorInfos.begin(), processorInfos.end(), [&](
auto const&
x) { return x.name == spec.id; });
1418 argc = pi->cmdLineArgs.size() + 1;
1419 argv = (
char**)malloc(
sizeof(
char**) * (argc + 1));
1420 argv[0] = strdup(pi->executable.data());
1421 for (
size_t ai = 0; ai < pi->cmdLineArgs.size(); ++ai) {
1422 auto const& arg = pi->cmdLineArgs[ai];
1423 argv[ai + 1] = strdup(arg.data());
1425 argv[argc] =
nullptr;
1426 for (
auto& opt : pi->workflowOptions) {
1427 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, poid,
"prepareArguments",
"Processor option %{public}s found in process description", opt.name.c_str());
1428 workflowOptions.push_back(opt);
1431 auto last = std::unique(workflowOptions.begin(), workflowOptions.end());
1432 workflowOptions.erase(last, workflowOptions.end());
1434 for (
auto& opt : workflowOptions) {
1435 O2_SIGNPOST_EVENT_EMIT(device_spec_helpers, poid,
"prepareArguments",
"Final unique option %{public}s added to list of workflowOptions", opt.name.c_str());
1453 std::vector<std::string> tmpArgs = {argv[0],
1454 "--id", spec.id.c_str(),
1455 "--control", interactive ?
"gui" :
"static",
1456 "--shm-monitor",
"false",
1457 "--log-color",
"false",
1458 driverConfig.
batch ?
"--batch" :
"--no-batch",
1459 "--color",
"false"};
1464 std::vector<std::string> deviceOptionsSequence;
1465 std::unordered_map<std::string, std::string> uniqueDeviceArgs;
1466 auto updateDeviceArguments = [&deviceOptionsSequence, &uniqueDeviceArgs](
auto key,
auto value) {
1467 if (uniqueDeviceArgs.find(
key) == uniqueDeviceArgs.end()) {
1469 deviceOptionsSequence.emplace_back(
key);
1473 std::vector<std::string> tmpEnv;
1474 if (defaultStopped) {
1475 tmpArgs.emplace_back(
"-s");
1482 const char*
name = spec.name.c_str();
1483 bpo::options_description od;
1484 bpo::options_description foDesc;
1486 od.add_options()(
name, bpo::value<std::string>());
1490 foDesc.add(forwardedOptions);
1493 bool haveSessionArg =
false;
1494 using FilterFunctionT = std::function<
void(
decltype(argc),
decltype(argv),
decltype(od))>;
1495 bool useDefaultWS =
true;
1500 FilterFunctionT filterArgsFct = [&](
int largc,
char** largv,
const bpo::options_description& odesc) {
1502 using namespace bpo::command_line_style;
1503 auto style = (allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
1505 bpo::command_line_parser parser{largc, largv};
1506 parser.options(odesc).allow_unregistered();
1507 parser.style(style);
1508 bpo::parsed_options parsed_options = parser.run();
1510 bpo::variables_map varmap;
1511 bpo::store(parsed_options, varmap);
1512 if (varmap.count(
"environment")) {
1513 auto environment = varmap[
"environment"].as<std::string>();
1514 split(environment, tmpEnv);
1518 if (varmap.count(
"stacktrace-on-signal") && varmap[
"stacktrace-on-signal"].as<std::string>() !=
"none" && varmap[
"stacktrace-on-signal"].as<std::string>() !=
"simple") {
1519 char const* preload = getenv(
"LD_PRELOAD");
1520 if (preload ==
nullptr || strcmp(preload,
"libSegFault.so") == 0) {
1521 tmpEnv.emplace_back(
"LD_PRELOAD=libSegFault.so");
1523 tmpEnv.push_back(fmt::format(
"LD_PRELOAD={}:libSegFault.so", preload));
1525 tmpEnv.push_back(fmt::format(
"SEGFAULT_SIGNALS={}", varmap[
"stacktrace-on-signal"].as<std::string>()));
1532 if (varmap.count(
name) > 0) {
1536 auto arguments =
"--unused " + varmap[
name].as<std::string>();
1537 wordexp_t expansions;
1538 wordexp(
arguments.c_str(), &expansions, 0);
1539 bpo::options_description realOdesc = odesc;
1540 realOdesc.add_options()(
"severity", bpo::value<std::string>());
1541 realOdesc.add_options()(
"child-driver", bpo::value<std::string>());
1542 realOdesc.add_options()(
"rate", bpo::value<std::string>());
1543 realOdesc.add_options()(
"exit-transition-timeout", bpo::value<std::string>());
1544 realOdesc.add_options()(
"data-processing-timeout", bpo::value<std::string>());
1545 realOdesc.add_options()(
"expected-region-callbacks", bpo::value<std::string>());
1546 realOdesc.add_options()(
"timeframes-rate-limit", bpo::value<std::string>());
1547 realOdesc.add_options()(
"environment", bpo::value<std::string>());
1548 realOdesc.add_options()(
"stacktrace-on-signal", bpo::value<std::string>());
1549 realOdesc.add_options()(
"post-fork-command", bpo::value<std::string>());
1550 realOdesc.add_options()(
"bad-alloc-max-attempts", bpo::value<std::string>());
1551 realOdesc.add_options()(
"bad-alloc-attempt-interval", bpo::value<std::string>());
1552 realOdesc.add_options()(
"io-threads", bpo::value<std::string>());
1553 realOdesc.add_options()(
"shm-segment-size", bpo::value<std::string>());
1554 realOdesc.add_options()(
"shm-mlock-segment", bpo::value<std::string>());
1555 realOdesc.add_options()(
"shm-mlock-segment-on-creation", bpo::value<std::string>());
1556 realOdesc.add_options()(
"shm-zero-segment", bpo::value<std::string>());
1557 realOdesc.add_options()(
"shm-throw-bad-alloc", bpo::value<std::string>());
1558 realOdesc.add_options()(
"shm-segment-id", bpo::value<std::string>());
1559 realOdesc.add_options()(
"shm-allocation", bpo::value<std::string>());
1560 realOdesc.add_options()(
"shm-no-cleanup", bpo::value<std::string>());
1561 realOdesc.add_options()(
"shmid", bpo::value<std::string>());
1562 realOdesc.add_options()(
"shm-metadata-msg-size", bpo::value<std::string>()->default_value(
"0"));
1563 realOdesc.add_options()(
"shm-monitor", bpo::value<std::string>());
1564 realOdesc.add_options()(
"channel-prefix", bpo::value<std::string>());
1565 realOdesc.add_options()(
"network-interface", bpo::value<std::string>());
1566 realOdesc.add_options()(
"early-forward-policy", bpo::value<std::string>());
1567 realOdesc.add_options()(
"session", bpo::value<std::string>());
1568 realOdesc.add_options()(
"signposts", bpo::value<std::string>());
1569 filterArgsFct(expansions.we_wordc, expansions.we_wordv, realOdesc);
1570 wordfree(&expansions);
1574 const char* child_driver_key =
"child-driver";
1575 if (varmap.count(child_driver_key) > 0) {
1576 auto arguments = varmap[child_driver_key].as<std::string>();
1577 wordexp_t expansions;
1578 wordexp(
arguments.c_str(), &expansions, 0);
1579 tmpArgs.insert(tmpArgs.begin(), expansions.we_wordv, expansions.we_wordv + expansions.we_wordc);
1582 haveSessionArg = haveSessionArg || varmap.count(
"session") != 0;
1583 useDefaultWS = useDefaultWS && ((varmap.count(
"driver-client-backend") == 0) || varmap[
"driver-client-backend"].as<std::string>() ==
"ws://");
1585 auto processRawChannelConfig = [&tmpArgs, &spec](
const std::string& conf) {
1588 while (std::getline(ss, token,
';')) {
1589 token.erase(token.begin(), std::find_if(token.begin(), token.end(), [](
int ch) { return !std::isspace(ch); }));
1590 token.erase(std::find_if(token.rbegin(), token.rend(), [](
int ch) { return !std::isspace(ch); }).base(), token.end());
1591 if (!token.empty()) {
1592 tmpArgs.emplace_back(
"--channel-config");
1593 tmpArgs.emplace_back(token);
1598 for (
const auto varit : varmap) {
1601 const auto* description = odesc.find_nothrow(varit.first,
false);
1602 if (description && varmap.count(varit.first)) {
1604 auto semantic = description->semantic();
1605 const char* optarg =
"";
1610 assert(semantic->min_tokens() <= 1);
1612 if (semantic->min_tokens() > 0) {
1613 std::string stringRep;
1614 if (
auto v = boost::any_cast<std::string>(&varit.second.value())) {
1616 }
else if (
auto v = boost::any_cast<EarlyForwardPolicy>(&varit.second.value())) {
1617 std::stringstream tmp;
1619 stringRep = fmt::format(
"{}", tmp.str());
1621 if (varit.first ==
"channel-config") {
1626 processRawChannelConfig(stringRep);
1627 optarg = tmpArgs.back().c_str();
1629 std::string
key(fmt::format(
"--{}", varit.first));
1630 if (stringRep.length() == 0) {
1636 updateDeviceArguments(
key, stringRep);
1637 optarg = uniqueDeviceArgs[
key].c_str();
1639 }
else if (semantic->min_tokens() == 0 && varit.second.as<
bool>()) {
1640 updateDeviceArguments(fmt::format(
"--{}", varit.first),
"");
1643 control.options.insert(std::make_pair(varit.first, optarg));
1649 filterArgsFct(argc, argv, foDesc);
1651 filterArgsFct(argc, argv, od);
1654 for (
auto& channel : spec.outputChannels) {
1655 tmpArgs.emplace_back(std::string(
"--channel-config"));
1658 for (
auto& channel : spec.inputChannels) {
1659 tmpArgs.emplace_back(std::string(
"--channel-config"));
1664 if (!haveSessionArg) {
1665 updateDeviceArguments(std::string(
"--session"),
"dpl_" + uniqueWorkflowId);
1670 updateDeviceArguments(std::string(
"--driver-client-backend"),
"ws://0.0.0.0:" +
std::to_string(driverPort));
1673 if (spec.resourceMonitoringInterval > 0) {
1674 updateDeviceArguments(std::string(
"--resources-monitoring"),
std::to_string(spec.resourceMonitoringInterval));
1679 for (
auto& arg : tmpArgs) {
1680 execution.args.emplace_back(strdup(arg.c_str()));
1682 for (
auto&
key : deviceOptionsSequence) {
1683 execution.args.emplace_back(strdup(
key.c_str()));
1684 std::string
const&
value = uniqueDeviceArgs[
key];
1685 if (
value.empty()) {
1688 }
else if (
value ==
" ") {
1691 execution.args.emplace_back(strdup(
""));
1693 execution.args.emplace_back(strdup(
value.c_str()));
1697 execution.args.push_back(
nullptr);
1699 for (
auto& env : tmpEnv) {
1700 execution.environ.emplace_back(strdup(env.c_str()));
1704 std::ostringstream
str;
1705 for (
size_t ai = 0; ai < execution.args.size() - 1; ai++) {
1706 if (execution.args[ai] ==
nullptr) {
1707 LOG(error) <<
"Bad argument for " << execution.args[ai - 1];
1709 assert(execution.args[ai]);
1710 str <<
" " << execution.args[ai];
1712 O2_SIGNPOST_END(device_spec_helpers, poid,
"prepareArguments",
"The following options are being forwarded to %{public}s: %{public}s",
1713 spec.id.c_str(),
str.str().c_str());
1722 bpo::options_description forwardedDeviceOptions;
1723 char const* defaultSignposts = getenv(
"DPL_SIGNPOSTS") ? getenv(
"DPL_SIGNPOSTS") :
"";
1724 forwardedDeviceOptions.add_options()
1725 (
"severity", bpo::value<std::string>()->default_value(
"info"),
"severity level of the log")
1726 (
"plugin,P", bpo::value<std::string>(),
"FairMQ plugin list")
1727 (
"plugin-search-path,S", bpo::value<std::string>(),
"FairMQ plugins search path")
1728 (
"control-port", bpo::value<std::string>(),
"Utility port to be used by O2 Control")
1729 (
"rate", bpo::value<std::string>(),
"rate for a data source device (Hz)")
1730 (
"exit-transition-timeout", bpo::value<std::string>(),
"timeout before switching to READY state")
1731 (
"data-processing-timeout", bpo::value<std::string>(),
"timeout after which only calibration can happen")
1732 (
"expected-region-callbacks", bpo::value<std::string>(),
"region callbacks to expect before starting")
1733 (
"timeframes-rate-limit", bpo::value<std::string>()->default_value(
"0"),
"how many timeframes can be in fly")
1734 (
"shm-monitor", bpo::value<std::string>(),
"whether to use the shared memory monitor")
1735 (
"channel-prefix", bpo::value<std::string>()->default_value(
""),
"prefix to use for multiplexing multiple workflows in the same session")
1736 (
"bad-alloc-max-attempts", bpo::value<std::string>()->default_value(
"1"),
"throw after n attempts to alloc shm")
1737 (
"bad-alloc-attempt-interval", bpo::value<std::string>()->default_value(
"50"),
"interval between shm alloc attempts in ms")
1738 (
"io-threads", bpo::value<std::string>()->default_value(
"1"),
"number of FMQ io threads")
1739 (
"shm-segment-size", bpo::value<std::string>(),
"size of the shared memory segment in bytes")
1740 (
"shm-mlock-segment", bpo::value<std::string>()->default_value(
"false"),
"mlock shared memory segment")
1741 (
"shm-mlock-segment-on-creation", bpo::value<std::string>()->default_value(
"false"),
"mlock shared memory segment once on creation")
1742 (
"shm-zero-segment", bpo::value<std::string>()->default_value(
"false"),
"zero shared memory segment")
1743 (
"shm-throw-bad-alloc", bpo::value<std::string>()->default_value(
"true"),
"throw if insufficient shm memory")
1744 (
"shm-segment-id", bpo::value<std::string>()->default_value(
"0"),
"shm segment id")
1745 (
"shm-allocation", bpo::value<std::string>()->default_value(
"rbtree_best_fit"),
"shm allocation method")
1746 (
"shm-no-cleanup", bpo::value<std::string>()->default_value(
"false"),
"no shm cleanup")
1747 (
"shmid", bpo::value<std::string>(),
"shmid")
1748 (
"shm-metadata-msg-size", bpo::value<std::string>()->default_value(
"0"),
"numeric value in B used for padding FairMQ header, see FairMQ v.1.6.0")
1749 (
"environment", bpo::value<std::string>(),
"comma separated list of environment variables to set for the device")
1750 (
"stacktrace-on-signal", bpo::value<std::string>()->default_value(
"simple"),
1751 "dump stacktrace on specified signal(s) (any of `all`, `segv`, `bus`, `ill`, `abrt`, `fpe`, `sys`.)"
1752 "Use `simple` to dump only the main thread in a reliable way")
1753 (
"post-fork-command", bpo::value<std::string>(),
"post fork command to execute (e.g. numactl {pid}")
1754 (
"session", bpo::value<std::string>(),
"unique label for the shared memory session")
1755 (
"network-interface", bpo::value<std::string>(),
"network interface to which to bind tpc fmq ports without specified address")
1756 (
"early-forward-policy", bpo::value<EarlyForwardPolicy>()->default_value(
EarlyForwardPolicy::NEVER),
"when to forward early the messages: never, noraw, always")
1757 (
"configuration,cfg", bpo::value<std::string>(),
"configuration connection string")
1758 (
"driver-client-backend", bpo::value<std::string>(),
"driver connection string")
1759 (
"monitoring-backend", bpo::value<std::string>(),
"monitoring connection string")
1760 (
"dpl-stats-min-online-publishing-interval", bpo::value<std::string>(),
"minimum flushing interval for online metrics (in s)")
1761 (
"infologger-mode", bpo::value<std::string>(),
"O2_INFOLOGGER_MODE override")
1762 (
"infologger-severity", bpo::value<std::string>(),
"minimun FairLogger severity which goes to info logger")
1763 (
"dpl-tracing-flags", bpo::value<std::string>(),
"pipe separated list of events to trace")
1764 (
"signposts", bpo::value<std::string>()->default_value(defaultSignposts),
1765 "comma separated list of signposts to enable (any of `completion`, `data_processor_context`, `stream_context`, `device`, `monitoring_service`)")
1766 (
"child-driver", bpo::value<std::string>(),
"external driver to start childs with (e.g. valgrind)");
1768 return forwardedDeviceOptions;
1774 return std::find_if(spec.
labels.begin(), spec.
labels.end(), sameLabel) != spec.
labels.end();
1781 std::regex re(
"\\{timeslice([0-9]+)\\}");
1784 while (std::regex_search(
fmt,
match, re)) {
1785 auto timeslice = std::stoi(
match[1]);
1787 fmt =
match.prefix().str() + replacement +
match.suffix().str();
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_poll_s uv_poll_t
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
virtual void notifyAcceptedOffer(ComputingOffer const &)=0
virtual std::vector< ComputingOffer > getAvailableOffers()=0
bool match(const std::vector< std::string > &queries, const char *pattern)
GLuint const GLchar * name
GLboolean GLboolean GLboolean b
GLsizei GLsizei GLchar * source
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLboolean GLboolean GLboolean GLboolean a
bpo::variables_map arguments
auto timer_fired(uv_timer_t *timer)
void timer_callback(uv_timer_t *handle)
void signal_callback(uv_signal_t *handle, int)
auto timer_set_period(uv_timer_t *timer)
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
std::vector< OverrideServiceSpec > OverrideServiceSpecs
std::vector< DataProcessorSpec > WorkflowSpec
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
std::vector< std::string > split(const std::string &str, char delimiter=',')
static char const * typeAsString(enum ChannelType type)
return a ChannelType as a lowercase string
static std::string channelUrl(InputChannelSpec const &)
static char const * methodAsString(enum ChannelMethod method)
return a ChannelMethod as a lowercase string
A computing resource which can be offered to run a device.
header::DataOrigin origin
header::DataDescription description
static bool dpl2BoostOptions(const std::vector< ConfigParamSpec > &spec, options_description &options, boost::program_options::options_description vetos=options_description())
A label that can be associated to a DataProcessorSpec.
static std::optional< ConcreteDataMatcher > asOptionalConcreteDataMatcher(OutputSpec const &spec)
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static void processOutEdgeActions(ConfigContext const &configContext, std::vector< DeviceSpec > &devices, std::vector< DeviceId > &deviceIndex, std::vector< DeviceConnectionId > &connections, ResourceManager &resourceManager, const std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &logicalEdges, const std::vector< EdgeAction > &actions, const WorkflowSpec &workflow, const std::vector< OutputSpec > &outputs, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< SendingPolicy > const &sendingPolicies, std::vector< ForwardingPolicy > const &forwardingPolicies, std::string const &channelPrefix, ComputingOffer const &defaultOffer, OverrideServiceSpecs const &overrideServices={})
static void processInEdgeActions(std::vector< DeviceSpec > &devices, std::vector< DeviceId > &deviceIndex, const std::vector< DeviceConnectionId > &connections, ResourceManager &resourceManager, const std::vector< size_t > &inEdgeIndex, const std::vector< DeviceConnectionEdge > &logicalEdges, const std::vector< EdgeAction > &actions, const WorkflowSpec &workflow, const std::vector< LogicalForwardInfo > &availableForwardsInfo, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::string const &channelPrefix, ComputingOffer const &defaultOffer, OverrideServiceSpecs const &overrideServices={})
static void validate(WorkflowSpec const &workflow)
static boost::program_options::options_description getForwardedDeviceOptions()
define the options which are forwarded to every child
static std::string inputChannel2String(const InputChannelSpec &channel)
Helper to provide the channel configuration string for an input channel.
static std::string reworkTimeslicePlaceholder(std::string const &str, DeviceSpec const &spec)
static void prepareArguments(bool defaultQuiet, bool defaultStopped, bool intereactive, unsigned short driverPort, DriverConfig const &driverConfig, std::vector< DataProcessorInfo > const &processorInfos, std::vector< DeviceSpec > const &deviceSpecs, std::vector< DeviceExecution > &deviceExecutions, std::vector< DeviceControl > &deviceControls, std::vector< ConfigParamSpec > const &detectedOptions, std::string const &uniqueWorkflowId)
static void reworkShmSegmentSize(std::vector< DataProcessorInfo > &infos)
static void reworkHomogeneousOption(std::vector< DataProcessorInfo > &infos, char const *name, char const *defaultValue)
static void reworkIntegerOption(std::vector< DataProcessorInfo > &infos, char const *name, std::function< long long()> defaultValueCallback, long long startValue, std::function< long long(long long, long long)> bestValue)
static bool hasLabel(DeviceSpec const &spec, char const *label)
static std::string outputChannel2String(const OutputChannelSpec &channel)
Helper to provide the channel configuration string for an output channel.
static void dataProcessorSpecs2DeviceSpecs(const WorkflowSpec &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicy, std::vector< ForwardingPolicy > const &forwardingPolicies, std::vector< DeviceSpec > &devices, ResourceManager &resourceManager, std::string const &uniqueWorkflowId, ConfigContext const &configContext, bool optimizeTopology=false, unsigned short resourcesMonitoringInterval=0, std::string const &channelPrefix="", OverrideServiceSpecs const &overrideServices={})
std::vector< DataProcessorLabel > labels
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
Running state information of a given device.
bool batch
Whether the driver was started in batch mode or not.
static RouteConfigurator::DanglingConfigurator danglingTimerConfigurator(InputSpec const &matcher)
static RouteConfigurator::DanglingConfigurator danglingConditionConfigurator()
static RouteConfigurator::ExpirationConfigurator expiringConditionConfigurator(InputSpec const &spec, std::string const &sourceChannel)
static RouteConfigurator::ExpirationConfigurator expiringOutOfBandConfigurator(InputSpec const &spec)
static RouteConfigurator::CreationConfigurator enumDrivenConfigurator(InputSpec const &matcher, size_t inputTimeslice, size_t maxInputTimeslices)
static RouteConfigurator::ExpirationConfigurator expiringTimeframeConfigurator()
static RouteConfigurator::CreationConfigurator signalDrivenConfigurator(InputSpec const &matcher, size_t inputTimeslice, size_t maxInputTimeslices)
static RouteConfigurator::ExpirationConfigurator expiringOptionalConfigurator(InputSpec const &spec, std::string const &sourceChannel)
When the record expires, simply create a dummy entry.
static RouteConfigurator::DanglingConfigurator danglingEnumerationConfigurator(InputSpec const &matcher)
static RouteConfigurator::DanglingConfigurator danglingQAConfigurator()
static RouteConfigurator::CreationConfigurator createOptionalConfigurator()
This behaves as data. I.e. we never create it unless data arrives.
static RouteConfigurator::ExpirationConfigurator expiringOOBConfigurator(InputSpec const &spec, std::string const &sourceChannel)
static RouteConfigurator::CreationConfigurator loopEventDrivenConfigurator(InputSpec const &matcher)
static RouteConfigurator::ExpirationConfigurator expiringTimerConfigurator(InputSpec const &spec, std::string const &sourceChannel)
static RouteConfigurator::DanglingConfigurator danglingTimeframeConfigurator()
static RouteConfigurator::ExpirationConfigurator expiringTransientConfigurator(InputSpec const &)
static RouteConfigurator::CreationConfigurator oobDrivenConfigurator()
static RouteConfigurator::CreationConfigurator timeDrivenConfigurator(InputSpec const &matcher)
static RouteConfigurator::DanglingConfigurator danglingOptionalConfigurator(std::vector< InputRoute > const &routes)
This will always exipire an optional record when no data is received.
static RouteConfigurator::DanglingConfigurator danglingOutOfBandConfigurator()
static RouteConfigurator::CreationConfigurator dataDrivenConfigurator()
static RouteConfigurator::ExpirationConfigurator expiringQAConfigurator()
static RouteConfigurator::ExpirationConfigurator expiringEnumerationConfigurator(InputSpec const &spec, std::string const &sourceChannel)
static RouteConfigurator::DanglingConfigurator danglingTransientConfigurator()
static RouteConfigurator::CreationConfigurator fairmqDrivenConfiguration(InputSpec const &spec, int inputTimeslice, int maxInputTimeslices)
static ExpirationHandler::Handler fetchFromCCDBCache(InputSpec const &spec, std::string const &prefix, std::string const &overrideTimestamp, std::string const &sourceChannel)
static ExpirationHandler::Handler dummy(ConcreteDataMatcher const &spec, std::string const &sourceChannel)
Create a dummy message with the provided ConcreteDataMatcher.
static ExpirationHandler::Handler fetchFromQARegistry()
static ExpirationHandler::Creator enumDrivenCreation(size_t first, size_t last, size_t step, size_t inputTimeslice, size_t maxTimeSliceId, size_t repetitions)
static ExpirationHandler::Handler enumerate(ConcreteDataMatcher const &spec, std::string const &sourceChannel, int64_t orbitOffset, int64_t orbitMultiplier)
Enumerate entries on every invokation.
static ExpirationHandler::Creator timeDrivenCreation(std::vector< std::chrono::microseconds > periods, std::vector< std::chrono::seconds > intervals, std::function< bool(void)> hasTimerFired, std::function< void(uint64_t, uint64_t)> updateTimerPeriod)
static ExpirationHandler::Creator dataDrivenCreation()
Callback which does nothing, waiting for data to arrive.
static ExpirationHandler::Checker expireAlways()
static ExpirationHandler::Handler fetchFromObjectRegistry()
static ExpirationHandler::Checker expectCTP(std::string const &serverUrl, bool waitForCTP)
static ExpirationHandler::Checker expireNever()
static ExpirationHandler::Creator uvDrivenCreation(int loopReason, DeviceState &state)
Callback which creates a new timeslice whenever some libuv event happens.
static ExpirationHandler::Handler fetchFromFairMQ(InputSpec const &spec, std::string const &channelName)
static ExpirationHandler::Handler doNothing()
static ExpirationHandler::Checker expireIfPresent(std::vector< InputRoute > const &schema, ConcreteDataMatcher matcher)
enum ChannelMethod method
std::function< ExpirationHandler::Creator(DeviceState &, ServiceRegistryRef, ConfigParamRegistry const &)> CreationConfigurator
std::function< ExpirationHandler::Handler(DeviceState &, ConfigParamRegistry const &)> ExpirationConfigurator
std::function< ExpirationHandler::Checker(DeviceState &, ConfigParamRegistry const &)> DanglingConfigurator
static ServiceSpecs filterDisabled(ServiceSpecs originals, OverrideServiceSpecs const &overrides)
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels