Project
Loading...
Searching...
No Matches
dataSamplingBenchmark.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15#include <vector>
16#include <filesystem>
17
18using namespace o2::framework;
19using namespace o2::utilities;
20
21void customize(std::vector<CompletionPolicy>& policies)
22{
24 policies.push_back(CompletionPolicyHelpers::defineByName("dataSink", CompletionPolicy::CompletionOp::Consume));
25}
26
27void customize(std::vector<ChannelConfigurationPolicy>& policies)
28{
30}
31
32// we need to add workflow options before including Framework/runDataProcessing
33void customize(std::vector<ConfigParamSpec>& workflowOptions)
34{
35 workflowOptions.push_back(ConfigParamSpec{"sampling-fraction", VariantType::Double, 1.0, {"sampling fraction"}});
36 workflowOptions.push_back(ConfigParamSpec{"payload-size", VariantType::Int, 10000, {"payload size"}});
37 workflowOptions.push_back(ConfigParamSpec{"producers", VariantType::Int, 1, {"number of producers"}});
38 workflowOptions.push_back(ConfigParamSpec{"dispatchers", VariantType::Int, 1, {"number of dispatchers"}});
39 workflowOptions.push_back(ConfigParamSpec{"usleep", VariantType::Int, 0, {"usleep time of producers"}});
40 workflowOptions.push_back(ConfigParamSpec{
41 "fill", VariantType::Bool, false, {"should fill the messages (prevents memory overcommitting)"}});
42}
43
44#include <memory>
45#include <boost/algorithm/string.hpp>
46#include <boost/interprocess/managed_shared_memory.hpp>
47#include <boost/functional/hash.hpp>
48#include <boost/property_tree/ptree.hpp>
49#include <fairmq/Device.h>
50#include <iostream>
51#include "Headers/DataHeader.h"
57
58using namespace o2::framework;
59using namespace o2::utilities;
60using namespace boost::property_tree;
62
63// clang-format off
65{
66 double samplingFraction = config.options().get<double>("sampling-fraction");
67 size_t payloadSize = config.options().get<int>("payload-size");
68 size_t producers = config.options().get<int>("producers");
69 size_t dispatchers = config.options().get<int>("dispatchers");
70 size_t usleepTime = config.options().get<int>("usleep");
71 bool fill = config.options().get<bool>("fill");
72
73 ptree policy;
74 policy.put("id", "benchmark");
75 policy.put("active", "true");
76 policy.put("query", "TST:TST/RAWDATA");
77 ptree samplingConditions;
78 ptree conditionRandom;
79 conditionRandom.put("condition", "random");
80 conditionRandom.put("fraction", std::to_string(samplingFraction));
81 conditionRandom.put("seed", "22222");
82 samplingConditions.push_back(std::make_pair("", conditionRandom));
83 policy.add_child("samplingConditions", samplingConditions);
84 policy.put("blocking", "false");
85 ptree policies;
86 policies.push_back(std::make_pair("", policy));
87
88 WorkflowSpec specs;
89
90 for (size_t p = 0; p < producers; p++) {
91 specs.push_back(DataProcessorSpec{
92 "dataProducer" + std::to_string(p),
93 Inputs{},
94 Outputs{
95 OutputSpec{ "TST", "RAWDATA", static_cast<SubSpec>(p) }
96 },
99 return (AlgorithmSpec::ProcessCallback) [=](ProcessingContext& pctx) mutable {
100 usleep(usleepTime);
101 auto data = pctx.outputs().make<char>(Output{ "TST", "RAWDATA", static_cast<SubSpec>(p) }, payloadSize);
102 if (fill) {
103 memset(data.data(), 0x00, payloadSize);
104 }
105 };
106 }
107 }
108 });
109 }
110
111 DataSampling::GenerateInfrastructure(specs, policies, dispatchers);
112
113 DataProcessorSpec podDataSink{
114 "dataSink",
116 Outputs{},
119
120 specs.push_back(podDataSink);
121 return specs;
122}
123// clang-format on
A declaration of O2 Data Sampling Policy.
Definition of O2 Data Sampling, v1.0.
ConfigParamRegistry & options() const
static header::DataDescription createPolicyDataDescription(std::string policyName, size_t id)
static header::DataOrigin createPolicyDataOrigin()
static void CustomizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures dispatcher to consume any data immediately.
static void GenerateInfrastructure(framework::WorkflowSpec &workflow, const std::string &policiesSource, size_t threads=1, const std::string &host="")
Generates data sampling infrastructure.
o2::header::DataHeader::SubSpecificationType SubSpec
void customize(std::vector< CompletionPolicy > &policies)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
uint32_t SubSpecificationType
Definition DataHeader.h:620