21void customize(std::vector<CompletionPolicy>& policies)
27void customize(std::vector<ChannelConfigurationPolicy>& policies)
33void customize(std::vector<ConfigParamSpec>& workflowOptions)
35 workflowOptions.push_back(
ConfigParamSpec{
"sampling-fraction", VariantType::Double, 1.0, {
"sampling fraction"}});
36 workflowOptions.push_back(
ConfigParamSpec{
"payload-size", VariantType::Int, 10000, {
"payload size"}});
37 workflowOptions.push_back(
ConfigParamSpec{
"producers", VariantType::Int, 1, {
"number of producers"}});
38 workflowOptions.push_back(
ConfigParamSpec{
"dispatchers", VariantType::Int, 1, {
"number of dispatchers"}});
39 workflowOptions.push_back(
ConfigParamSpec{
"usleep", VariantType::Int, 0, {
"usleep time of producers"}});
41 "fill", VariantType::Bool,
false, {
"should fill the messages (prevents memory overcommitting)"}});
45#include <boost/algorithm/string.hpp>
46#include <boost/interprocess/managed_shared_memory.hpp>
47#include <boost/functional/hash.hpp>
48#include <boost/property_tree/ptree.hpp>
49#include <fairmq/Device.h>
60using namespace boost::property_tree;
66 double samplingFraction = config.
options().
get<
double>(
"sampling-fraction");
67 size_t payloadSize = config.
options().
get<
int>(
"payload-size");
68 size_t producers = config.
options().
get<
int>(
"producers");
69 size_t dispatchers = config.
options().
get<
int>(
"dispatchers");
70 size_t usleepTime = config.
options().
get<
int>(
"usleep");
71 bool fill = config.
options().
get<
bool>(
"fill");
74 policy.put(
"id",
"benchmark");
75 policy.put(
"active",
"true");
76 policy.put(
"query",
"TST:TST/RAWDATA");
77 ptree samplingConditions;
78 ptree conditionRandom;
79 conditionRandom.put(
"condition",
"random");
81 conditionRandom.put(
"seed",
"22222");
82 samplingConditions.push_back(std::make_pair(
"", conditionRandom));
83 policy.add_child(
"samplingConditions", samplingConditions);
84 policy.put(
"blocking",
"false");
86 policies.push_back(std::make_pair(
"", policy));
90 for (
size_t p = 0; p < producers; p++) {
101 auto data = pctx.outputs().make<
char>(
Output{
"TST",
"RAWDATA",
static_cast<SubSpec>(p) }, payloadSize);
103 memset(
data.data(), 0x00, payloadSize);
120 specs.push_back(podDataSink);
A declaration of O2 Data Sampling Policy.
Definition of O2 Data Sampling, v1.0.
ConfigParamRegistry & options() const
T get(const char *key) const
static header::DataDescription createPolicyDataDescription(std::string policyName, size_t id)
static header::DataOrigin createPolicyDataOrigin()
static void CustomizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures dispatcher to consume any data immediately.
static void GenerateInfrastructure(framework::WorkflowSpec &workflow, const std::string &policiesSource, size_t threads=1, const std::string &host="")
Generates data sampling infrastructure.
o2::header::DataHeader::SubSpecificationType SubSpec
void customize(std::vector< CompletionPolicy > &policies)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
std::string to_string(gsl::span< T, Size > span)
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)