FairMQ currently provides a well documented and flexible framework for actor based computation, where each of the actors listens for message-like entities on channels and executes some code as a reaction. The key component which controls this is called a `fair::mq::Device` (or device from now on), which can use different kinds of transports to receive and send messages. In the most generic case, users have full control on the state machine governing the message passing and on how the message handling is done. This covers all ALICE use cases in a generic manner, at the cost of extra complexity left to the user to implement. In most cases, however, a simplified way of creating devices is provided: the user creates their own `fair::mq::Device`-derived class, which registers via the `OnData(fair::mq::Parts &parts)` method a callback that is invoked whenever a new message arrives.

This however still holds the user responsible for everything that happens inside the `OnData` callback. In particular there is no way to inspect which data is expected by a device and which data is produced. This is by design: the FairMQ transport layer should not know anything about the actual data being transferred, while the points above require some sort of inner knowledge about the data model and the data being moved around.
The aim is to achieve the following:
For the reasons mentioned above, we propose that one of the developments happening within the O2 Framework work package should be a "Data Processing Layer" which actually knows about the O2 Data Model (and for this reason cannot be part of FairMQ itself) and exploits it to validate, optimise and correctly schedule a computation on a user-specified set of inputs.
The Data Processing Layer in particular requires:
and given these premises it actually guarantees:
A `CompletionPolicy` is executed to decide whether the associated record is complete; by default such a `CompletionPolicy` waits for all the specified parts to have arrived.

The description of the computation in the Data Processing Layer is done via instances of the `DataProcessorSpec` class, grouped in a so-called `WorkflowSpec` instance. In order to provide the description of a computation to be run, the user must implement a callback which returns a filled `WorkflowSpec`. E.g.:
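The original example is not reproduced here; the following is a minimal sketch of such a callback, assuming the conventional DPL entry point. The processor name, the specs and the lambda body are illustrative:

```cpp
#include "Framework/runDataProcessing.h"

using namespace o2::framework;

// The driver declares this entry point; here we fill it with one
// illustrative DataProcessorSpec subscribing to TPC clusters and
// producing TPC tracks.
WorkflowSpec defineDataProcessing(ConfigContext const& context)
{
  return WorkflowSpec{
    DataProcessorSpec{
      "tpc-tracker",                                  // processor name
      {InputSpec{"clusters", "TPC", "CLUSTERS", 0}},  // inputs
      {OutputSpec{"TPC", "TRACKS", 0}},               // outputs
      AlgorithmSpec{[](ProcessingContext& ctx) {
        // process the inputs, create the outputs
      }}
    }
  };
}
```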
See the next section for a more detailed description of the `DataProcessorSpec` class. The code above has to be linked into a single executable together with the Data Processing Layer code to form a so-called driver executable, which when run will map each `DataProcessorSpec` to a set of `fair::mq::Device`s (using a 1-1 correspondence, in the current implementation).

The `ConfigContext` object being passed to the function contains a set of user provided options to customise the creation of the workflow. For example you might want to change the number of workers for a given task or disable part of the topology if a given detector should not be enabled.
In order to specify which options are to be present in the `ConfigContext`, the user can define the extension point:
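The original snippet is not reproduced here; this is a sketch assuming the usual DPL pattern of defining customization functions before including `Framework/runDataProcessing.h`. The option name and default value are illustrative:

```cpp
#include "Framework/ConfigParamSpec.h"

using namespace o2::framework;

// Must be defined before including Framework/runDataProcessing.h, so that
// the driver picks it up when generating the command line options.
void customize(std::vector<ConfigParamSpec>& workflowOptions)
{
  workflowOptions.push_back(
    ConfigParamSpec{"num-workers", VariantType::Int, 4, {"number of workers for the task"}});
}

#include "Framework/runDataProcessing.h"
```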
The description of the computation in such a layer is done via a `DataProcessorSpec` class, which describes some sort of processing of a (set of) O2 Data Payloads (payloads from now on), as defined by the O2 Data Model, possibly producing new payloads as outputs. The inputs to the computation, the outputs, and the actual code to run on the former to produce the latter are specified in a `DataProcessorSpec` instance. Multiple `DataProcessorSpec` instances can be grouped together in a `WorkflowSpec` and passed to the driver code, which maps them to processing devices accordingly.
The `DataProcessorSpec` is defined as follows:
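The original definition is not reproduced here; the following simplified sketch is reconstructed from the fields described below (the exact types are indicative):

```cpp
struct DataProcessorSpec {
  std::string name;
  Inputs inputs;                              // vector of InputSpec
  Outputs outputs;                            // vector of OutputSpec
  AlgorithmSpec algorithm;                    // the actual computation
  std::vector<ConfigParamSpec> configParams;  // configuration options
  std::vector<ServiceSpec> requiredServices;  // services needed at runtime
  // ...
};
```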
The `inputs` field represents a set of subscriptions to some kind of input data we would like to process. As per the O2 Data Model, there are three major properties O2 attaches to a message: an origin, a description and a generic subspecification.
So:
```cpp
{InputSpec{"clusters", "TPC", "CLUSTERS", 0}}
```
really means "subscribe in input to the messages which have origin "TPC", contain objects of kind "CLUSTERS" and have the generic subspecification 0 (e.g. to index the sector)". In your code you will be able to retrieve this kind of message using the "clusters" label (see the description of the InputRecord API later on).

An `InputSpec` is effectively a three dimensional selection on the space defined by all the possible origins, descriptions and subspecifications. Notice that the method `o2::framework::select` can be used to provide the query in a more compact (and flexible) way. Using such a helper the above becomes:

```cpp
select("clusters:TPC/CLUSTERS/0")
```

Similarly the `OutputSpec` is a description of what the DataProcessor will produce, again in terms of (origin, description, subspecification) properties.

The `configParams` vector is used to specify which configuration options the data processing being described requires:

```cpp
struct ConfigParamSpec {
  std::string name;
  enum ParamType type;
  variant defaultValue;
  ...
};
```

Command line / configuration options are automatically generated from it. These are available only at init stage and can be used to configure services; they are not available to the actual `process` callback, as all the critical parameters for data processing should be part of the data stream itself, possibly coming from CCDB / ParameterManager.

Similarly, the `requiredServices` vector defines which services are required for the data processing. For example this could be used to declare the need for some data cache, a GPU context, a thread pool.

The `algorithm` property, of type `AlgorithmSpec`, is instead used to specify the actual computation. Notice that the same `DataProcessorSpec` can use different `AlgorithmSpec`s; the rationale for this is that, while inputs and outputs might be the same, you might want to compare different versions of your algorithm. The `AlgorithmSpec` resembles the following:

```cpp
struct AlgorithmSpec {
  using ProcessCallback = std::function<void(ProcessingContext &)>;
  using InitCallback = std::function<ProcessCallback(InitContext &)>;
  using ErrorCallback = std::function<void(ErrorContext &)>;

  InitCallback onInit = nullptr;
  ProcessCallback onProcess = nullptr;
  ErrorCallback onError = nullptr;
  ...
};
```

The `onProcess` function is to be used for stateless computations. It's a free function and it's up to the framework to make sure that all the required components are declared upfront. It takes as input the context for the current computation in the form of a `ProcessingContext &` instance. Such a context consists of:

* an `InputRecord` which allows retrieving the current inputs matching the provided specification;
* a `ServiceRegistry` referencing the set of services declared as required by the computation;
* a `DataAllocator` which can allocate new payloads only for the types which have been declared as `outputs`.

#### Stateful processing

`onProcess` is useful whenever your computation is fully contained in your input. In several cases, however, a computation requires some ancillary state, which needs to be initialised only on (re-)start of the job. For example you might want to initialise the geometry of your detector or open a file for reading.
To do so, you can use the `onInit` callback, create the state in it and pass it to the returned `ProcessCallback` as captured arguments. E.g.:

```cpp
AlgorithmSpec{
  InitCallback{[](InitContext &setup) {
    auto statefulGeo = std::make_shared<TGeo>();
    return [geo = statefulGeo](ProcessingContext &) {
      // do something with geo
    };
  }}
}
```

### Task based API

The previous API is flexible enough to work for a large variety of cases, including creating the `onProcess` callback on the fly depending on the `onInit` parameters. However, very often what the user wants to do is to initialise some state and invoke a "run" method. For this we provide a `Task` based API. In order to use it, you need to inherit your task from `o2::framework::Task` and use the adaptor:

```cpp
adaptFromTask<TASK>(task constructor arguments) -> AlgorithmSpec
```

to create the `AlgorithmSpec`. A full example can be found in `Framework/Core/test/test_Task.cpp`.

## Implementing a computation

This chapter describes how to actually implement an `AlgorithmSpec`.

### Using inputs - the `InputRecord` API

Inputs to your computation will be provided to you via the `InputRecord`. An instance of such a class hangs from the `ProcessingContext` your computation lambda is passed and contains one value for each of the `InputSpec`s you specified. E.g.:

```cpp
InputRecord &inputs = ctx.inputs();
```

From the `InputRecord` instance you can get the arguments either via their positional index:

```cpp
DataRef ref = inputs.getByPos(0);
```

or using the mnemonic label which was used as first argument in the associated `InputSpec`:

```cpp
DataRef ref = inputs.get("points");
```

You can then use the `DataRef` `header` and `payload` raw pointers to access the data in the messages. If the message is of a known messageable type, you can automatically get the content of the message by passing the type `T` as template argument. The actual operation depends on the properties of the type. Not all types are supported; in order to get an object with pointer-like behavior, `T` has to be a pointer (`T = U*`).

```cpp
auto p = args.get<T>("input");
```

The return type is:

* `T const&` if `T` is a messageable type;
* `T` if `T` is a `std::container` of a ROOT-serializable type;
* `smart_pointer<T>` if `T` is a ROOT-serializable type and `T*` is passed.

Examples:

* For messageable types there is no additional copy involved and the content is only for reading:

```cpp
XYZ const& p = args.get<XYZ>("points");
```

* ROOT-serialized objects are automatically deserialized and returned as a smart pointer. Note that the requested type has to be a pointer:

```cpp
auto h = args.get<TH1*>("histo");
h->Print();
```

* Containers of ROOT-serialized objects are automatically deserialized and returned as a container object:

```cpp
auto points = args.get<std::vector<TPoint>>("points");
for (auto& point : points) {}
```

Check the next section for the known types. The framework will also take care of the necessary deserialization.

### Creating outputs - the `DataAllocator` API

In order to prevent algorithms from creating data they are not supposed to create, a special `DataAllocator` object is passed to the process callback, so that only messages for declared outputs can be created.
A `DataAllocator` can create Framework owned resources via the `make<T>` method. In case you ask the framework to create a collection of objects, the result will be a `gsl::span` wrapper around the collection. A `DataAllocator` can adopt externally created resources via the `adopt` method. A `DataAllocator` can create a copy of an externally owned resource via the `snapshot` method.

Currently supported data types for `make<T>` are:

* Vanilla `char *` buffers with associated size: this is the actual content of the FairMQ message.
* Messageable types: trivially copyable, non-polymorphic types. These get directly mapped onto the message exchanged by FairMQ and are therefore "zero-copy" as far as the Data Processing Layer is concerned.
* Collections of messageable types, exposed to the user as `gsl::span`.
* TObject-derived classes. These are actually serialised via a TMessage and are therefore only suitable for the cases in which the cost of such a serialization is not an issue.

Currently supported data types for the `snapshot` functionality (the state at the time of calling `snapshot` is captured in a copy and sent when processing is done) are:

* Messageable types.
* ROOT-serializable classes, serialised via a `TMessage`. Classes implementing ROOT's `TClass` interface and std containers of those are automatically detected. ROOT serialization can be forced using the type converter `ROOTSerialized`, e.g. for types which cannot be detected automatically.
* `std::vector` of a messageable type; on the receiver side the collection is exposed as `gsl::span`.
* `std::vector` of pointers to a messageable type; the objects are linearized in the message and exposed as a `gsl::span` on the receiver side.

The `DataChunk` class resembles an `iovec`:

```cpp
struct DataChunk {
  char *data;
  size_t size;
};
```

however, no API is provided to explicitly send it. All the created DataChunks are sent (potentially using scatter / gather) when the `process` function returns. This is to avoid the "modified after send" issues where a message which was sent is still owned and modifiable by the creator.

### Error handling

When an error happens during the processing of some data, the writer of the `process` function should simply throw an exception. By default the exception is caught by the `DataProcessingDevice` and a message is printed (if the exception is `std::exception` derived, its `what()` method is used, otherwise a generic message is given). Users can provide their own error handler via the `onError` callback specified in `DataProcessorSpec`. This will allow, in the future, reinjecting data into the flow in case of an error. When the exception is thrown inside the processing function, its message and stack trace are printed; however, the application itself is not terminated. If the error encountered is so severe that the current workflow cannot continue, it is advisable to call `LOG(fatal)` with a properly describing message, which makes the application shut down with a non-zero exit code.

### Services

Services are utility classes which `DataProcessor`s can access to request out-of-band, deployment dependent, functionality. For example a service could be used to post metrics to the monitoring system or to get a GPU context.
The former would be dependent on whether you are running on your laptop (where monitoring could simply mean printing metrics on the command line) or in a large cluster (where monitoring probably means sending metrics to an aggregator device which then pushes them to the backend).

Services are initialised by the driver code (i.e. the code included via `runDataProcessing.h`) and passed to the user code via a `ServiceRegistry`. You can retrieve a service by the type of its interface class. E.g. for Monitoring you can do:

```cpp
#include <Monitoring/Monitoring.h>
// ...
auto service = ctx.services().get<Monitoring>(); // In the DataProcessor lambda...
service.send({ 1, "my/metric" });
...
```

Currently available services are described below.

#### ControlService

The control service allows DataProcessors to modify their state or that of their peers in the topology. For example if you want to quit the whole data processing topology, you can use:

```cpp
#include "Framework/ControlService.h"
//...
ctx.services().get<ControlService>().readyToQuit(QuitRequest::All); // In the DataProcessor lambda
```

#### RawDeviceService

This service allows you to get hold of the `fair::mq::Device` running the DataProcessor computation from within the computation itself. While in general this should not be used, it is handy in case you want to integrate with a pre-existing `fair::mq::Device` which potentially does not even follow the O2 Data Model.

#### Monitoring service

Integration with the monitoring subsystem of O2 happens by getting the `o2::monitoring::Monitoring` interface. A simple example is:

```cpp
#include <Monitoring/Monitoring.h>
// ...
auto service = ctx.services().get<Monitoring>(); // In the DataProcessor lambda...
service.send({ 1, "my/metric" });
...
```

For the full API documentation please have a look at: https://github.com/AliceO2Group/Monitoring

Some suffixes for the metrics are reserved to represent vector and tabular metrics:

* `<some-metric-name>/n` contains the size of a vector metric at a given moment.
* `<some-metric-name>/m` contains the secondary size of a matrix metric at a given moment.
* `<some-metric-name>/<i>`, where `<i>` is an integer, contains the values of the i-th element in a vector metric or of the `<i>%n` column, `<i>/m` row of a matrix metric.

#### Generic Logger

Generic logging capabilities of DPL are provided via Framework/Logger, which wraps and extends FairLogger:

```cpp
#include "Framework/Logger.h"
...
LOG(info) << "some message";      // streamer based API
LOGF(info, "%s", "some message"); // printf based API
LOGP(info, "{}", "some message"); // python / fmt based API
O2INFO("{}", "some message");     // same but with less typing
```
#### InfoLogger service

Integration with the InfoLogger subsystem of O2 happens in two ways:

* by getting the `AliceO2::InfoLogger::InfoLogger` service from the registry, like done in the monitoring case;
* by using the `LOG` macro. In order to enable this, the user needs to specify the minimum severity to send to the `InfoLogger` via the `--infologger-severity` option.

Finally, one can configure the behavior of the InfoLogger by using the `--infologger-mode` option.
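For instance (the executable name is hypothetical, and the severity and mode values here are purely illustrative):

```bash
o2-some-workflow --infologger-severity warning --infologger-mode infoLoggerD
```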
Notice also that you can customise the `InfoLoggerContext` which DPL uses, to match your needs. This can be done by getting it from the `InitContext`:
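The original snippet is not reproduced here; the following is a sketch assuming the `InfoLoggerContext` is reachable from the `InitContext` service registry and provides a `setField` method. The facility name is illustrative:

```cpp
#include <InfoLogger/InfoLoggerContext.h>

using AliceO2::InfoLogger::InfoLoggerContext;

AlgorithmSpec{[](InitContext& ic) {
  // Customise the context DPL uses when talking to the InfoLogger.
  auto& ilContext = ic.services().get<InfoLoggerContext>();
  ilContext.setField(InfoLoggerContext::FieldName::Facility, "my-facility");
  return [](ProcessingContext& pc) { /* regular processing */ };
}}
```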
#### CallbackService

A service to which data processors can register callback functions, invoked by the framework at defined steps in the process flow. This allows you to have customisation points for the following events:

* `CallbackService::Id::Start`: before entering the running state.
* `CallbackService::Id::Stop`: before exiting the running state.
* `CallbackService::Id::Reset`: before resetting the device.

Moreover, in case you want to process events which are not coming from FairMQ, there is a `CallbackService::Id::ClockTick` callback which is called according to the rate specified for the backing FairMQ device.

Similarly, the `CallbackService::Id::Idle` callback is fired whenever there was nothing to process.

One last callback is `CallbackService::Id::EndOfStream`. This callback is invoked whenever all the upstream DataProcessingDevices consider that they will not produce any more data, so we can finalise our results and exit.
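A sketch of registering one of these callbacks from the init callback, assuming a `CallbackService::set` registration API; quitting via `QuitRequest::Me` is an illustrative choice:

```cpp
#include "Framework/CallbackService.h"
#include "Framework/ControlService.h"

AlgorithmSpec{[](InitContext& ic) {
  auto eosCallback = [](EndOfStreamContext& context) {
    // Finalise results here, then ask the framework to quit this device.
    context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
  };
  ic.services().get<CallbackService>().set(CallbackService::Id::EndOfStream, eosCallback);
  return [](ProcessingContext& pc) { /* regular processing */ };
}}
```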
If we want to retain a message passing semantic and really treat shared memory as yet another transport, we need to be very careful in how we express parallelism on data: the "single ownership model" of message passing forces us either to duplicate streams that need to be accessed in parallel, or to serialise workers which need to access the same data. Solutions like reference counting shared memory would not be allowed in such a scenario and would in any case require extra caution and support to make sure that failures do not leave dangling references around (e.g. when one of the parallel workers terminates abruptly).

First of all, let's consider the fact that there are two levels of parallelism which can be achieved: parallelism on the data itself, where different portions of the data are handled by different devices, and parallelism on time, where different time frames are handled by different replicas of the same device.

In order to express those, DPL provides the `o2::framework::parallel` and `o2::framework::timePipeline` helpers, to avoid having to express them explicitly in the workflow.
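For example, a sketch of the `timePipeline` helper; the spec fields (`trackerInputs`, `trackerOutputs`, `trackerAlgorithm`) are placeholders:

```cpp
// Run three replicas of the same processor, each handling a different
// time interval of the incoming data.
WorkflowSpec workflow{
  timePipeline(
    DataProcessorSpec{"tracker", trackerInputs, trackerOutputs, trackerAlgorithm},
    3  // number of parallel pipeline lanes
  )
};
```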
It can actually happen that you need to interface with native FairMQ devices, either for convenience or because they require a custom behavior which does not map well on top of the Data Processing Layer.
This is fully supported, and the DPL provides means to ingest foreign, non-DPL `fair::mq::Device`-produced messages into a DPL workflow. This is done with the help of a "proxy" data processor which connects to the foreign device, receives its inputs, optionally converts them to a format understood by the Data Processing Layer, and then pumps them to the right Data Processor Specs.
At a lower level, this can be done using the `FairMQRawDeviceService`, which exposes the actual device on which an Algorithm is running, giving the user full control.
The DPL aims at solving the majority of common use cases with its default behavior. For all the special cases to be handled, we however provide multiple customisation entry points, in particular to:

* customise the way records are considered complete, via a `CompletionPolicy`;
* customise the way channels between devices are configured, via a `ChannelConfigurationPolicy`.

In all cases this is done by providing a free function which takes as argument the group of policies to be applied to customise the behavior, as sketched below.
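For example, a sketch for the `CompletionPolicy` case, assuming the `CompletionPolicyHelpers::defineByName` helper; the device name pattern is illustrative:

```cpp
#include "Framework/CompletionPolicy.h"
#include "Framework/CompletionPolicyHelpers.h"

// Defined before including Framework/runDataProcessing.h, like the other
// customization points: consume whatever inputs have arrived for all the
// devices whose name matches "tracker.*".
void customize(std::vector<o2::framework::CompletionPolicy>& policies)
{
  policies.push_back(o2::framework::CompletionPolicyHelpers::defineByName(
    "tracker.*", o2::framework::CompletionPolicy::CompletionOp::Consume));
}

#include "Framework/runDataProcessing.h"
```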
In general a DPL workflow consists of a C++ executable which defines an implicit workflow, as previously described. However, it is sometimes handy to be able to split a workflow in parts, e.g. to be able to run two detectors independently, or to have a basic workflow which is then decorated with extra processing like data sampling if / when requested.

DPL allows merging workflows by simply piping one into the other. So if `workflow-a` and `workflow-b` are two separate workflows, one can run the union of the two by doing:
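i.e., assuming the two executables are available in the PATH:

```bash
workflow-a | workflow-b
```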
Because the merging happens at the level of the implicit representation, this allows having dangling inputs and outputs which are potentially satisfied only when a separate workflow is merged.
While not part of the initial design goal, we plan to extend DPL in order to support analysis. In particular we are evaluating a mode in which users can natively get a ROOT `RDataFrame` with an API similar to the `InputRecord` API.
By default, inputs and outputs are matched solely by the signature of the data they contain. However, it is sometimes desirable that this matching happens based on the history of a given message, e.g. whether it went through one path or another of a dataflow bifurcation. While this is not supported at the moment, and you would have to use a separate data type for the two different origins, the use case is acknowledged and will be addressed in a future revision of this document.
At the moment the configuration of a topology is done by compiling a declarative description of the computation written in C++, and by running the resulting executable. This is however an implementation detail, and the data model for the configuration description is meant to be language agnostic. We foresee both using interpreted configurations (e.g. via ROOT / cling) and configurations done through another language (e.g. Go, JavaScript).
DPL related classes are fully contained in the `Framework` folder of O2.
In particular:
* `Framework/Core` contains the main classes associated to DPL and the implementation of the DataProcessingDevice and the Driver.
* `Framework/Core/test` contains a few unit tests and simple example workflows.
* `Framework/TestWorkflows` contains a few example workflows.
* `Framework/GUISupport` contains the core GUI functionalities.
* `Framework/AnalysisSupport` contains some Analysis Framework specific components.
* `Framework/Foundation` contains some header-only utility classes, in particular for what concerns cross platform compatibility.
* `Framework/Utils` contains utilities and helpers for the creation of workflows, in particular to interface with the rest of the non-DPL utilities.

A class ending in `Spec` represents a Specification, i.e. a description of how a given entity behaves. E.g. `InputSpec` is a specification for one of the inputs.
A class ending in `Info` represents runtime Information regarding an entity. E.g. `DeviceInfo` represents runtime information about a Device.

A class ending in `Context` holds the state of a given phase. E.g. `InitContext` holds the state in which the Init callback happens.