1028 std::string
const& defaultDriverClient,
1031 fair::Logger::SetConsoleColor(
false);
1032 fair::Logger::OnFatal([]() {
throw runtime_error(
"Fatal error"); });
1034 LOG(info) <<
"Spawning new device " << spec.id <<
" in process with pid " << getpid();
1036 fair::mq::DeviceRunner runner{argc, argv};
// Hook installed for the SetCustomCmdLineOptions stage of the DeviceRunner.
// It builds a boost::program_options description containing the options
// listed below and adds it to the runner configuration (r.fConfig).
1040 runner.AddHook<fair::mq::hooks::SetCustomCmdLineOptions>([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner&
r) {
// Baseline defaults used for the option declarations further down.
1041 std::string defaultExitTransitionTimeout =
"0";
1042 std::string defaultDataProcessingTimeout =
"0";
1043 std::string defaultInfologgerMode =
"";
// First set of overrides: both timeouts and the InfoLogger mode.
// NOTE(review): the condition guarding these assignments is not visible in
// this excerpt - confirm which deployment mode selects them.
1046 defaultExitTransitionTimeout =
"40";
1047 defaultDataProcessingTimeout =
"20";
1048 defaultInfologgerMode =
"infoLoggerD";
// Second set of overrides: timeouts only, the InfoLogger mode is left alone.
// NOTE(review): guarding condition elided here as well.
1050 defaultExitTransitionTimeout =
"40";
1051 defaultDataProcessingTimeout =
"20";
1053 boost::program_options::options_description optsDesc;
// The default of --signposts is taken from the DPL_SIGNPOSTS environment
// variable; when it is unset (getenv returns nullptr) an empty string is used.
1055 char const* defaultSignposts = getenv(
"DPL_SIGNPOSTS");
// Option declarations. Most values are declared as std::string, including the
// numeric-looking ones (timeouts, rate limit); the two flags are bool with
// zero_tokens(), i.e. they take no argument on the command line.
1056 optsDesc.add_options()(
"monitoring-backend", bpo::value<std::string>()->default_value(
"default"),
"monitoring backend info")
1057 (
"dpl-stats-min-online-publishing-interval", bpo::value<std::string>()->default_value(
"0"),
"minimum flushing interval for online metrics (in s)")
1058 (
"driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient),
"backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets")
1059 (
"infologger-severity", bpo::value<std::string>()->default_value(
""),
"minimum FairLogger severity to send to InfoLogger")
1060 (
"dpl-tracing-flags", bpo::value<std::string>()->default_value(
""),
"pipe `|` separate list of events to be traced")
1061 (
"signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts :
""),
"comma separated list of signposts to enable")
1062 (
"expected-region-callbacks", bpo::value<std::string>()->default_value(
"0"),
"how many region callbacks we are expecting")
1063 (
"exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout),
"how many second to wait before switching from RUN to READY")
1064 (
"error-on-exit-transition-timeout", bpo::value<bool>()->zero_tokens()->default_value(
false),
"print error instead of warning when exit transition timer expires")
1065 (
"data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout),
"how many second to wait before stopping data processing and allowing data calibration")
1066 (
"timeframes-rate-limit", bpo::value<std::string>()->default_value(
"0"),
"how many timeframe can be in flight at the same moment (0 disables)")
1067 (
"configuration,cfg", bpo::value<std::string>()->default_value(
"command-line"),
"configuration backend")
1068 (
"infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode),
"O2_INFOLOGGER_MODE override")
1069 (
"log-timestamp-us", bpo::value<bool>()->zero_tokens()->default_value(
false),
"enable microsecond timestamps in log messages");
// Hand the assembled description over to the runner configuration.
1070 r.fConfig.AddToCmdLineOptions(optsDesc,
true);
// Owners for objects created after configuration parsing. They are declared
// outside the callback; the callback below assigns simpleRawDeviceService,
// deviceState and quotaEvaluator.
1075 std::unique_ptr<SimpleRawDeviceService> simpleRawDeviceService;
1076 std::unique_ptr<DeviceState> deviceState;
1077 std::unique_ptr<ComputingQuotaEvaluator> quotaEvaluator;
1078 std::unique_ptr<FairMQDeviceProxy> deviceProxy;
1079 std::unique_ptr<DeviceContext> deviceContext;
// Callback installed on the InstantiateDevice hook (see the AddHook call at
// the end of this section). It registers the core services and creates the
// DataProcessingDevice that the runner will drive.
// NOTE(review): part of the capture list is elided in this excerpt.
1081 auto afterConfigParsingCallback = [&simpleRawDeviceService,
1087 &danglingEdgesContext,
1090 &processingPolicies,
1093 &loop](fair::mq::DeviceRunner&
r) {
// Create the raw device service (constructed with a null first argument and
// the device spec) and register it under the RawDeviceService interface.
1095 simpleRawDeviceService = std::make_unique<SimpleRawDeviceService>(
nullptr, spec);
1096 serviceRef.
registerService(ServiceRegistryHelpers::handleForService<RawDeviceService>(simpleRawDeviceService.get()));
// Create the device state and attach the captured event loop to it.
1098 deviceState = std::make_unique<DeviceState>();
1099 deviceState->loop = loop;
1101 serviceRef.
registerService(ServiceRegistryHelpers::handleForService<DeviceState>(deviceState.get()));
// The quota evaluator is constructed from the registry reference itself.
1103 quotaEvaluator = std::make_unique<ComputingQuotaEvaluator>(serviceRef);
1104 serviceRef.
registerService(ServiceRegistryHelpers::handleForService<ComputingQuotaEvaluator>(quotaEvaluator.get()));
// The registrations below pass pointers to objects that are not created in
// the visible part of this callback (spec, runningWorkflow, driverConfig,
// danglingEdgesContext).
// NOTE(review): deviceContext is not assigned in any visible line; presumably
// it is created on an elided line - confirm, otherwise get() yields nullptr.
1107 serviceRef.
registerService(ServiceRegistryHelpers::handleForService<DeviceSpec const>(&spec));
1108 serviceRef.
registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
1109 serviceRef.
registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
1110 serviceRef.
registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
1111 serviceRef.
registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(&danglingEdgesContext));
// Build the device and transfer its ownership to the runner.
1113 auto device = std::make_unique<DataProcessingDevice>(
ref, serviceRegistry);
1116 r.fDevice = std::move(device);
1117 fair::Logger::SetConsoleColor(
false);
// When --log-timestamp-us is set, define the user1 verbosity as
// "microsecond timestamp + severity" and switch the logger to it.
1118 if (
r.fConfig.GetProperty<
bool>(
"log-timestamp-us")) {
1119 fair::Logger::DefineVerbosity(fair::Verbosity::user1,
1120 fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us,
1121 fair::VerbositySpec::Info::severity));
1122 fair::Logger::SetVerbosity(fair::Verbosity::user1);
// Declare every service listed in the device spec, passing the device state
// and the parsed configuration.
1126 for (
auto& service : spec.services) {
1127 LOG(
debug) <<
"Declaring service " << service.name;
1128 serviceRegistry.
declareService(service, *deviceState.get(),
r.fConfig);
// Enable process monitoring (CPU, memory, smaps) at the interval taken from
// the device spec.
1131 serviceRef.
get<
Monitoring>().enableProcessMonitoring(spec.resourceMonitoringInterval, {PmMeasurement::Cpu, PmMeasurement::Mem, PmMeasurement::Smaps});
// Register the callback above for the InstantiateDevice stage.
1135 runner.AddHook<fair::mq::hooks::InstantiateDevice>(afterConfigParsingCallback);
1137 auto result = runner.Run();
1380 DriverInfo& driverInfo,
1382 std::vector<DeviceMetricsInfo>& metricsInfos,
1383 std::vector<ConfigParamSpec>
const& detectedParams,
1384 boost::program_options::variables_map& varmap,
1385 std::vector<ServiceSpec>& driverServices,
1386 std::string frameworkId)
1390 .shmSegmentId = (int16_t)atoi(varmap[
"shm-segment-id"].as<std::string>().c_str())};
1394 auto* devicesManager =
new DevicesManager{.
controls = controls, .infos = infos, .specs = runningWorkflow.devices, .messages = {}};
1398 std::vector<uv_poll_t*> pollHandles;
1399 std::vector<DeviceStdioContext> childFds;
1401 std::vector<ComputingResource> resources;
1403 if (driverInfo.resources !=
"") {
1409 auto resourceManager = std::make_unique<SimpleResourceManager>(resources);
1412 void* window =
nullptr;
1413 decltype(debugGUI->
getGUIDebugger(infos, runningWorkflow.devices, allStates, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl)) debugGUICallback;
1416 auto initDebugGUI = []() ->
DebugGUI* {
1417 uv_lib_t supportLib;
1420 result = uv_dlopen(
"libO2FrameworkGUISupport.dylib", &supportLib);
1422 result = uv_dlopen(
"libO2FrameworkGUISupport.so", &supportLib);
1425 LOG(error) << uv_dlerror(&supportLib);
1430 result = uv_dlsym(&supportLib,
"dpl_plugin_callback", (
void**)&dpl_plugin_callback);
1432 LOG(error) << uv_dlerror(&supportLib);
1436 return PluginManager::getByName<DebugGUI>(pluginInstance,
"ImGUIDebugGUI");
1444 if ((driverConfig.
batch ==
false || getenv(
"DPL_DRIVER_REMOTE_GUI") !=
nullptr) && frameworkId.empty()) {
1445 debugGUI = initDebugGUI();
1447 if (driverConfig.
batch ==
false) {
1448 window = debugGUI->
initGUI(
"O2 Framework debug GUI", serviceRegistry);
1450 window = debugGUI->
initGUI(
nullptr, serviceRegistry);
1453 }
else if (getenv(
"DPL_DEVICE_REMOTE_GUI") && !frameworkId.empty()) {
1454 debugGUI = initDebugGUI();
1462 window = debugGUI->
initGUI(
nullptr, serviceRegistry);
1465 if (driverConfig.
batch ==
false && window ==
nullptr && frameworkId.empty()) {
1466 LOG(warn) <<
"Could not create GUI. Switching to batch mode. Do you have GLFW on your system?";
1467 driverConfig.
batch =
true;
1468 if (varmap[
"error-policy"].defaulted()) {
1469 driverInfo.processingPolicies.error = TerminationPolicy::QUIT;
1472 bool guiQuitRequested =
false;
1473 bool hasError =
false;
1477 DriverState current;
1478 DriverState previous;
1484 if (!driverConfig.
batch) {
1486 uv_timer_init(loop, gui_timer);
1489 std::vector<ServiceMetricHandling> metricProcessingCallbacks;
1490 std::vector<ServiceSummaryHandling> summaryCallbacks;
1491 std::vector<ServicePreSchedule> preScheduleCallbacks;
1492 std::vector<ServicePostSchedule> postScheduleCallbacks;
1493 std::vector<ServiceDriverInit> driverInitCallbacks;
1494 for (
auto& service : driverServices) {
1495 if (service.driverStartup ==
nullptr) {
1498 service.driverStartup(serviceRegistry,
DeviceConfig{varmap});
1502 ref.registerService(ServiceRegistryHelpers::handleForService<DevicesManager>(devicesManager));
1504 bool guiTimerExpired =
false;
1506 guiContext.
plugin = debugGUI;
1509 guiContext.
frameCost = &driverInfo.frameCost;
1518 .controls = &controls,
1520 .states = &allStates,
1521 .specs = &runningWorkflow.devices,
1522 .metrics = &metricsInfos,
1523 .metricProcessingCallbacks = &metricProcessingCallbacks,
1524 .summaryCallbacks = &summaryCallbacks,
1525 .driver = &driverInfo,
1527 .isDriver = frameworkId.empty(),
1530 serverContext.serverHandle.data = &serverContext;
1533 uv_timer_init(loop, &force_step_timer);
1535 uv_timer_init(loop, &force_exit_timer);
1537 bool guiDeployedOnce =
false;
1541 metricDumpTimer.data = &serverContext;
1542 bool allChildrenGone =
false;
1549 serverContext.asyncLogProcessing->data = &serverContext;
1550 uv_async_init(loop, serverContext.asyncLogProcessing, [](
uv_async_t* handle) {
1551 auto* context = (DriverServerContext*)handle->data;
1552 processChildrenOutput(context->loop, *context->driver, *context->infos, *context->specs, *context->controls);
1553 for (auto* statusHandler : context->statusHandlers) {
1554 for (size_t di = 0; di < context->infos->size(); ++di) {
1555 statusHandler->sendNewLogs(di);
// Prelude of one state-machine iteration: collect the states requested from
// outside, handle timeout / graceful-exit requests, then take the next state
// to execute from the back of driverInfo.states (the vector is used as a
// stack: states pushed last are executed first).
//
// Transitions forced through driverControl are appended to the stack and the
// request list is emptied.
1562 if (driverControl.forcedTransitions.empty() ==
false) {
1563 for (
auto transition : driverControl.forcedTransitions) {
1564 driverInfo.states.push_back(transition);
1566 driverControl.forcedTransitions.resize(0);
// uv_hrtime() reports nanoseconds, so dividing the difference by 1e9 gives
// the whole seconds elapsed since driverInfo.startTime.
1571 auto currentTime = uv_hrtime();
1572 uint64_t diff = (currentTime - driverInfo.startTime) / 1000000000LL;
// A timeout of 0 (or less) disables the check; it is also skipped once a
// graceful exit is already in progress.
1573 if ((
graceful_exit ==
false) && (driverInfo.timeout > 0) && (diff > driverInfo.timeout)) {
1574 LOG(info) <<
"Timout ellapsed. Requesting to quit.";
// First time a graceful exit is observed: remember it via sigintRequested so
// this branch runs only once, drop every pending state and request QUIT.
1579 if (
graceful_exit ==
true && driverInfo.sigintRequested ==
false) {
1580 driverInfo.sigintRequested =
true;
1581 driverInfo.states.resize(0);
1582 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
// NOTE(review): the condition guarding the next two statements is elided in
// this excerpt.
1587 driverInfo.sigchldRequested =
true;
1588 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
// Pick the next state from the back of the stack, or UNKNOWN if none is left.
1590 if (driverInfo.states.empty() ==
false) {
1592 current = driverInfo.states.back();
1594 current = DriverState::UNKNOWN;
// NOTE(review): from the visible lines it is unclear whether this pop_back()
// is covered by the non-empty check above; pop_back() on an empty vector is
// undefined behaviour - confirm.
1596 driverInfo.states.pop_back();
1598 case DriverState::BIND_GUI_PORT:
1599 bindGUIPort(driverInfo, serverContext, frameworkId);
1601 case DriverState::INIT:
1602 LOGP(info,
"Initialising O2 Data Processing Layer. Driver PID: {}.", getpid());
1603 LOGP(info,
"Driver listening on port: {}", driverInfo.port);
1606 driverInfo.sa_handle_child.sa_handler = &handle_sigchld;
1607 sigemptyset(&driverInfo.sa_handle_child.sa_mask);
1608 driverInfo.sa_handle_child.sa_flags = SA_RESTART | SA_NOCLDSTOP;
1609 if (sigaction(SIGCHLD, &driverInfo.sa_handle_child,
nullptr) == -1) {
1616 if (driverInfo.noSHMCleanup) {
1617 LOGP(warning,
"Not cleaning up shared memory.");
1625 for (
auto& callback : driverInitCallbacks) {
1626 callback(serviceRegistry, {varmap});
1628 driverInfo.states.push_back(DriverState::RUNNING);
1630 LOG(info) <<
"O2 Data Processing Layer initialised. We brake for nobody.";
1632 LOGF(info,
"Optimised build. O2DEBUG / LOG(debug) / LOGF(debug) / assert statement will not be shown.");
1635 case DriverState::IMPORT_CURRENT_WORKFLOW:
1638 dataProcessorInfos = previousDataProcessorInfos;
1639 for (
auto const& device : runningWorkflow.devices) {
1640 auto exists = std::find_if(dataProcessorInfos.begin(),
1641 dataProcessorInfos.end(),
1642 [
id = device.id](
DataProcessorInfo const& info) ->
bool { return info.name == id; });
1643 if (exists != dataProcessorInfos.end()) {
1647 for (
auto channel : device.inputChannels) {
1650 for (
auto channel : device.outputChannels) {
1653 dataProcessorInfos.push_back(
1658 workflowInfo.options,
1662 case DriverState::MATERIALISE_WORKFLOW:
1665 if (driverConfig.batch ==
true && varmap[
"dds"].as<std::string>().empty() && !varmap[
"dump-workflow"].as<
bool>() && workflowState == WorkflowParsingState::Empty) {
1666 LOGP(error,
"Empty workflow provided while running in batch mode.");
1672 auto altered_workflow = workflow;
// Splits a process-control parameter name of the form
// "control:<task>/<conf>" into its two parts.
//   paramName: the metadata parameter name to parse.
//   returns:   std::pair{task, conf}, both as std::string.
//   throws:    via runtime_error_f when paramName does not match the pattern.
// <task> may contain word characters and '-', <conf> word characters only.
// The pattern is anchored at the start but not at the end, so text following
// <conf> is tolerated and ignored.
1674 auto confNameFromParam = [](std::string
const& paramName) {
1675 std::regex name_regex(R
"(^control:([\w-]+)\/(\w+))");
// Sub-match 0 (the whole match) is only used to test whether the name
// matches at all; an end iterator means no match.
1676 auto match = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 0);
1677 if (
match == std::sregex_token_iterator()) {
1678 throw runtime_error_f(
"Malformed process control spec: %s", paramName.c_str());
// The regex is evaluated again for each capture group: 1 = task, 2 = conf.
1680 std::string task = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 1)->str();
1681 std::string conf = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 2)->str();
1682 return std::pair{task, conf};
1684 bool altered =
false;
1685 for (
auto& device : altered_workflow) {
1687 if (device.name.find(
"internal") != std::string::npos) {
1691 if (device.inputs.empty() ==
true) {
1695 auto hasMetadata = std::ranges::any_of(device.inputs, [](
InputSpec const& spec) {
1696 return spec.metadata.empty() == false;
1702 auto hasControls = std::ranges::any_of(device.inputs, [](
InputSpec const& spec) {
1703 return std::ranges::any_of(spec.metadata, [](ConfigParamSpec const& param) {
1704 return param.type == VariantType::Bool && param.name.find(
"control:") != std::string::npos;
1711 LOGP(
debug,
"Adjusting device {}", device.name.c_str());
1714 if (configStore !=
nullptr) {
1715 auto reg = std::make_unique<ConfigParamRegistry>(std::move(configStore));
1716 for (
auto& input : device.inputs) {
1717 for (
auto&
param : input.metadata) {
1718 if (
param.type == VariantType::Bool &&
param.name.find(
"control:") != std::string::npos) {
1719 if (
param.name !=
"control:default" &&
param.name !=
"control:spawn" &&
param.name !=
"control:build" &&
param.name !=
"control:define") {
1720 auto confName = confNameFromParam(
param.name).second;
1721 param.defaultValue = reg->get<
bool>(confName.c_str());
1728 LOGP(
debug,
"Original inputs: ");
1729 for (
auto& input : device.inputs) {
1730 LOGP(
debug,
"-> {}", input.binding);
1732 auto end = device.inputs.end();
1733 auto new_end = std::remove_if(device.inputs.begin(), device.inputs.end(), [](
InputSpec& input) {
1734 auto requested = false;
1735 auto hasControls = false;
1736 for (auto& param : input.metadata) {
1737 if (param.type != VariantType::Bool) {
1740 if (param.name.find(
"control:") != std::string::npos) {
1742 if (param.defaultValue.get<bool>() == true) {
1753 device.inputs.erase(new_end,
end);
1754 LOGP(
debug,
"Adjusted inputs: ");
1755 for (
auto& input : device.inputs) {
1763 for (
auto& service : driverServices) {
1764 if (service.adjustTopology ==
nullptr) {
1767 service.adjustTopology(
node, *driverInfo.configContext);
1775 driverInfo.channelPolicies,
1776 driverInfo.completionPolicies,
1777 driverInfo.dispatchPolicies,
1778 driverInfo.resourcePolicies,
1779 driverInfo.callbacksPolicies,
1780 driverInfo.sendingPolicies,
1781 driverInfo.forwardingPolicies,
1782 runningWorkflow.devices,
1784 driverInfo.uniqueWorkflowId,
1785 *driverInfo.configContext,
1786 !varmap[
"no-IPC"].as<
bool>(),
1787 driverInfo.resourcesMonitoringInterval,
1788 varmap[
"channel-prefix"].as<std::string>(),
1790 metricProcessingCallbacks.clear();
1791 std::vector<std::string> matchingServices;
1794 matchingServices.clear();
1795 for (
auto& device : runningWorkflow.devices) {
1796 for (
auto& service : device.services) {
1798 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1801 if (service.metricHandling) {
1802 metricProcessingCallbacks.push_back(service.metricHandling);
1803 matchingServices.push_back(service.name);
1809 matchingServices.clear();
1810 for (
auto& device : runningWorkflow.devices) {
1811 for (
auto& service : device.services) {
1813 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1816 if (service.summaryHandling) {
1817 summaryCallbacks.push_back(service.summaryHandling);
1818 matchingServices.push_back(service.name);
1823 preScheduleCallbacks.clear();
1824 matchingServices.clear();
1825 for (
auto& device : runningWorkflow.devices) {
1826 for (
auto& service : device.services) {
1828 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1831 if (service.preSchedule) {
1832 preScheduleCallbacks.push_back(service.preSchedule);
1836 postScheduleCallbacks.clear();
1837 matchingServices.clear();
1838 for (
auto& device : runningWorkflow.devices) {
1839 for (
auto& service : device.services) {
1841 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1844 if (service.postSchedule) {
1845 postScheduleCallbacks.push_back(service.postSchedule);
1849 driverInitCallbacks.clear();
1850 matchingServices.clear();
1851 for (
auto& device : runningWorkflow.devices) {
1852 for (
auto& service : device.services) {
1854 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1857 if (service.driverInit) {
1858 driverInitCallbacks.push_back(service.driverInit);
1866 for (
auto& device : runningWorkflow.devices) {
1868 if (device.name.find(
"internal") != std::string::npos) {
1872 if (configStore !=
nullptr) {
1873 auto reg = std::make_unique<ConfigParamRegistry>(std::move(configStore));
1874 for (
auto& option : device.options) {
1875 const char*
name = option.name.c_str();
1876 switch (option.type) {
1877 case VariantType::Int:
1878 option.defaultValue = reg->get<int32_t>(
name);
1880 case VariantType::Int8:
1881 option.defaultValue = reg->get<int8_t>(
name);
1883 case VariantType::Int16:
1884 option.defaultValue = reg->get<int16_t>(
name);
1886 case VariantType::UInt8:
1889 case VariantType::UInt16:
1890 option.defaultValue = reg->get<uint16_t>(
name);
1892 case VariantType::UInt32:
1893 option.defaultValue = reg->get<uint32_t>(
name);
1895 case VariantType::UInt64:
1896 option.defaultValue = reg->get<uint64_t>(
name);
1898 case VariantType::Int64:
1901 case VariantType::Float:
1902 option.defaultValue = reg->get<
float>(
name);
1904 case VariantType::Double:
1905 option.defaultValue = reg->get<
double>(
name);
1907 case VariantType::String:
1908 option.defaultValue = reg->get<std::string>(
name);
1910 case VariantType::Bool:
1911 option.defaultValue = reg->get<
bool>(
name);
1913 case VariantType::ArrayInt:
1914 option.defaultValue = reg->get<std::vector<int>>(
name);
1916 case VariantType::ArrayFloat:
1917 option.defaultValue = reg->get<std::vector<float>>(
name);
1919 case VariantType::ArrayDouble:
1920 option.defaultValue = reg->get<std::vector<double>>(
name);
1922 case VariantType::ArrayString:
1923 option.defaultValue = reg->get<std::vector<std::string>>(
name);
1925 case VariantType::Array2DInt:
1928 case VariantType::Array2DFloat:
1931 case VariantType::Array2DDouble:
1934 case VariantType::LabeledArrayInt:
1937 case VariantType::LabeledArrayFloat:
1940 case VariantType::LabeledArrayDouble:
1943 case VariantType::LabeledArrayString:
1952 }
catch (std::runtime_error& e) {
1953 LOGP(error,
"invalid workflow in {}: {}", driverInfo.argv[0], e.what());
1957#ifdef DPL_ENABLE_BACKTRACE
1960 LOGP(error,
"invalid workflow in {}: {}", driverInfo.argv[0], err.what);
1963 LOGP(error,
"invalid workflow in {}: Unknown error while materialising workflow", driverInfo.argv[0]);
1967 case DriverState::DO_CHILD:
1969 if (driverControl.defaultStopped) {
1970 kill(getpid(), SIGSTOP);
1972 for (
size_t di = 0;
di < runningWorkflow.devices.size();
di++) {
1974 if (runningWorkflow.devices[
di].id == frameworkId) {
1975 return doChild(driverInfo.argc, driverInfo.argv,
1978 runningWorkflow,
ref,
1980 driverInfo.processingPolicies,
1981 driverInfo.defaultDriverClient,
1986 std::ostringstream ss;
1987 for (
auto& processor : workflow) {
1988 ss <<
" - " << processor.name <<
"\n";
1990 for (
auto& spec : runningWorkflow.devices) {
1991 ss <<
" - " << spec.name <<
"(" << spec.id <<
")"
1994 driverInfo.lastError = fmt::format(
1995 "Unable to find component with id {}."
1996 " Available options:\n{}",
1997 frameworkId, ss.str());
1998 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2001 case DriverState::REDEPLOY_GUI:
2011 if (!driverConfig.batch || getenv(
"DPL_DRIVER_REMOTE_GUI")) {
2013 uv_timer_stop(gui_timer);
2016 auto callback = debugGUI->getGUIDebugger(infos, runningWorkflow.devices, allStates, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl);
2017 guiContext.callback = [&serviceRegistry, &driverServices, &debugGUI, &infos, &runningWorkflow, &dataProcessorInfos, &metricsInfos, &driverInfo, &controls, &driverControl, callback]() {
2019 for (
auto& service : driverServices) {
2020 if (service.postRenderGUI) {
2021 service.postRenderGUI(serviceRegistry);
2025 guiContext.window = window;
2028 gui_timer->data = &guiContext;
2031 guiDeployedOnce =
true;
2034 case DriverState::MERGE_CONFIGS: {
2036 controls.resize(runningWorkflow.devices.size());
2039 if (varmap.count(
"dpl-tracing-flags")) {
2040 for (
auto& control : controls) {
2042 control.tracingFlags = tracingFlags;
2045 deviceExecutions.resize(runningWorkflow.devices.size());
2049 const auto uniformOptions = {
2051 "--aod-memory-rate-limit",
2052 "--aod-writer-json",
2053 "--aod-writer-ntfmerge",
2054 "--aod-writer-resdir",
2055 "--aod-writer-resfile",
2056 "--aod-writer-resmode",
2057 "--aod-writer-maxfilesize",
2058 "--aod-writer-keep",
2059 "--aod-max-io-rate",
2060 "--aod-parent-access-level",
2061 "--aod-parent-base-path-replacement",
2062 "--aod-origin-level-mapping",
2063 "--driver-client-backend",
2064 "--fairmq-ipc-prefix",
2067 "--resources-monitoring",
2068 "--resources-monitoring-file",
2069 "--resources-monitoring-dump-interval",
2073 for (
auto& option : uniformOptions) {
2079 driverControl.defaultStopped,
2080 driverInfo.processingPolicies.termination == TerminationPolicy::WAIT,
2084 runningWorkflow.devices,
2088 driverInfo.uniqueWorkflowId);
2091 LOGP(error,
"unable to merge configurations in {}: {}", driverInfo.argv[0], err.what);
2092#ifdef DPL_ENABLE_BACKTRACE
2093 std::cerr <<
"\nStacktrace follows:\n\n";
2099 case DriverState::SCHEDULE: {
2104 LOG(info) <<
"Redeployment of configuration asked.";
2105 std::ostringstream forwardedStdin;
2107 infos.reserve(runningWorkflow.devices.size());
2110 unsigned parentCPU = -1;
2111 unsigned parentNode = -1;
2112#if defined(__linux__) && __has_include(<sched.h>)
2113 parentCPU = sched_getcpu();
2114#elif __has_include(<linux/getcpu.h>)
2115 getcpu(&parentCPU, &parentNode,
nullptr);
2116#elif __has_include(<cpuid.h>) && (__x86_64__ || __i386__)
2121 for (
auto& callback : preScheduleCallbacks) {
2122 callback(serviceRegistry, {varmap});
2124 childFds.resize(runningWorkflow.devices.size());
2125 for (
int di = 0;
di < (
int)runningWorkflow.devices.size(); ++
di) {
2126 auto& context = childFds[
di];
2129 if (driverInfo.mode == DriverMode::EMBEDDED || runningWorkflow.devices[
di].resource.hostname != driverInfo.deployHostname) {
2131 runningWorkflow.devices[
di], controls[
di], deviceExecutions[
di], infos, allStates);
2136 runningWorkflow.devices, driverInfo,
2137 controls, deviceExecutions, infos,
2139 serviceRegistry, varmap,
2140 childFds, parentCPU, parentNode);
2145 for (
auto& callback : postScheduleCallbacks) {
2146 callback(serviceRegistry, {varmap});
2148 assert(infos.empty() ==
false);
2153 uv_timer_init(loop, &metricDumpTimer);
2155 driverInfo.resourcesMonitoringDumpInterval * 1000,
2156 driverInfo.resourcesMonitoringDumpInterval * 1000);
// For every data processor whose command line contains "--severity", find the
// device with the same name and set its log level from the argument that
// follows "--severity".
// Mapping: debug and detail -> Debug, info and important -> Info,
// warning -> Warning, error -> Error, alarm -> Alarm, critical -> Critical,
// fatal -> Fatal. Any other value leaves the level as it was (no final else
// is visible in this excerpt).
2159 for (
const auto& processorInfo : dataProcessorInfos) {
2160 const auto& cmdLineArgs = processorInfo.cmdLineArgs;
2161 if (std::find(cmdLineArgs.begin(), cmdLineArgs.end(),
"--severity") != cmdLineArgs.end()) {
// Range-for with an init-statement: `counter` lives for the duration of the
// loop. NOTE(review): its increment and its use are on lines elided here.
2162 for (
size_t counter = 0;
const auto& spec : runningWorkflow.devices) {
2163 if (spec.name.compare(processorInfo.name) == 0) {
// Iterator to the element right after "--severity".
// NOTE(review): no comparison against cmdLineArgs.end() is visible before the
// dereferences below; if "--severity" is the last argument this reads past
// the end - confirm.
// NOTE(review): `info` is not declared in the visible lines; presumably it is
// the entry of `infos` for this device - confirm.
2165 const auto logLevelIt = std::find(cmdLineArgs.begin(), cmdLineArgs.end(),
"--severity") + 1;
2166 if ((*logLevelIt).compare(
"debug") == 0) {
2167 info.logLevel = LogParsingHelpers::LogLevel::Debug;
2168 }
else if ((*logLevelIt).compare(
"detail") == 0) {
2169 info.logLevel = LogParsingHelpers::LogLevel::Debug;
2170 }
else if ((*logLevelIt).compare(
"info") == 0) {
2171 info.logLevel = LogParsingHelpers::LogLevel::Info;
2172 }
else if ((*logLevelIt).compare(
"warning") == 0) {
2173 info.logLevel = LogParsingHelpers::LogLevel::Warning;
2174 }
else if ((*logLevelIt).compare(
"error") == 0) {
2175 info.logLevel = LogParsingHelpers::LogLevel::Error;
2176 }
else if ((*logLevelIt).compare(
"important") == 0) {
2177 info.logLevel = LogParsingHelpers::LogLevel::Info;
2178 }
else if ((*logLevelIt).compare(
"alarm") == 0) {
2179 info.logLevel = LogParsingHelpers::LogLevel::Alarm;
2180 }
else if ((*logLevelIt).compare(
"critical") == 0) {
2181 info.logLevel = LogParsingHelpers::LogLevel::Critical;
2182 }
else if ((*logLevelIt).compare(
"fatal") == 0) {
2183 info.logLevel = LogParsingHelpers::LogLevel::Fatal;
2191 LOG(info) <<
"Redeployment of configuration done.";
2193 case DriverState::RUNNING:
2197 devicesManager->flush();
2201 if (guiTimerExpired ==
false) {
2202 O2_SIGNPOST_EVENT_EMIT(driver, sid,
"mainloop",
"Entering event loop with %{public}s", once ?
"UV_RUN_ONCE" :
"UV_RUN_NOWAIT");
2204 uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
// Decide what follows the RUNNING state. States are taken from the back of
// driverInfo.states (see the states.back() / pop_back() earlier in this
// function), so within each branch the state pushed last is executed first.
//
// Quit when the GUI asked for it, or when the termination policy is QUIT and
// checkIfCanExit(infos) reports true.
2208 if (guiQuitRequested ||
2209 (driverInfo.processingPolicies.termination == TerminationPolicy::QUIT && (
checkIfCanExit(infos) ==
true))) {
2214 LOG(info) <<
"Quitting";
2215 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2216 }
else if (infos.size() != runningWorkflow.devices.size()) {
// Device infos and device specs differ in number: redeploy. Execution order
// is MERGE_CONFIGS, SCHEDULE, REDEPLOY_GUI and then back to RUNNING.
2221 driverInfo.states.push_back(DriverState::RUNNING);
2222 driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
2223 driverInfo.states.push_back(DriverState::SCHEDULE);
2224 driverInfo.states.push_back(DriverState::MERGE_CONFIGS);
2225 }
else if (runningWorkflow.devices.empty() && driverConfig.batch ==
true) {
// Batch mode with no devices at all: go straight to EXIT.
2226 LOG(info) <<
"No device resulting from the workflow. Quitting.";
2228 driverInfo.states.push_back(DriverState::EXIT);
2229 }
else if (runningWorkflow.devices.empty() && driverConfig.batch ==
false && !guiDeployedOnce) {
// GUI mode with no devices and the GUI never deployed so far: deploy the GUI
// first, then return to RUNNING.
2231 driverInfo.states.push_back(DriverState::RUNNING);
2232 driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
// Otherwise simply stay in RUNNING. NOTE(review): the `else` introducing this
// statement is elided in this excerpt.
2234 driverInfo.states.push_back(DriverState::RUNNING);
2237 case DriverState::QUIT_REQUESTED: {
2238 std::time_t
result = std::time(
nullptr);
2242 guiQuitRequested =
true;
2249 force_step_timer.data = &infos;
2251 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
2254 case DriverState::HANDLE_CHILDREN: {
2258 uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
2263 static bool forcefulExitMessage =
true;
2264 if (forcefulExitMessage) {
2265 LOG(info) <<
"Forceful exit requested.";
2266 forcefulExitMessage =
false;
2272 driverInfo.sigchldRequested =
false;
2277 bool supposedToQuit = (guiQuitRequested || canExit ||
graceful_exit);
2279 if (allChildrenGone && (supposedToQuit || driverInfo.processingPolicies.termination == TerminationPolicy::QUIT)) {
2281 driverInfo.states.resize(0);
2282 driverInfo.states.push_back(DriverState::EXIT);
2283 }
else if (hasError && driverInfo.processingPolicies.error == TerminationPolicy::QUIT && !supposedToQuit) {
2285 force_exit_timer.data = &infos;
2286 static bool forceful_timer_started =
false;
2287 if (forceful_timer_started ==
false) {
2288 forceful_timer_started =
true;
2291 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2292 }
else if (allChildrenGone ==
false && supposedToQuit) {
2293 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
2297 case DriverState::EXIT: {
2299 if (driverInfo.resourcesMonitoringDumpInterval) {
2300 uv_timer_stop(&metricDumpTimer);
2302 LOGP(info,
"Dumping performance metrics to {}.json file", driverInfo.resourcesMonitoringFilename);
2305 dumpRunSummary(serverContext, driverInfo, infos, runningWorkflow.devices);
2312 if (infos.empty()) {
2315 boost::property_tree::ptree finalConfig;
2316 assert(infos.size() == runningWorkflow.devices.size());
2317 for (
size_t di = 0;
di < infos.size(); ++
di) {
2319 auto& spec = runningWorkflow.devices[
di];
2320 finalConfig.put_child(spec.name,
info.currentConfig);
2322 LOG(info) <<
"Dumping used configuration in dpl-config.json";
2324 std::ofstream outDPLConfigFile(
"dpl-config.json", std::ios::out);
2325 if (outDPLConfigFile.is_open()) {
2326 boost::property_tree::write_json(outDPLConfigFile, finalConfig);
2328 LOGP(warning,
"Could not write out final configuration file. Read only run folder?");
2330 if (driverInfo.noSHMCleanup) {
2331 LOGP(warning,
"Not cleaning up shared memory.");
2335 return calculateExitCode(driverInfo, runningWorkflow.devices, infos);
2337 case DriverState::PERFORM_CALLBACKS:
2338 for (
auto& callback : driverControl.callbacks) {
2339 callback(workflow, runningWorkflow.devices, deviceExecutions, dataProcessorInfos, commandInfo);
2341 driverControl.callbacks.clear();
2344 LOG(error) <<
"Driver transitioned in an unknown state("
2345 <<
"current: " << (
int)current
2346 <<
", previous: " << (
int)previous
2347 <<
"). Shutting down.";
2348 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2886 std::vector<ChannelConfigurationPolicy>
const& channelPolicies,
2887 std::vector<CompletionPolicy>
const& completionPolicies,
2888 std::vector<DispatchPolicy>
const& dispatchPolicies,
2889 std::vector<ResourcePolicy>
const& resourcePolicies,
2890 std::vector<CallbacksPolicy>
const& callbacksPolicies,
2891 std::vector<SendingPolicy>
const& sendingPolicies,
2892 std::vector<ConfigParamSpec>
const& currentWorkflowOptions,
2893 std::vector<ConfigParamSpec>
const& detectedParams,
2898 if (getenv(
"DPL_DRIVER_SIGNPOSTS")) {
2902 std::vector<std::string> currentArgs;
2903 std::vector<PluginInfo> plugins;
2906 for (
int ai = 1; ai < argc; ++ai) {
2907 currentArgs.emplace_back(argv[ai]);
2913 currentWorkflowOptions};
2917 bpo::options_description executorOptions(
"Executor options");
2918 const char* helpDescription =
"print help: short, full, executor, or processor name";
2920 executorOptions.add_options()
2921 (
"help,h", bpo::value<std::string>()->implicit_value(
"short"), helpDescription)
2922 (
"quiet,q", bpo::value<bool>()->zero_tokens()->default_value(
false),
"quiet operation")
2923 (
"stop,s", bpo::value<bool>()->zero_tokens()->default_value(
false),
"stop before device start")
2924 (
"single-step", bpo::value<bool>()->zero_tokens()->default_value(
false),
"start in single step mode")
2925 (
"batch,b", bpo::value<std::vector<std::string>>()->zero_tokens()->composing(),
"batch processing mode")
2926 (
"no-batch", bpo::value<bool>()->zero_tokens(),
"force gui processing mode")
2927 (
"no-cleanup", bpo::value<bool>()->zero_tokens()->default_value(
false),
"do not cleanup the shm segment")
2928 (
"hostname", bpo::value<std::string>()->default_value(
"localhost"),
"hostname to deploy")
2929 (
"resources", bpo::value<std::string>()->default_value(
""),
"resources allocated for the workflow")
2930 (
"start-port,p", bpo::value<unsigned short>()->default_value(22000),
"start port to allocate")
2931 (
"port-range,pr", bpo::value<unsigned short>()->default_value(1000),
"ports in range")
2932 (
"completion-policy,c", bpo::value<TerminationPolicy>(&processingPolicies.
termination)->default_value(TerminationPolicy::QUIT),
2933 "what to do when processing is finished: quit, wait")
2934 (
"error-policy", bpo::value<TerminationPolicy>(&processingPolicies.
error)->default_value(TerminationPolicy::QUIT),
2935 "what to do when a device has an error: quit, wait")
2936 (
"min-failure-level", bpo::value<LogParsingHelpers::LogLevel>(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal),
2937 "minimum message level which will be considered as fatal and exit with 1")
2938 (
"graphviz,g", bpo::value<bool>()->zero_tokens()->default_value(
false),
"produce graphviz output")
2939 (
"mermaid", bpo::value<std::string>()->default_value(
""),
"produce graph output in mermaid format in file under specified name or on stdout if argument is \"-\"")
2940 (
"timeout,t", bpo::value<uint64_t>()->default_value(0),
"forced exit timeout (in seconds)")
2941 (
"dds,D", bpo::value<std::string>()->default_value(
""),
"create DDS configuration")
2942 (
"dds-workflow-suffix,D", bpo::value<std::string>()->default_value(
""),
"suffix for DDS names")
2943 (
"dump-workflow,dump", bpo::value<bool>()->zero_tokens()->default_value(
false),
"dump workflow as JSON")
2944 (
"dump-workflow-file", bpo::value<std::string>()->default_value(
"-"),
"file to which do the dump")
2945 (
"driver-mode", bpo::value<DriverMode>(&driverMode)->default_value(DriverMode::STANDALONE), R
"(how to run the driver. default: "standalone". Valid: "embedded")")
2946 (
"run", bpo::value<bool>()->zero_tokens()->default_value(
false),
"run workflow merged so far. It implies --batch. Use --no-batch to see the GUI")
2947 (
"no-IPC", bpo::value<bool>()->zero_tokens()->default_value(
false),
"disable IPC topology optimization")
2948 (
"o2-control,o2", bpo::value<std::string>()->default_value(
""),
"dump O2 Control workflow configuration under the specified name")
2949 (
"resources-monitoring", bpo::value<unsigned short>()->default_value(0),
"enable cpu/memory monitoring for provided interval in seconds")
2950 (
"resources-monitoring-file", bpo::value<std::string>()->default_value(
"performanceMetrics.json"),
"file where to dump the metrics")
2951 (
"resources-monitoring-dump-interval", bpo::value<unsigned short>()->default_value(0),
"dump monitoring information to disk every provided seconds");
2956 (
"id,i", bpo::value<std::string>(),
"device id for child spawning")
2957 (
"channel-config", bpo::value<std::vector<std::string>>(),
"channel configuration")
2958 (
"control",
"control plugin")
2959 (
"log-color",
"logging color scheme")(
"color",
"logging color scheme");
2961 bpo::options_description visibleOptions;
2962 visibleOptions.add(executorOptions);
2964 auto physicalWorkflow = workflow;
2965 std::map<std::string, size_t> rankIndex;
2972 size_t workflowHashA = 0;
2973 std::hash<std::string> hash_fn;
2975 for (
auto& dp : workflow) {
2976 workflowHashA += hash_fn(dp.name);
2979 for (
auto& dp : workflow) {
2980 rankIndex.insert(std::make_pair(dp.name, workflowHashA));
2983 std::vector<DataProcessorInfo> dataProcessorInfos;
2987 std::vector<DataProcessorSpec> importedWorkflow;
2989 if (previousWorked ==
false) {
2993 size_t workflowHashB = 0;
2994 for (
auto& dp : importedWorkflow) {
2995 workflowHashB += hash_fn(dp.name);
3002 for (
auto& dp : importedWorkflow) {
3003 auto found = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(),
3005 if (found == physicalWorkflow.end()) {
3006 physicalWorkflow.push_back(dp);
3007 rankIndex.insert(std::make_pair(dp.name, workflowHashB));
3016 for (
auto& dp : physicalWorkflow) {
3018 if (std::find_if(dp.labels.begin(), dp.labels.end(), isExpendable) != dp.labels.end()) {
3019 for (
auto&
output : dp.outputs) {
3020 if (
output.lifetime == Lifetime::Timeframe) {
3021 output.lifetime = Lifetime::Sporadic;
3033 if (!(dec.requestedAODs.empty() && dec.requestedDYNs.empty() && dec.requestedIDXs.empty() && dec.requestedTIMs.empty())) {
3036 for (
auto& service : driverServices) {
3037 if (service.injectTopology ==
nullptr) {
3041 service.injectTopology(
node, configContext);
3043 for (
auto& dp : physicalWorkflow) {
3044 if (dp.name.rfind(
"internal-", 0) == 0) {
3045 rankIndex.insert(std::make_pair(dp.name, hash_fn(
"internal")));
3052 return a.name < b.name;
3055 for (
auto& dp : physicalWorkflow) {
3056 std::stable_sort(dp.inputs.begin(), dp.inputs.end(),
3057 [](
InputSpec const&
a,
InputSpec const&
b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
3058 std::stable_sort(dp.outputs.begin(), dp.outputs.end(),
3059 [](
OutputSpec const&
a,
OutputSpec const&
b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
3064 std::vector<std::pair<int, int>> edges;
3066 if (physicalWorkflow.size() > 1) {
3070 if (topoInfos.size() != physicalWorkflow.size()) {
3073 throw std::runtime_error(
"Unable to do topological sort of the resulting workflow. Do you have loops?\n" +
debugTopoInfo(physicalWorkflow, topoInfos, edges));
3077 auto aRank = std::make_tuple(a.layer, -workflow.at(a.index).outputs.size(), workflow.at(a.index).name);
3078 auto bRank = std::make_tuple(b.layer, -workflow.at(b.index).outputs.size(), workflow.at(b.index).name);
3079 return aRank < bRank;
3082 std::vector<int> dataProcessorOrder;
3083 dataProcessorOrder.resize(topoInfos.size());
3084 for (
size_t i = 0;
i < topoInfos.size(); ++
i) {
3085 dataProcessorOrder[topoInfos[
i].index] =
i;
3087 std::vector<int> newLocations;
3088 newLocations.resize(dataProcessorOrder.size());
3089 for (
size_t i = 0;
i < dataProcessorOrder.size(); ++
i) {
3090 newLocations[dataProcessorOrder[
i]] =
i;
3100 bpo::options_description od;
3101 od.add(visibleOptions);
3106 using namespace bpo::command_line_style;
3107 auto style = (allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
3108 bpo::variables_map varmap;
3111 bpo::command_line_parser(argc, argv)
3116 }
catch (std::exception
const& e) {
3117 LOGP(error,
"error parsing options of {}: {}", argv[0], e.what());
3133 if (varmap.count(
"help")) {
3134 printHelp(varmap, executorOptions, physicalWorkflow, currentWorkflowOptions);
3140 if (varmap.count(
"severity")) {
3141 auto logLevel = varmap[
"severity"].as<std::string>();
3142 if (logLevel ==
"debug") {
3143 fair::Logger::SetConsoleSeverity(fair::Severity::debug);
3144 }
else if (logLevel ==
"detail") {
3145 fair::Logger::SetConsoleSeverity(fair::Severity::detail);
3146 }
else if (logLevel ==
"info") {
3147 fair::Logger::SetConsoleSeverity(fair::Severity::info);
3148 }
else if (logLevel ==
"warning") {
3149 fair::Logger::SetConsoleSeverity(fair::Severity::warning);
3150 }
else if (logLevel ==
"error") {
3151 fair::Logger::SetConsoleSeverity(fair::Severity::error);
3152 }
else if (logLevel ==
"important") {
3153 fair::Logger::SetConsoleSeverity(fair::Severity::important);
3154 }
else if (logLevel ==
"alarm") {
3155 fair::Logger::SetConsoleSeverity(fair::Severity::alarm);
3156 }
else if (logLevel ==
"critical") {
3157 fair::Logger::SetConsoleSeverity(fair::Severity::critical);
3158 }
else if (logLevel ==
"fatal") {
3159 fair::Logger::SetConsoleSeverity(fair::Severity::fatal);
3161 LOGP(error,
"Invalid log level '{}'", logLevel);
3166 if (varmap[
"log-timestamp-us"].as<bool>()) {
3167 fair::Logger::DefineVerbosity(fair::Verbosity::user1,
3168 fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us,
3169 fair::VerbositySpec::Info::severity));
3170 fair::Logger::SetVerbosity(fair::Verbosity::user1);
3175 auto evaluateBatchOption = [&varmap]() ->
bool {
3176 if (varmap.count(
"no-batch") > 0) {
3179 if (varmap.count(
"batch") == 0) {
3181 return isatty(fileno(stdout)) == 0;
3190 DriverInfo driverInfo{
3191 .sendingPolicies = sendingPolicies,
3192 .forwardingPolicies = forwardingPolicies,
3193 .callbacksPolicies = callbacksPolicies};
3194 driverInfo.states.reserve(10);
3195 driverInfo.sigintRequested =
false;
3196 driverInfo.sigchldRequested =
false;
3197 driverInfo.channelPolicies = channelPolicies;
3198 driverInfo.completionPolicies = completionPolicies;
3199 driverInfo.dispatchPolicies = dispatchPolicies;
3200 driverInfo.resourcePolicies = resourcePolicies;
3201 driverInfo.argc = argc;
3202 driverInfo.argv = argv;
3203 driverInfo.noSHMCleanup = varmap[
"no-cleanup"].as<
bool>();
3204 driverInfo.processingPolicies.termination = varmap[
"completion-policy"].as<
TerminationPolicy>();
3205 driverInfo.processingPolicies.earlyForward = varmap[
"early-forward-policy"].as<
EarlyForwardPolicy>();
3206 driverInfo.mode = varmap[
"driver-mode"].as<
DriverMode>();
3208 auto batch = evaluateBatchOption();
3211 .driverHasGUI = (batch ==
false) || getenv(
"DPL_DRIVER_REMOTE_GUI") !=
nullptr,
3214 if (varmap[
"error-policy"].defaulted() && driverConfig.batch ==
false) {
3215 driverInfo.processingPolicies.error = TerminationPolicy::WAIT;
3217 driverInfo.processingPolicies.error = varmap[
"error-policy"].as<
TerminationPolicy>();
3220 driverInfo.startTime = uv_hrtime();
3221 driverInfo.startTimeMsFromEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(
3222 std::chrono::system_clock::now().time_since_epoch())
3224 driverInfo.timeout = varmap[
"timeout"].as<uint64_t>();
3225 driverInfo.deployHostname = varmap[
"hostname"].as<std::string>();
3226 driverInfo.resources = varmap[
"resources"].as<std::string>();
3227 driverInfo.resourcesMonitoringInterval = varmap[
"resources-monitoring"].as<
unsigned short>();
3228 driverInfo.resourcesMonitoringFilename = varmap[
"resources-monitoring-file"].as<std::string>();
3229 driverInfo.resourcesMonitoringDumpInterval = varmap[
"resources-monitoring-dump-interval"].as<
unsigned short>();
3232 driverInfo.processorInfo = dataProcessorInfos;
3233 driverInfo.configContext = &configContext;
3240 std::string frameworkId;
3243 if (varmap.count(
"id")) {
3247 frameworkId = std::regex_replace(varmap[
"id"].as<std::string>(), std::regex{
"_dds.*"},
"");
3248 driverInfo.uniqueWorkflowId = fmt::format(
"{}", getppid());
3249 driverInfo.defaultDriverClient =
"stdout://";
3251 driverInfo.uniqueWorkflowId = fmt::format(
"{}", getpid());
3252 driverInfo.defaultDriverClient =
"ws://";