1354 DriverInfo& driverInfo,
1356 std::vector<DeviceMetricsInfo>& metricsInfos,
1357 std::vector<ConfigParamSpec>
const& detectedParams,
1358 boost::program_options::variables_map& varmap,
1359 std::vector<ServiceSpec>& driverServices,
1360 std::string frameworkId)
1364 .shmSegmentId = (int16_t)atoi(varmap[
"shm-segment-id"].as<std::string>().c_str())};
1368 auto* devicesManager =
new DevicesManager{.
controls = controls, .infos = infos, .specs = runningWorkflow.devices, .messages = {}};
1372 std::vector<uv_poll_t*> pollHandles;
1373 std::vector<DeviceStdioContext> childFds;
1375 std::vector<ComputingResource> resources;
1377 if (driverInfo.resources !=
"") {
1383 auto resourceManager = std::make_unique<SimpleResourceManager>(resources);
1386 void* window =
nullptr;
1387 decltype(debugGUI->
getGUIDebugger(infos, runningWorkflow.devices, allStates, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl)) debugGUICallback;
1390 auto initDebugGUI = []() ->
DebugGUI* {
1391 uv_lib_t supportLib;
1394 result = uv_dlopen(
"libO2FrameworkGUISupport.dylib", &supportLib);
1396 result = uv_dlopen(
"libO2FrameworkGUISupport.so", &supportLib);
1399 LOG(error) << uv_dlerror(&supportLib);
1404 result = uv_dlsym(&supportLib,
"dpl_plugin_callback", (
void**)&dpl_plugin_callback);
1406 LOG(error) << uv_dlerror(&supportLib);
1410 return PluginManager::getByName<DebugGUI>(pluginInstance,
"ImGUIDebugGUI");
1417 if ((driverConfig.
batch ==
false || getenv(
"DPL_DRIVER_REMOTE_GUI") !=
nullptr) && frameworkId.empty()) {
1418 debugGUI = initDebugGUI();
1420 if (driverConfig.
batch ==
false) {
1421 window = debugGUI->
initGUI(
"O2 Framework debug GUI", serviceRegistry);
1423 window = debugGUI->
initGUI(
nullptr, serviceRegistry);
1426 }
else if (getenv(
"DPL_DEVICE_REMOTE_GUI") && !frameworkId.empty()) {
1427 debugGUI = initDebugGUI();
1435 window = debugGUI->
initGUI(
nullptr, serviceRegistry);
1438 if (driverConfig.
batch ==
false && window ==
nullptr && frameworkId.empty()) {
1439 LOG(warn) <<
"Could not create GUI. Switching to batch mode. Do you have GLFW on your system?";
1440 driverConfig.
batch =
true;
1441 if (varmap[
"error-policy"].defaulted()) {
1442 driverInfo.processingPolicies.error = TerminationPolicy::QUIT;
1445 bool guiQuitRequested =
false;
1446 bool hasError =
false;
1450 DriverState current;
1451 DriverState previous;
1457 if (!driverConfig.
batch) {
1459 uv_timer_init(loop, gui_timer);
1462 std::vector<ServiceMetricHandling> metricProcessingCallbacks;
1463 std::vector<ServiceSummaryHandling> summaryCallbacks;
1464 std::vector<ServicePreSchedule> preScheduleCallbacks;
1465 std::vector<ServicePostSchedule> postScheduleCallbacks;
1466 std::vector<ServiceDriverInit> driverInitCallbacks;
1467 for (
auto& service : driverServices) {
1468 if (service.driverStartup ==
nullptr) {
1471 service.driverStartup(serviceRegistry,
DeviceConfig{varmap});
1475 ref.registerService(ServiceRegistryHelpers::handleForService<DevicesManager>(devicesManager));
1477 bool guiTimerExpired =
false;
1479 guiContext.
plugin = debugGUI;
1482 guiContext.
frameCost = &driverInfo.frameCost;
1491 .controls = &controls,
1493 .states = &allStates,
1494 .specs = &runningWorkflow.devices,
1495 .metrics = &metricsInfos,
1496 .metricProcessingCallbacks = &metricProcessingCallbacks,
1497 .summaryCallbacks = &summaryCallbacks,
1498 .driver = &driverInfo,
1500 .isDriver = frameworkId.empty(),
1503 serverContext.serverHandle.data = &serverContext;
1506 uv_timer_init(loop, &force_step_timer);
1508 uv_timer_init(loop, &force_exit_timer);
1510 bool guiDeployedOnce =
false;
1514 metricDumpTimer.data = &serverContext;
1515 bool allChildrenGone =
false;
1522 serverContext.asyncLogProcessing->data = &serverContext;
1523 uv_async_init(loop, serverContext.asyncLogProcessing, [](
uv_async_t* handle) {
1524 auto* context = (DriverServerContext*)handle->data;
1525 processChildrenOutput(context->loop, *context->driver, *context->infos, *context->specs, *context->controls);
1532 driverInfo.states.push_back(transition);
1539 auto currentTime = uv_hrtime();
1540 uint64_t diff = (currentTime - driverInfo.startTime) / 1000000000LL;
1541 if ((
graceful_exit ==
false) && (driverInfo.timeout > 0) && (diff > driverInfo.timeout)) {
1542 LOG(info) <<
"Timout ellapsed. Requesting to quit.";
1547 if (
graceful_exit ==
true && driverInfo.sigintRequested ==
false) {
1548 driverInfo.sigintRequested =
true;
1549 driverInfo.states.resize(0);
1550 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
1555 driverInfo.sigchldRequested =
true;
1556 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
1558 if (driverInfo.states.empty() ==
false) {
1560 current = driverInfo.states.back();
1562 current = DriverState::UNKNOWN;
1564 driverInfo.states.pop_back();
1566 case DriverState::BIND_GUI_PORT:
1567 bindGUIPort(driverInfo, serverContext, frameworkId);
1569 case DriverState::INIT:
1570 LOGP(info,
"Initialising O2 Data Processing Layer. Driver PID: {}.", getpid());
1571 LOGP(info,
"Driver listening on port: {}", driverInfo.port);
1574 driverInfo.sa_handle_child.sa_handler = &handle_sigchld;
1575 sigemptyset(&driverInfo.sa_handle_child.sa_mask);
1576 driverInfo.sa_handle_child.sa_flags = SA_RESTART | SA_NOCLDSTOP;
1577 if (sigaction(SIGCHLD, &driverInfo.sa_handle_child,
nullptr) == -1) {
1584 if (driverInfo.noSHMCleanup) {
1585 LOGP(warning,
"Not cleaning up shared memory.");
1593 for (
auto& callback : driverInitCallbacks) {
1594 callback(serviceRegistry, {varmap});
1596 driverInfo.states.push_back(DriverState::RUNNING);
1598 LOG(info) <<
"O2 Data Processing Layer initialised. We brake for nobody.";
1600 LOGF(info,
"Optimised build. O2DEBUG / LOG(debug) / LOGF(debug) / assert statement will not be shown.");
1603 case DriverState::IMPORT_CURRENT_WORKFLOW:
1606 dataProcessorInfos = previousDataProcessorInfos;
1607 for (
auto const& device : runningWorkflow.devices) {
1608 auto exists = std::find_if(dataProcessorInfos.begin(),
1609 dataProcessorInfos.end(),
1610 [
id = device.id](
DataProcessorInfo const& info) ->
bool { return info.name == id; });
1611 if (exists != dataProcessorInfos.end()) {
1615 for (
auto channel : device.inputChannels) {
1618 for (
auto channel : device.outputChannels) {
1621 dataProcessorInfos.push_back(
1630 case DriverState::MATERIALISE_WORKFLOW:
1633 if (driverConfig.
batch ==
true && varmap[
"dds"].as<std::string>().empty() && !varmap[
"dump-workflow"].as<
bool>() && workflowState == WorkflowParsingState::Empty) {
1634 LOGP(error,
"Empty workflow provided while running in batch mode.");
1640 auto altered_workflow = workflow;
1642 auto confNameFromParam = [](std::string
const& paramName) {
1643 std::regex name_regex(R
"(^control:([\w-]+)\/(\w+))");
1644 auto match = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 0);
1645 if (
match == std::sregex_token_iterator()) {
1646 throw runtime_error_f(
"Malformed process control spec: %s", paramName.c_str());
1648 std::string task = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 1)->str();
1649 std::string conf = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 2)->str();
1650 return std::pair{task, conf};
1652 bool altered =
false;
1653 for (
auto& device : altered_workflow) {
1655 if (device.name.find(
"internal") != std::string::npos) {
1659 if (device.inputs.empty() ==
true) {
1663 auto hasMetadata = std::any_of(device.inputs.begin(), device.inputs.end(), [](
InputSpec const& spec) {
1664 return spec.metadata.empty() == false;
1670 auto hasControls = std::any_of(device.inputs.begin(), device.inputs.end(), [](
InputSpec const& spec) {
1671 return std::any_of(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& param) {
1672 return param.type == VariantType::Bool && param.name.find(
"control:") != std::string::npos;
1679 LOGP(
debug,
"Adjusting device {}", device.name.c_str());
1682 if (configStore !=
nullptr) {
1683 auto reg = std::make_unique<ConfigParamRegistry>(std::move(configStore));
1684 for (
auto& input : device.inputs) {
1685 for (
auto&
param : input.metadata) {
1686 if (
param.type == VariantType::Bool &&
param.name.find(
"control:") != std::string::npos) {
1687 if (
param.name !=
"control:default" &&
param.name !=
"control:spawn" &&
param.name !=
"control:build") {
1688 auto confName = confNameFromParam(
param.name).second;
1689 param.defaultValue = reg->get<
bool>(confName.c_str());
1696 LOGP(
debug,
"Original inputs: ");
1697 for (
auto& input : device.inputs) {
1698 LOGP(
debug,
"-> {}", input.binding);
1700 auto end = device.inputs.end();
1701 auto new_end = std::remove_if(device.inputs.begin(), device.inputs.end(), [](
InputSpec& input) {
1702 auto requested = false;
1703 auto hasControls = false;
1704 for (auto& param : input.metadata) {
1705 if (param.type != VariantType::Bool) {
1708 if (param.name.find(
"control:") != std::string::npos) {
1710 if (param.defaultValue.get<bool>() == true) {
1721 device.inputs.erase(new_end,
end);
1722 LOGP(
debug,
"Adjusted inputs: ");
1723 for (
auto& input : device.inputs) {
1724 LOGP(
debug,
"-> {}", input.binding);
1731 for (
auto& service : driverServices) {
1732 if (service.adjustTopology ==
nullptr) {
1735 service.adjustTopology(node, *driverInfo.configContext);
1743 driverInfo.channelPolicies,
1744 driverInfo.completionPolicies,
1745 driverInfo.dispatchPolicies,
1746 driverInfo.resourcePolicies,
1747 driverInfo.callbacksPolicies,
1748 driverInfo.sendingPolicies,
1749 driverInfo.forwardingPolicies,
1750 runningWorkflow.devices,
1752 driverInfo.uniqueWorkflowId,
1753 *driverInfo.configContext,
1754 !varmap[
"no-IPC"].as<
bool>(),
1755 driverInfo.resourcesMonitoringInterval,
1756 varmap[
"channel-prefix"].as<std::string>(),
1758 metricProcessingCallbacks.clear();
1759 std::vector<std::string> matchingServices;
1762 matchingServices.clear();
1763 for (
auto& device : runningWorkflow.devices) {
1764 for (
auto& service : device.services) {
1766 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1769 if (service.metricHandling) {
1770 metricProcessingCallbacks.push_back(service.metricHandling);
1771 matchingServices.push_back(service.name);
1777 matchingServices.clear();
1778 for (
auto& device : runningWorkflow.devices) {
1779 for (
auto& service : device.services) {
1781 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1784 if (service.summaryHandling) {
1785 summaryCallbacks.push_back(service.summaryHandling);
1786 matchingServices.push_back(service.name);
1791 preScheduleCallbacks.clear();
1792 matchingServices.clear();
1793 for (
auto& device : runningWorkflow.devices) {
1794 for (
auto& service : device.services) {
1796 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1799 if (service.preSchedule) {
1800 preScheduleCallbacks.push_back(service.preSchedule);
1804 postScheduleCallbacks.clear();
1805 matchingServices.clear();
1806 for (
auto& device : runningWorkflow.devices) {
1807 for (
auto& service : device.services) {
1809 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1812 if (service.postSchedule) {
1813 postScheduleCallbacks.push_back(service.postSchedule);
1817 driverInitCallbacks.clear();
1818 matchingServices.clear();
1819 for (
auto& device : runningWorkflow.devices) {
1820 for (
auto& service : device.services) {
1822 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1825 if (service.driverInit) {
1826 driverInitCallbacks.push_back(service.driverInit);
1834 for (
auto& device : runningWorkflow.devices) {
1836 if (device.name.find(
"internal") != std::string::npos) {
1840 if (configStore !=
nullptr) {
1841 auto reg = std::make_unique<ConfigParamRegistry>(std::move(configStore));
1842 for (
auto& option : device.options) {
1843 const char*
name = option.name.c_str();
1844 switch (option.type) {
1845 case VariantType::Int:
1846 option.defaultValue = reg->get<int32_t>(
name);
1848 case VariantType::Int8:
1849 option.defaultValue = reg->get<int8_t>(
name);
1851 case VariantType::Int16:
1852 option.defaultValue = reg->get<int16_t>(
name);
1854 case VariantType::UInt8:
1855 option.defaultValue = reg->get<uint8_t>(
name);
1857 case VariantType::UInt16:
1858 option.defaultValue = reg->get<uint16_t>(
name);
1860 case VariantType::UInt32:
1861 option.defaultValue = reg->get<uint32_t>(
name);
1863 case VariantType::UInt64:
1864 option.defaultValue = reg->get<uint64_t>(
name);
1866 case VariantType::Int64:
1867 option.defaultValue = reg->get<int64_t>(
name);
1869 case VariantType::Float:
1870 option.defaultValue = reg->get<
float>(
name);
1872 case VariantType::Double:
1873 option.defaultValue = reg->get<
double>(
name);
1875 case VariantType::String:
1876 option.defaultValue = reg->get<std::string>(
name);
1878 case VariantType::Bool:
1879 option.defaultValue = reg->get<
bool>(
name);
1881 case VariantType::ArrayInt:
1882 option.defaultValue = reg->get<std::vector<int>>(
name);
1884 case VariantType::ArrayFloat:
1885 option.defaultValue = reg->get<std::vector<float>>(
name);
1887 case VariantType::ArrayDouble:
1888 option.defaultValue = reg->get<std::vector<double>>(
name);
1890 case VariantType::ArrayString:
1891 option.defaultValue = reg->get<std::vector<std::string>>(
name);
1893 case VariantType::Array2DInt:
1896 case VariantType::Array2DFloat:
1899 case VariantType::Array2DDouble:
1902 case VariantType::LabeledArrayInt:
1905 case VariantType::LabeledArrayFloat:
1908 case VariantType::LabeledArrayDouble:
1911 case VariantType::LabeledArrayString:
1920 }
catch (std::runtime_error& e) {
1921 LOGP(error,
"invalid workflow in {}: {}", driverInfo.argv[0], e.what());
1925#ifdef DPL_ENABLE_BACKTRACE
1928 LOGP(error,
"invalid workflow in {}: {}", driverInfo.argv[0], err.what);
1931 LOGP(error,
"invalid workflow in {}: Unknown error while materialising workflow", driverInfo.argv[0]);
1935 case DriverState::DO_CHILD:
1938 kill(getpid(), SIGSTOP);
1940 for (
size_t di = 0;
di < runningWorkflow.devices.size();
di++) {
1942 if (runningWorkflow.devices[
di].id == frameworkId) {
1943 return doChild(driverInfo.argc, driverInfo.argv,
1945 runningWorkflow,
ref,
1947 driverInfo.processingPolicies,
1948 driverInfo.defaultDriverClient,
1953 std::ostringstream ss;
1954 for (
auto& processor : workflow) {
1955 ss <<
" - " << processor.name <<
"\n";
1957 for (
auto& spec : runningWorkflow.devices) {
1958 ss <<
" - " << spec.name <<
"(" << spec.id <<
")"
1961 driverInfo.lastError = fmt::format(
1962 "Unable to find component with id {}."
1963 " Available options:\n{}",
1964 frameworkId, ss.str());
1965 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
1968 case DriverState::REDEPLOY_GUI:
1978 if (!driverConfig.
batch || getenv(
"DPL_DRIVER_REMOTE_GUI")) {
1980 uv_timer_stop(gui_timer);
1983 auto callback = debugGUI->
getGUIDebugger(infos, runningWorkflow.devices, allStates, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl);
1984 guiContext.
callback = [&serviceRegistry, &driverServices, &debugGUI, &infos, &runningWorkflow, &dataProcessorInfos, &metricsInfos, &driverInfo, &controls, &driverControl, callback]() {
1986 for (
auto& service : driverServices) {
1987 if (service.postRenderGUI) {
1988 service.postRenderGUI(serviceRegistry);
1992 guiContext.
window = window;
1995 gui_timer->data = &guiContext;
1998 guiDeployedOnce =
true;
2001 case DriverState::MERGE_CONFIGS: {
2003 controls.resize(runningWorkflow.devices.size());
2006 if (varmap.count(
"dpl-tracing-flags")) {
2007 for (
auto& control : controls) {
2009 control.tracingFlags = tracingFlags;
2012 deviceExecutions.resize(runningWorkflow.devices.size());
2016 const auto uniformOptions = {
2018 "--aod-memory-rate-limit",
2019 "--aod-writer-json",
2020 "--aod-writer-ntfmerge",
2021 "--aod-writer-resdir",
2022 "--aod-writer-resfile",
2023 "--aod-writer-resmode",
2024 "--aod-writer-maxfilesize",
2025 "--aod-writer-keep",
2026 "--aod-max-io-rate",
2027 "--aod-parent-access-level",
2028 "--aod-parent-base-path-replacement",
2029 "--driver-client-backend",
2030 "--fairmq-ipc-prefix",
2032 "--resources-monitoring",
2033 "--resources-monitoring-dump-interval",
2037 for (
auto& option : uniformOptions) {
2044 driverInfo.processingPolicies.termination == TerminationPolicy::WAIT,
2048 runningWorkflow.devices,
2052 driverInfo.uniqueWorkflowId);
2055 LOGP(error,
"unable to merge configurations in {}: {}", driverInfo.argv[0], err.what);
2056#ifdef DPL_ENABLE_BACKTRACE
2057 std::cerr <<
"\nStacktrace follows:\n\n";
2063 case DriverState::SCHEDULE: {
2068 LOG(info) <<
"Redeployment of configuration asked.";
2069 std::ostringstream forwardedStdin;
2071 infos.reserve(runningWorkflow.devices.size());
2074 unsigned parentCPU = -1;
2075 unsigned parentNode = -1;
2076#if defined(__linux__) && __has_include(<sched.h>)
2077 parentCPU = sched_getcpu();
2078#elif __has_include(<linux/getcpu.h>)
2079 getcpu(&parentCPU, &parentNode,
nullptr);
2080#elif __has_include(<cpuid.h>) && (__x86_64__ || __i386__)
2085 for (
auto& callback : preScheduleCallbacks) {
2086 callback(serviceRegistry, {varmap});
2088 childFds.resize(runningWorkflow.devices.size());
2089 for (
int di = 0;
di < (
int)runningWorkflow.devices.size(); ++
di) {
2090 auto& context = childFds[
di];
2093 if (driverInfo.mode == DriverMode::EMBEDDED || runningWorkflow.devices[
di].resource.hostname != driverInfo.deployHostname) {
2095 runningWorkflow.devices[
di], controls[
di], deviceExecutions[
di], infos, allStates);
2100 runningWorkflow.devices, driverInfo,
2101 controls, deviceExecutions, infos,
2103 serviceRegistry, varmap,
2104 childFds, parentCPU, parentNode);
2109 for (
auto& callback : postScheduleCallbacks) {
2110 callback(serviceRegistry, {varmap});
2112 assert(infos.empty() ==
false);
2117 uv_timer_init(loop, &metricDumpTimer);
2119 driverInfo.resourcesMonitoringDumpInterval * 1000,
2120 driverInfo.resourcesMonitoringDumpInterval * 1000);
2123 for (
const auto& processorInfo : dataProcessorInfos) {
2124 const auto& cmdLineArgs = processorInfo.cmdLineArgs;
2125 if (std::find(cmdLineArgs.begin(), cmdLineArgs.end(),
"--severity") != cmdLineArgs.end()) {
2126 for (
size_t counter = 0;
const auto& spec : runningWorkflow.devices) {
2127 if (spec.name.compare(processorInfo.name) == 0) {
2129 const auto logLevelIt = std::find(cmdLineArgs.begin(), cmdLineArgs.end(),
"--severity") + 1;
2130 if ((*logLevelIt).compare(
"debug") == 0) {
2131 info.logLevel = LogParsingHelpers::LogLevel::Debug;
2132 }
else if ((*logLevelIt).compare(
"detail") == 0) {
2133 info.logLevel = LogParsingHelpers::LogLevel::Debug;
2134 }
else if ((*logLevelIt).compare(
"info") == 0) {
2135 info.logLevel = LogParsingHelpers::LogLevel::Info;
2136 }
else if ((*logLevelIt).compare(
"warning") == 0) {
2137 info.logLevel = LogParsingHelpers::LogLevel::Warning;
2138 }
else if ((*logLevelIt).compare(
"error") == 0) {
2139 info.logLevel = LogParsingHelpers::LogLevel::Error;
2140 }
else if ((*logLevelIt).compare(
"important") == 0) {
2141 info.logLevel = LogParsingHelpers::LogLevel::Info;
2142 }
else if ((*logLevelIt).compare(
"alarm") == 0) {
2143 info.logLevel = LogParsingHelpers::LogLevel::Alarm;
2144 }
else if ((*logLevelIt).compare(
"fatal") == 0) {
2145 info.logLevel = LogParsingHelpers::LogLevel::Fatal;
2153 LOG(info) <<
"Redeployment of configuration done.";
2155 case DriverState::RUNNING:
2159 devicesManager->flush();
2163 if (guiTimerExpired ==
false) {
2164 O2_SIGNPOST_EVENT_EMIT(driver, sid,
"mainloop",
"Entering event loop with %{public}s", once ?
"UV_RUN_ONCE" :
"UV_RUN_NOWAIT");
2166 uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
2170 if (guiQuitRequested ||
2171 (driverInfo.processingPolicies.termination == TerminationPolicy::QUIT && (
checkIfCanExit(infos) ==
true))) {
2176 LOG(info) <<
"Quitting";
2177 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2178 }
else if (infos.size() != runningWorkflow.devices.size()) {
2183 driverInfo.states.push_back(DriverState::RUNNING);
2184 driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
2185 driverInfo.states.push_back(DriverState::SCHEDULE);
2186 driverInfo.states.push_back(DriverState::MERGE_CONFIGS);
2187 }
else if (runningWorkflow.devices.empty() && driverConfig.
batch ==
true) {
2188 LOG(info) <<
"No device resulting from the workflow. Quitting.";
2190 driverInfo.states.push_back(DriverState::EXIT);
2191 }
else if (runningWorkflow.devices.empty() && driverConfig.
batch ==
false && !guiDeployedOnce) {
2193 driverInfo.states.push_back(DriverState::RUNNING);
2194 driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
2196 driverInfo.states.push_back(DriverState::RUNNING);
2199 case DriverState::QUIT_REQUESTED:
2200 LOG(info) <<
"QUIT_REQUESTED";
2201 guiQuitRequested =
true;
2208 force_step_timer.data = &infos;
2210 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
2212 case DriverState::HANDLE_CHILDREN: {
2216 uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
2221 static bool forcefulExitMessage =
true;
2222 if (forcefulExitMessage) {
2223 LOG(info) <<
"Forceful exit requested.";
2224 forcefulExitMessage =
false;
2230 driverInfo.sigchldRequested =
false;
2235 bool supposedToQuit = (guiQuitRequested || canExit ||
graceful_exit);
2237 if (allChildrenGone && (supposedToQuit || driverInfo.processingPolicies.termination == TerminationPolicy::QUIT)) {
2239 driverInfo.states.resize(0);
2240 driverInfo.states.push_back(DriverState::EXIT);
2241 }
else if (hasError && driverInfo.processingPolicies.error == TerminationPolicy::QUIT && !supposedToQuit) {
2243 force_exit_timer.data = &infos;
2244 static bool forceful_timer_started =
false;
2245 if (forceful_timer_started ==
false) {
2246 forceful_timer_started =
true;
2249 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2250 }
else if (allChildrenGone ==
false && supposedToQuit) {
2251 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
2255 case DriverState::EXIT: {
2257 if (driverInfo.resourcesMonitoringDumpInterval) {
2258 uv_timer_stop(&metricDumpTimer);
2260 LOG(info) <<
"Dumping performance metrics to performanceMetrics.json file";
2263 dumpRunSummary(serverContext, driverInfo, infos, runningWorkflow.devices);
2270 if (infos.empty()) {
2273 boost::property_tree::ptree finalConfig;
2274 assert(infos.size() == runningWorkflow.devices.size());
2275 for (
size_t di = 0;
di < infos.size(); ++
di) {
2276 auto& info = infos[
di];
2277 auto& spec = runningWorkflow.devices[
di];
2278 finalConfig.put_child(spec.name, info.currentConfig);
2280 LOG(info) <<
"Dumping used configuration in dpl-config.json";
2282 std::ofstream outDPLConfigFile(
"dpl-config.json", std::ios::out);
2283 if (outDPLConfigFile.is_open()) {
2284 boost::property_tree::write_json(outDPLConfigFile, finalConfig);
2286 LOGP(warning,
"Could not write out final configuration file. Read only run folder?");
2288 if (driverInfo.noSHMCleanup) {
2289 LOGP(warning,
"Not cleaning up shared memory.");
2293 return calculateExitCode(driverInfo, runningWorkflow.devices, infos);
2295 case DriverState::PERFORM_CALLBACKS:
2296 for (
auto& callback : driverControl.
callbacks) {
2297 callback(workflow, runningWorkflow.devices, deviceExecutions, dataProcessorInfos, commandInfo);
2302 LOG(error) <<
"Driver transitioned in an unknown state("
2303 <<
"current: " << (
int)current
2304 <<
", previous: " << (
int)previous
2305 <<
"). Shutting down.";
2306 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2844 std::vector<ChannelConfigurationPolicy>
const& channelPolicies,
2845 std::vector<CompletionPolicy>
const& completionPolicies,
2846 std::vector<DispatchPolicy>
const& dispatchPolicies,
2847 std::vector<ResourcePolicy>
const& resourcePolicies,
2848 std::vector<CallbacksPolicy>
const& callbacksPolicies,
2849 std::vector<SendingPolicy>
const& sendingPolicies,
2850 std::vector<ConfigParamSpec>
const& currentWorkflowOptions,
2851 std::vector<ConfigParamSpec>
const& detectedParams,
2856 if (getenv(
"DPL_DRIVER_SIGNPOSTS")) {
2860 std::vector<std::string> currentArgs;
2861 std::vector<PluginInfo> plugins;
2864 for (
int ai = 1; ai < argc; ++ai) {
2865 currentArgs.emplace_back(argv[ai]);
2871 currentWorkflowOptions};
2875 bpo::options_description executorOptions(
"Executor options");
2876 const char* helpDescription =
"print help: short, full, executor, or processor name";
2878 executorOptions.add_options()
2879 (
"help,h", bpo::value<std::string>()->implicit_value(
"short"), helpDescription)
2880 (
"quiet,q", bpo::value<bool>()->zero_tokens()->default_value(
false),
"quiet operation")
2881 (
"stop,s", bpo::value<bool>()->zero_tokens()->default_value(
false),
"stop before device start")
2882 (
"single-step", bpo::value<bool>()->zero_tokens()->default_value(
false),
"start in single step mode")
2883 (
"batch,b", bpo::value<std::vector<std::string>>()->zero_tokens()->composing(),
"batch processing mode")
2884 (
"no-batch", bpo::value<bool>()->zero_tokens(),
"force gui processing mode")
2885 (
"no-cleanup", bpo::value<bool>()->zero_tokens()->default_value(
false),
"do not cleanup the shm segment")
2886 (
"hostname", bpo::value<std::string>()->default_value(
"localhost"),
"hostname to deploy")
2887 (
"resources", bpo::value<std::string>()->default_value(
""),
"resources allocated for the workflow")
2888 (
"start-port,p", bpo::value<unsigned short>()->default_value(22000),
"start port to allocate")
2889 (
"port-range,pr", bpo::value<unsigned short>()->default_value(1000),
"ports in range")
2890 (
"completion-policy,c", bpo::value<TerminationPolicy>(&processingPolicies.
termination)->default_value(TerminationPolicy::QUIT),
2891 "what to do when processing is finished: quit, wait")
2892 (
"error-policy", bpo::value<TerminationPolicy>(&processingPolicies.
error)->default_value(TerminationPolicy::QUIT),
2893 "what to do when a device has an error: quit, wait")
2894 (
"min-failure-level", bpo::value<LogParsingHelpers::LogLevel>(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal),
2895 "minimum message level which will be considered as fatal and exit with 1")
2896 (
"graphviz,g", bpo::value<bool>()->zero_tokens()->default_value(
false),
"produce graphviz output")
2897 (
"mermaid", bpo::value<std::string>()->default_value(
""),
"produce graph output in mermaid format in file under specified name or on stdout if argument is \"-\"")
2898 (
"timeout,t", bpo::value<uint64_t>()->default_value(0),
"forced exit timeout (in seconds)")
2899 (
"dds,D", bpo::value<std::string>()->default_value(
""),
"create DDS configuration")
2900 (
"dds-workflow-suffix,D", bpo::value<std::string>()->default_value(
""),
"suffix for DDS names")
2901 (
"dump-workflow,dump", bpo::value<bool>()->zero_tokens()->default_value(
false),
"dump workflow as JSON")
2902 (
"dump-workflow-file", bpo::value<std::string>()->default_value(
"-"),
"file to which do the dump")
2903 (
"driver-mode", bpo::value<DriverMode>(&driverMode)->default_value(DriverMode::STANDALONE), R
"(how to run the driver. default: "standalone". Valid: "embedded")")
2904 (
"run", bpo::value<bool>()->zero_tokens()->default_value(
false),
"run workflow merged so far. It implies --batch. Use --no-batch to see the GUI")
2905 (
"no-IPC", bpo::value<bool>()->zero_tokens()->default_value(
false),
"disable IPC topology optimization")
2906 (
"o2-control,o2", bpo::value<std::string>()->default_value(
""),
"dump O2 Control workflow configuration under the specified name")
2907 (
"resources-monitoring", bpo::value<unsigned short>()->default_value(0),
"enable cpu/memory monitoring for provided interval in seconds")
2908 (
"resources-monitoring-dump-interval", bpo::value<unsigned short>()->default_value(0),
"dump monitoring information to disk every provided seconds");
2913 (
"id,i", bpo::value<std::string>(),
"device id for child spawning")
2914 (
"channel-config", bpo::value<std::vector<std::string>>(),
"channel configuration")
2915 (
"control",
"control plugin")
2916 (
"log-color",
"logging color scheme")(
"color",
"logging color scheme");
2918 bpo::options_description visibleOptions;
2919 visibleOptions.add(executorOptions);
2921 auto physicalWorkflow = workflow;
2922 std::map<std::string, size_t> rankIndex;
2929 size_t workflowHashA = 0;
2930 std::hash<std::string> hash_fn;
2932 for (
auto& dp : workflow) {
2933 workflowHashA += hash_fn(dp.name);
2936 for (
auto& dp : workflow) {
2937 rankIndex.insert(std::make_pair(dp.name, workflowHashA));
2940 std::vector<DataProcessorInfo> dataProcessorInfos;
2944 std::vector<DataProcessorSpec> importedWorkflow;
2946 if (previousWorked ==
false) {
2950 size_t workflowHashB = 0;
2951 for (
auto& dp : importedWorkflow) {
2952 workflowHashB += hash_fn(dp.name);
2959 for (
auto& dp : importedWorkflow) {
2960 auto found = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(),
2962 if (found == physicalWorkflow.end()) {
2963 physicalWorkflow.push_back(dp);
2964 rankIndex.insert(std::make_pair(dp.name, workflowHashB));
2973 for (
auto& dp : physicalWorkflow) {
2975 if (std::find_if(dp.labels.begin(), dp.labels.end(), isExpendable) != dp.labels.end()) {
2976 for (
auto&
output : dp.outputs) {
2977 if (
output.lifetime == Lifetime::Timeframe) {
2978 output.lifetime = Lifetime::Sporadic;
2989 auto reader = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), [](
DataProcessorSpec& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
2990 if (reader != physicalWorkflow.end()) {
2993 for (
auto& service : driverServices) {
2994 if (service.injectTopology ==
nullptr) {
2998 service.injectTopology(node, configContext);
3000 for (
auto& dp : physicalWorkflow) {
3001 if (dp.name.rfind(
"internal-", 0) == 0) {
3002 rankIndex.insert(std::make_pair(dp.name, hash_fn(
"internal")));
3009 return a.name < b.name;
3012 for (
auto& dp : physicalWorkflow) {
3013 std::stable_sort(dp.inputs.begin(), dp.inputs.end(),
3014 [](
InputSpec const&
a,
InputSpec const&
b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
3015 std::stable_sort(dp.outputs.begin(), dp.outputs.end(),
3016 [](
OutputSpec const&
a,
OutputSpec const&
b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
3020 std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
3021 dependencyCheckers.reserve(physicalWorkflow.size());
3023 for (
auto& spec : physicalWorkflow) {
3024 for (
auto& policy : topologyPolicies) {
3025 if (policy.matcher(spec)) {
3026 dependencyCheckers.push_back(policy.checkDependency);
3031 assert(dependencyCheckers.size() == physicalWorkflow.size());
3033 auto checkDependencies = [&workflow = physicalWorkflow,
3034 &dependencyCheckers](
int i,
int j) {
3036 return checker(workflow[
i], workflow[
j]);
3041 std::vector<std::pair<int, int>> edges;
3043 if (physicalWorkflow.size() > 1) {
3044 for (
size_t i = 0;
i < physicalWorkflow.size() - 1; ++
i) {
3045 for (
size_t j =
i;
j < physicalWorkflow.size(); ++
j) {
3046 if (
i ==
j && checkDependencies(
i,
j)) {
3047 throw std::runtime_error(physicalWorkflow[
i].
name +
" depends on itself");
3050 if (checkDependencies(
i,
j)) {
3051 edges.emplace_back(
j,
i);
3054 if (checkDependencies(
j,
i)) {
3055 edges.emplace_back(
i,
j);
3057 std::ostringstream
str;
3058 for (
auto x : {
i,
j}) {
3059 str << physicalWorkflow[
x].name <<
":\n";
3061 for (
auto& input : physicalWorkflow[
x].inputs) {
3062 str <<
"- " << input <<
"\n";
3064 str <<
"outputs:\n";
3065 for (
auto&
output : physicalWorkflow[
x].outputs) {
3069 throw std::runtime_error(physicalWorkflow[
i].
name +
" has circular dependency with " + physicalWorkflow[
j].
name +
":\n" +
str.str());
3076 if (topoInfos.size() != physicalWorkflow.size()) {
3079 throw std::runtime_error(
"Unable to do topological sort of the resulting workflow. Do you have loops?\n" +
debugTopoInfo(physicalWorkflow, topoInfos, edges));
3083 auto aRank = std::make_tuple(a.layer, -workflow.at(a.index).outputs.size(), workflow.at(a.index).name);
3084 auto bRank = std::make_tuple(b.layer, -workflow.at(b.index).outputs.size(), workflow.at(b.index).name);
3085 return aRank < bRank;
3088 std::vector<int> dataProcessorOrder;
3089 dataProcessorOrder.resize(topoInfos.size());
3090 for (
size_t i = 0;
i < topoInfos.size(); ++
i) {
3091 dataProcessorOrder[topoInfos[
i].index] =
i;
3093 std::vector<int> newLocations;
3094 newLocations.resize(dataProcessorOrder.size());
3095 for (
size_t i = 0;
i < dataProcessorOrder.size(); ++
i) {
3096 newLocations[dataProcessorOrder[
i]] =
i;
3106 bpo::options_description od;
3107 od.add(visibleOptions);
3112 using namespace bpo::command_line_style;
3113 auto style = (allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
3114 bpo::variables_map varmap;
3117 bpo::command_line_parser(argc, argv)
3122 }
catch (std::exception
const& e) {
3123 LOGP(error,
"error parsing options of {}: {}", argv[0], e.what());
3139 if (varmap.count(
"help")) {
3140 printHelp(varmap, executorOptions, physicalWorkflow, currentWorkflowOptions);
3146 if (varmap.count(
"severity")) {
3147 auto logLevel = varmap[
"severity"].as<std::string>();
3148 if (logLevel ==
"debug") {
3149 fair::Logger::SetConsoleSeverity(fair::Severity::debug);
3150 }
else if (logLevel ==
"detail") {
3151 fair::Logger::SetConsoleSeverity(fair::Severity::detail);
3152 }
else if (logLevel ==
"info") {
3153 fair::Logger::SetConsoleSeverity(fair::Severity::info);
3154 }
else if (logLevel ==
"warning") {
3155 fair::Logger::SetConsoleSeverity(fair::Severity::warning);
3156 }
else if (logLevel ==
"error") {
3157 fair::Logger::SetConsoleSeverity(fair::Severity::error);
3158 }
else if (logLevel ==
"important") {
3159 fair::Logger::SetConsoleSeverity(fair::Severity::important);
3160 }
else if (logLevel ==
"alarm") {
3161 fair::Logger::SetConsoleSeverity(fair::Severity::alarm);
3162 }
else if (logLevel ==
"fatal") {
3163 fair::Logger::SetConsoleSeverity(fair::Severity::fatal);
3165 LOGP(error,
"Invalid log level '{}'", logLevel);
3172 auto evaluateBatchOption = [&varmap]() ->
bool {
3173 if (varmap.count(
"no-batch") > 0) {
3176 if (varmap.count(
"batch") == 0) {
3178 return isatty(fileno(stdout)) == 0;
3187 DriverInfo driverInfo{
3188 .sendingPolicies = sendingPolicies,
3189 .forwardingPolicies = forwardingPolicies,
3190 .callbacksPolicies = callbacksPolicies};
3191 driverInfo.states.reserve(10);
3192 driverInfo.sigintRequested =
false;
3193 driverInfo.sigchldRequested =
false;
3194 driverInfo.channelPolicies = channelPolicies;
3195 driverInfo.completionPolicies = completionPolicies;
3196 driverInfo.dispatchPolicies = dispatchPolicies;
3197 driverInfo.resourcePolicies = resourcePolicies;
3198 driverInfo.argc = argc;
3199 driverInfo.argv = argv;
3200 driverInfo.noSHMCleanup = varmap[
"no-cleanup"].as<
bool>();
3201 driverInfo.processingPolicies.termination = varmap[
"completion-policy"].as<
TerminationPolicy>();
3202 driverInfo.processingPolicies.earlyForward = varmap[
"early-forward-policy"].as<
EarlyForwardPolicy>();
3203 driverInfo.mode = varmap[
"driver-mode"].as<
DriverMode>();
3205 auto batch = evaluateBatchOption();
3208 .driverHasGUI = (batch ==
false) || getenv(
"DPL_DRIVER_REMOTE_GUI") !=
nullptr,
3211 if (varmap[
"error-policy"].defaulted() && driverConfig.batch ==
false) {
3212 driverInfo.processingPolicies.error = TerminationPolicy::WAIT;
3214 driverInfo.processingPolicies.error = varmap[
"error-policy"].as<
TerminationPolicy>();
3217 driverInfo.startTime = uv_hrtime();
3218 driverInfo.startTimeMsFromEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(
3219 std::chrono::system_clock::now().time_since_epoch())
3221 driverInfo.timeout = varmap[
"timeout"].as<uint64_t>();
3222 driverInfo.deployHostname = varmap[
"hostname"].as<std::string>();
3223 driverInfo.resources = varmap[
"resources"].as<std::string>();
3224 driverInfo.resourcesMonitoringInterval = varmap[
"resources-monitoring"].as<
unsigned short>();
3225 driverInfo.resourcesMonitoringDumpInterval = varmap[
"resources-monitoring-dump-interval"].as<
unsigned short>();
3228 driverInfo.processorInfo = dataProcessorInfos;
3229 driverInfo.configContext = &configContext;
3236 std::string frameworkId;
3239 if (varmap.count(
"id")) {
3243 frameworkId = std::regex_replace(varmap[
"id"].as<std::string>(), std::regex{
"_dds.*"},
"");
3244 driverInfo.uniqueWorkflowId = fmt::format(
"{}", getppid());
3245 driverInfo.defaultDriverClient =
"stdout://";
3247 driverInfo.uniqueWorkflowId = fmt::format(
"{}", getpid());
3248 driverInfo.defaultDriverClient =
"ws://";