runDataProcessing.cxx File Reference
#include <memory>
#include <stdexcept>
#include "Framework/BoostOptionsRetriever.h"
#include "Framework/BacktraceHelpers.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/ChannelConfigurationPolicy.h"
#include "Framework/ChannelMatching.h"
#include "Framework/ConfigParamsHelper.h"
#include "Framework/ConfigParamSpec.h"
#include "Framework/ConfigContext.h"
#include "Framework/ComputingQuotaEvaluator.h"
#include "CommonDriverServices.h"
#include "Framework/DataProcessingDevice.h"
#include "Framework/DataProcessingContext.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/PluginManager.h"
#include "Framework/DeviceControl.h"
#include "Framework/DeviceExecution.h"
#include "Framework/DeviceInfo.h"
#include "Framework/DeviceMetricsInfo.h"
#include "Framework/DeviceMetricsHelper.h"
#include "Framework/DeviceConfigInfo.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DeviceState.h"
#include "Framework/DeviceConfig.h"
#include "DeviceStateHelpers.h"
#include "Framework/ServiceRegistryHelpers.h"
#include "Framework/DevicesManager.h"
#include "Framework/DebugGUI.h"
#include "Framework/LocalRootFileService.h"
#include "Framework/LogParsingHelpers.h"
#include "Framework/Logger.h"
#include "Framework/ParallelContext.h"
#include "Framework/RawDeviceService.h"
#include "Framework/SimpleRawDeviceService.h"
#include "Framework/Signpost.h"
#include "Framework/ControlService.h"
#include "Framework/CallbackService.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/Monitoring.h"
#include "Framework/DataProcessorInfo.h"
#include "Framework/DriverInfo.h"
#include "Framework/DriverConfig.h"
#include "Framework/DriverControl.h"
#include "Framework/DataTakingContext.h"
#include "Framework/CommandInfo.h"
#include "Framework/RunningWorkflowInfo.h"
#include "Framework/TopologyPolicy.h"
#include "Framework/WorkflowSpecNode.h"
#include "Framework/GuiCallbackContext.h"
#include "Framework/DeviceContext.h"
#include "Framework/ServiceMetricsInfo.h"
#include "Framework/CommonServices.h"
#include "Framework/DefaultsHelpers.h"
#include "ProcessingPoliciesHelpers.h"
#include "DriverServerContext.h"
#include "HTTPParser.h"
#include "DPLWebSocket.h"
#include "ArrowSupport.h"
#include "Framework/ConfigParamDiscovery.h"
#include "ComputingResourceHelpers.h"
#include "DataProcessingStatus.h"
#include "DDSConfigHelpers.h"
#include "O2ControlHelpers.h"
#include "DeviceSpecHelpers.h"
#include "GraphvizHelpers.h"
#include "MermaidHelpers.h"
#include "PropertyTreeHelpers.h"
#include "SimpleResourceManager.h"
#include "WorkflowSerializationHelpers.h"
#include <Configuration/ConfigurationInterface.h>
#include <Configuration/ConfigurationFactory.h>
#include <Monitoring/MonitoringFactory.h>
#include "ResourcesMonitoringHelper.h"
#include <fairmq/Device.h>
#include <fairmq/DeviceRunner.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/ProgOptions.h>
#include <boost/program_options.hpp>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options/variables_map.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <uv.h>
#include <TEnv.h>
#include <TSystem.h>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <csignal>
#include <iostream>
#include <map>
#include <regex>
#include <set>
#include <string>
#include <type_traits>
#include <tuple>
#include <chrono>
#include <utility>
#include <numeric>
#include <functional>
#include <fcntl.h>
#include <netinet/ip.h>
#include <sys/resource.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>
#include <execinfo.h>
#include <cfenv>

struct  DeviceLogContext
struct  StreamConfigContext
struct  DeviceRef
struct  DeviceStdioContext
struct  WorkflowInfo




using DataProcessorInfos = std::vector< DataProcessorInfo >
using DeviceExecutions = std::vector< DeviceExecution >
using DeviceSpecs = std::vector< DeviceSpec >
using DeviceInfos = std::vector< DeviceInfo >
using DataProcessingStatesInfos = std::vector< DataProcessingStates >
using DeviceControls = std::vector< DeviceControl >
using DataProcessorSpecs = std::vector< DataProcessorSpec >


bpo::options_description gHiddenDeviceOptions ("Hidden child options")
void doBoostException (boost::exception &e, const char *)
void doDPLException (o2::framework::RuntimeErrorRef &ref, char const *)
void doUnknownException (std::string const &s, char const *)
char * getIdString (int argc, char **argv)
int callMain (int argc, char **argv, int(*mainNoCatch)(int, char **))
void getChildData (int infd, DeviceInfo &outinfo)
bool checkIfCanExit (std::vector< DeviceInfo > const &infos)
void killChildren (std::vector< DeviceInfo > &infos, int sig)
bool areAllChildrenGone (std::vector< DeviceInfo > &infos)
 Check the state of the children.
void createPipes (int *pipes)
void cleanupSHM (std::string const &uniqueWorkflowId)
 Helper to invoke shared memory cleanup.
void spawnRemoteDevice (uv_loop_t *loop, std::string const &, DeviceSpec const &spec, DeviceControl &, DeviceExecution &, DeviceInfos &deviceInfos, DataProcessingStatesInfos &allStates)
void log_callback (uv_poll_t *handle, int status, int events)
void close_websocket (uv_handle_t *handle)
void websocket_callback (uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
void ws_connect_callback (uv_stream_t *server, int status)
 A callback for the rest engine.
void stream_config (uv_work_t *req)
void handleSignals ()
void handleChildrenStdio (DriverServerContext *serverContext, std::string const &forwardedStdin, std::vector< DeviceStdioContext > &childFds, std::vector< uv_poll_t * > &handles)
void handle_crash (int sig)
void spawnDevice (uv_loop_t *loop, DeviceRef ref, std::vector< DeviceSpec > const &specs, DriverInfo &driverInfo, std::vector< DeviceControl > &, std::vector< DeviceExecution > &executions, std::vector< DeviceInfo > &deviceInfos, std::vector< DataProcessingStates > &allStates, ServiceRegistryRef serviceRegistry, boost::program_options::variables_map &varmap, std::vector< DeviceStdioContext > &childFds, unsigned parentCPU, unsigned parentNode)
void processChildrenOutput (uv_loop_t *loop, DriverInfo &driverInfo, DeviceInfos &infos, DeviceSpecs const &specs, DeviceControls &controls)
bool processSigChild (DeviceInfos &infos, DeviceSpecs &specs)
AlgorithmSpec dryRun (DeviceSpec const &spec)
void doDefaultWorkflowTerminationHook ()
int doChild (int argc, char **argv, ServiceRegistry &serviceRegistry, RunningWorkflowInfo const &runningWorkflow, RunningDeviceRef ref, DriverConfig const &driverConfig, ProcessingPolicies processingPolicies, std::string const &defaultDriverClient, uv_loop_t *loop)
void gui_callback (uv_timer_s *ctx)
void single_step_callback (uv_timer_s *ctx)
 Force single stepping of the children.
void force_exit_callback (uv_timer_s *ctx)
std::vector< std::regex > getDumpableMetrics ()
void dumpMetricsCallback (uv_timer_t *handle)
void dumpRunSummary (DriverServerContext &context, DriverInfo const &driverInfo, DeviceInfos const &infos, DeviceSpecs const &specs)
int runStateMachine (DataProcessorSpecs const &workflow, WorkflowInfo const &workflowInfo, DataProcessorInfos const &previousDataProcessorInfos, CommandInfo const &commandInfo, DriverControl &driverControl, DriverInfo &driverInfo, DriverConfig &driverConfig, std::vector< DeviceMetricsInfo > &metricsInfos, std::vector< ConfigParamSpec > const &detectedParams, boost::program_options::variables_map &varmap, std::vector< ServiceSpec > &driverServices, std::string frameworkId)
void printHelp (bpo::variables_map const &varmap, bpo::options_description const &executorOptions, std::vector< DataProcessorSpec > const &physicalWorkflow, std::vector< ConfigParamSpec > const &currentWorkflowOptions)
bool isOutputToPipe ()
bool isInputConfig ()
void overrideCloning (ConfigContext &ctx, WorkflowSpec &workflow)
void overridePipeline (ConfigContext &ctx, WorkflowSpec &workflow)
void overrideLabels (ConfigContext &ctx, WorkflowSpec &workflow)
void initialiseDriverControl (bpo::variables_map const &varmap, DriverInfo &driverInfo, DriverControl &control)
 Helper function to initialise the controller from the command line options.
void conflicting_options (const boost::program_options::variables_map &vm, const std::string &opt1, const std::string &opt2)
 Helper to detect conflicting options.
template<typename T >
void apply_permutation (std::vector< T > &v, std::vector< int > &indices)
void checkNonResiliency (std::vector< DataProcessorSpec > const &specs, std::vector< std::pair< int, int > > const &edges)
std::string debugTopoInfo (std::vector< DataProcessorSpec > const &specs, std::vector< TopoIndexInfo > const &infos, std::vector< std::pair< int, int > > const &edges)
void enableSignposts (std::string const &signpostsToEnable)
void overrideAll (o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
o2::framework::ConfigContext createConfigContext (std::unique_ptr< ConfigParamRegistry > &workflowOptionsRegistry, o2::framework::ServiceRegistry &configRegistry, std::vector< o2::framework::ConfigParamSpec > &workflowOptions, std::vector< o2::framework::ConfigParamSpec > &extraOptions, int argc, char **argv)
std::unique_ptr< o2::framework::ServiceRegistry > createRegistry ()
int doMain (int argc, char **argv, o2::framework::WorkflowSpec const &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicies, std::vector< ConfigParamSpec > const &currentWorkflowOptions, std::vector< ConfigParamSpec > const &detectedParams, o2::framework::ConfigContext &configContext)


std::vector< DeviceMetricsInfo > gDeviceMetricsInfos
volatile sig_atomic_t graceful_exit = false
volatile sig_atomic_t forceful_exit = false
volatile sig_atomic_t sigchld_requested = false
volatile sig_atomic_t double_sigint = false
auto bindGUIPort

Macro Definition Documentation



Typedef Documentation

◆ DataProcessingStatesInfos

using DataProcessingStatesInfos = std::vector<DataProcessingStates>

◆ DataProcessorInfos

using DataProcessorInfos = std::vector<DataProcessorInfo>

◆ DataProcessorSpecs

using DataProcessorSpecs = std::vector<DataProcessorSpec>

◆ DeviceControls

using DeviceControls = std::vector<DeviceControl>

◆ DeviceExecutions

using DeviceExecutions = std::vector<DeviceExecution>

◆ DeviceInfos

using DeviceInfos = std::vector<DeviceInfo>

◆ DeviceSpecs

using DeviceSpecs = std::vector<DeviceSpec>

Function Documentation

◆ apply_permutation()

template<typename T >
void apply_permutation ( std::vector< T > &  v,
std::vector< int > &  indices 

◆ areAllChildrenGone()

bool areAllChildrenGone ( std::vector< DeviceInfo > &  infos)

Check the state of the children.

◆ callMain()

int callMain ( int  argc,
char **  argv,
int(*)(int, char **)  mainNoCatch 

◆ checkIfCanExit()

bool checkIfCanExit ( std::vector< DeviceInfo > const &  infos)

Return true if all the DeviceInfo in infos are ready to quit, false otherwise. FIXME: move to a helper class.

◆ checkNonResiliency()

void checkNonResiliency ( std::vector< DataProcessorSpec > const &  specs,
std::vector< std::pair< int, int > > const &  edges 

◆ cleanupSHM()

void cleanupSHM ( std::string const &  uniqueWorkflowId)

Helper to invoke shared memory cleanup.

◆ close_websocket()

void close_websocket ( uv_handle_t *  handle)

◆ conflicting_options()

void conflicting_options ( const boost::program_options::variables_map &  vm,
const std::string &  opt1,
const std::string &  opt2 

Helper to detect conflicting options.

◆ createConfigContext()

o2::framework::ConfigContext createConfigContext ( std::unique_ptr< ConfigParamRegistry > &  workflowOptionsRegistry,
o2::framework::ServiceRegistry &  configRegistry,
std::vector< o2::framework::ConfigParamSpec > &  workflowOptions,
std::vector< o2::framework::ConfigParamSpec > &  extraOptions,
int  argc,
char **  argv 

◆ createPipes()

void createPipes ( int *  pipes)

◆ createRegistry()

std::unique_ptr< o2::framework::ServiceRegistry > createRegistry ( )

◆ debugTopoInfo()

std::string debugTopoInfo ( std::vector< DataProcessorSpec > const &  specs,
std::vector< TopoIndexInfo > const &  infos,
std::vector< std::pair< int, int > > const &  edges 

◆ doBoostException()

void doBoostException ( boost::exception &  e,
const char *  processName 

◆ doChild()

int doChild ( int  argc,
char **  argv,
ServiceRegistry &  serviceRegistry,
RunningWorkflowInfo const &  runningWorkflow,
RunningDeviceRef  ref,
DriverConfig const &  driverConfig,
ProcessingPolicies  processingPolicies,
std::string const &  defaultDriverClient,
uv_loop_t *  loop 

Create all the requested services and initialise them

◆ doDefaultWorkflowTerminationHook()

void doDefaultWorkflowTerminationHook ( )

◆ doDPLException()

void doDPLException ( o2::framework::RuntimeErrorRef &  ref,
char const *  processName 

◆ doMain()

int doMain ( int  argc,
char **  argv,
o2::framework::WorkflowSpec const &  workflow,
std::vector< ChannelConfigurationPolicy > const &  channelPolicies,
std::vector< CompletionPolicy > const &  completionPolicies,
std::vector< DispatchPolicy > const &  dispatchPolicies,
std::vector< ResourcePolicy > const &  resourcePolicies,
std::vector< CallbacksPolicy > const &  callbacksPolicies,
std::vector< SendingPolicy > const &  sendingPolicies,
std::vector< ConfigParamSpec > const &  currentWorkflowOptions,
std::vector< ConfigParamSpec > const &  detectedParams,
o2::framework::ConfigContext &  configContext 

Iterate over the physicalWorkflow: any DataProcessorSpec that has an expendable label should have all its timeframe lifetime outputs changed to sporadic, because there is no guarantee that the device will be alive, so we should not expect its data to always arrive.

This is the earliest point at which the services are actually needed.

Set the fair::Logger severity to the one specified on the command line. We do it by hand here, because the FairMQ device is not initialised until much later and we need the logger before that.

◆ doUnknownException()

void doUnknownException ( std::string const &  s,
char const *  processName 

◆ dryRun()

AlgorithmSpec dryRun ( DeviceSpec const &  spec)

◆ dumpMetricsCallback()

void dumpMetricsCallback ( uv_timer_t *  handle)

◆ dumpRunSummary()

void dumpRunSummary ( DriverServerContext &  context,
DriverInfo const &  driverInfo,
DeviceInfos const &  infos,
DeviceSpecs const &  specs 

◆ enableSignposts()

void enableSignposts ( std::string const &  signpostsToEnable)

◆ force_exit_callback()

void force_exit_callback ( uv_timer_s *  ctx)

◆ getChildData()

void getChildData ( int  infd,
DeviceInfo &  outinfo 

◆ getDumpableMetrics()

std::vector< std::regex > getDumpableMetrics ( )

◆ getIdString()

char * getIdString ( int  argc,
char **  argv 

◆ gHiddenDeviceOptions()

bpo::options_description gHiddenDeviceOptions ( "Hidden child options"  )

◆ gui_callback()

void gui_callback ( uv_timer_s *  ctx)

◆ handle_crash()

void handle_crash ( int  sig)

◆ handleChildrenStdio()

void handleChildrenStdio ( DriverServerContext *  serverContext,
std::string const &  forwardedStdin,
std::vector< DeviceStdioContext > &  childFds,
std::vector< uv_poll_t * > &  handles 

Add pollers for stdout and stderr

◆ handleSignals()

void handleSignals ( )

◆ initialiseDriverControl()

void initialiseDriverControl ( bpo::variables_map const &  varmap,
DriverInfo &  driverInfo,
DriverControl &  control 

Helper function to initialise the controller from the command line options.

◆ isInputConfig()

bool isInputConfig ( )

◆ isOutputToPipe()

bool isOutputToPipe ( )

◆ killChildren()

void killChildren ( std::vector< DeviceInfo > &  infos,
int  sig 

◆ log_callback()

void log_callback ( uv_poll_t *  handle,
int  status,
int  events 

◆ overrideAll()

void overrideAll ( o2::framework::ConfigContext &  ctx,
std::vector< o2::framework::DataProcessorSpec > &  workflow 

◆ overrideCloning()

void overrideCloning ( ConfigContext &  ctx,
WorkflowSpec &  workflow 

◆ overrideLabels()

void overrideLabels ( ConfigContext &  ctx,
WorkflowSpec &  workflow 

◆ overridePipeline()

void overridePipeline ( ConfigContext &  ctx,
WorkflowSpec &  workflow 

◆ printHelp()

void printHelp ( bpo::variables_map const &  varmap,
bpo::options_description const &  executorOptions,
std::vector< DataProcessorSpec > const &  physicalWorkflow,
std::vector< ConfigParamSpec > const &  currentWorkflowOptions 

◆ processChildrenOutput()

void processChildrenOutput ( uv_loop_t *  loop,
DriverInfo &  driverInfo,
DeviceInfos &  infos,
DeviceSpecs const &  specs,
DeviceControls &  controls 

◆ processSigChild()

bool processSigChild ( DeviceInfos &  infos,
DeviceSpecs &  specs 

◆ runStateMachine()

int runStateMachine ( DataProcessorSpecs const &  workflow,
WorkflowInfo const &  workflowInfo,
DataProcessorInfos const &  previousDataProcessorInfos,
CommandInfo const &  commandInfo,
DriverControl &  driverControl,
DriverInfo &  driverInfo,
DriverConfig &  driverConfig,
std::vector< DeviceMetricsInfo > &  metricsInfos,
std::vector< ConfigParamSpec > const &  detectedParams,
boost::program_options::variables_map &  varmap,
std::vector< ServiceSpec > &  driverServices,
std::string  frameworkId 

Cleanup the shared memory for the uniqueWorkflowId, in case we are unlucky and an old one is already present.

After INIT we go into RUNNING and eventually to SCHEDULE from there and back into running. This is because the general case would be that we start an application and then we wait for resource offers from DDS or whatever resource manager we use.

Extract and apply process switches; prune device inputs.

FIXME: use commandline arguments as alternative

Set the default value for tracingFlags of each control to the command line value --dpl-tracing-flags

Set the value for the severity of displayed logs to the command line value --severity

◆ single_step_callback()

void single_step_callback ( uv_timer_s *  ctx)

Force single stepping of the children.

◆ spawnDevice()

void spawnDevice ( uv_loop_t *  loop,
DeviceRef  ref,
std::vector< DeviceSpec > const &  specs,
DriverInfo &  driverInfo,
std::vector< DeviceControl > &  ,
std::vector< DeviceExecution > &  executions,
std::vector< DeviceInfo > &  deviceInfos,
std::vector< DataProcessingStates > &  allStates,
ServiceRegistryRef  serviceRegistry,
boost::program_options::variables_map &  varmap,
std::vector< DeviceStdioContext > &  childFds,
unsigned  parentCPU,
unsigned  parentNode 

This will start a new device by forking and executing a new child

◆ spawnRemoteDevice()

void spawnRemoteDevice ( uv_loop_t *  loop,
std::string const &  ,
DeviceSpec const &  spec,
DeviceControl &  ,
DeviceExecution &  ,
DeviceInfos &  deviceInfos,
DataProcessingStatesInfos &  allStates 

◆ stream_config()

void stream_config ( uv_work_t *  req)

◆ websocket_callback()

void websocket_callback ( uv_stream_t *  stream,
ssize_t  nread,
const uv_buf_t *  buf 

◆ ws_connect_callback()

void ws_connect_callback ( uv_stream_t *  server,
int  status 

A callback for the rest engine.

Variable Documentation

◆ bindGUIPort

auto bindGUIPort

◆ double_sigint

volatile sig_atomic_t double_sigint = false

◆ forceful_exit

volatile sig_atomic_t forceful_exit = false

◆ gDeviceMetricsInfos

std::vector<DeviceMetricsInfo> gDeviceMetricsInfos

◆ graceful_exit

volatile sig_atomic_t graceful_exit = false

◆ sigchld_requested

volatile sig_atomic_t sigchld_requested = false

