Project
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
Data Sampling

Data Sampling

Data Sampling provides a possibility to sample data in DPL workflows based on certain conditions ( 5% randomly, when a payload is greater than 4234 bytes, etc.). The job of passing the right data is done by a data processor called Dispatcher. A desired data stream is specified in form of Data Sampling Policies, configured by JSON structures (example below) or by using dedicated interface methods (for advanced use).

{
"id": "policy_example1", # name of the policy
"active": "false", # activation flag
"machines": [ # list of machines where the policy should be run (now ignored)
"aido2flp1",
"aido2flp2"
], # list of data that should be sampled, the format is:
# binding1:origin1/description1/subSpec1[;binding2:...]
"query": "clusters:TPC/CLUSTERS/0;tracks:TPC/TRACKS/0",
# optional list of outputspecs for sampled data, matching the query
# if not present or specified, the default format is used
"outputs": "sampled_clusters:DS/CLUSTERS/0;sampled_tracks:DS/TRACKS/0",
"samplingConditions": [ # list of sampling conditions
{
"condition": "random", # condition type
"fraction": "0.1", # condition-dependent parameter: fraction of data to sample
"seed": "2112" # condition-dependent parameter: seed of PRNG
}
],
"blocking": "false" # should the dispatcher block the main data flow? (now ignored)
}

Usage

One can use Data Sampling either by merging the standalone Data Sampling workflow with other DPL workflows:

o2-workflow-abc | o2-datasampling-standalone --config json://path/to/config.json | o2-workflow-xyz

...or by incorporating the code below into a DPL workflow which needs sampling:

using namespace o2::framework;
using namespace o2::utilities;
void customize(std::vector<CompletionPolicy>& policies)
{
DataSampling::CustomizeInfrastructure(policies);
}
void customize(std::vector<ChannelConfigurationPolicy>& policies)
{
DataSampling::CustomizeInfrastructure(policies);
}
std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext &ctx)
{
WorkflowSpec workflow;
// <declaration of other DPL processors>
DataSampling::GenerateInfrastructure(workflow, "json:///absolute/path/to/config/file.json");
return workflow;
}
Definition of O2 Data Sampling, v1.0.
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
A header which contains some meta-data generated by Data Sampling.
o2::framework::WorkflowSpec WorkflowSpec

Sampled data can be subscribed to by adding InputSpecs provided by std::vector<InputSpec> DataSampling::InputSpecsForPolicy(const std::string& policiesSource, const std::string& policyName) to a chosen data processor. Then, they can be accessed by the bindings specified in the configuration file. Dispatcher adds a DataSamplingHeader to the header stack, which contains statistics like total number of evaluated/accepted messages for a given Policy or the sampling time since epoch. If no sampling policies are specified, Dispatcher will not be spawned.

The o2-datasampling-pod-and-root workflow can serve as a usage example.

Data Sampling Conditions

The following sampling conditions are available. When more than one is used, a positive decision is taken when all the conditions are fulfilled.

  • DataSamplingConditionRandom - pseudo-randomly accepts specified fraction of incoming messages. Use seed "0" to have it randomly selected. The "timesliceId" parameter selects the header value that is used to select the message, the available options are "startTime" (default), "tfCounter" and "firstTForbit".
    {
    "condition": "random",
    "fraction": "0.1",
    "seed": "22222",
    "timesliceId": "startTime"
    }
  • DataSamplingConditionNConsecutive - approves n consecutive samples in defined cycle. It assumes that timesliceID always increments by one.
    {
    "condition": "nConsecutive",
    "samplesNumber": "3",
    "cycleSize": "100"
    }
  • DataSamplingConditionPayloadSize - approves messages having payload size within specified boundaries.
    {
    "condition": "payloadSize",
    "lowerLimit": "300",
    "upperLimit": "500"
    }
  • DataSamplingConditionCustom - loads a custom condition, which should inherit from DataSamplingCondition, from a specified library.
    {
    "condition": "custom",
    "moduleName": "QcExample",
    "className": "o2::quality_control_modules::example::ExampleCondition",
    "customParam": "value"
    }