24#include <Configuration/ConfigurationInterface.h>
25#include <Configuration/ConfigurationFactory.h>
// Returns the name used when constructing the Dispatcher; it is always the literal "Dispatcher".
35std::string DataSampling::createDispatcherName()
37 return std::string(
"Dispatcher");
42 std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(policiesSource);
43 if (cfg->getRecursive(
"").count(
"dataSamplingPolicies") == 0) {
44 LOG(warn) <<
"No \"dataSamplingPolicies\" structure found in the config file. If no Data Sampling is expected, then it is completely fine.";
47 auto policiesTree = cfg->getRecursive(
"dataSamplingPolicies");
48 Dispatcher dispatcher(createDispatcherName(), policiesSource);
49 DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads, host);
54 Dispatcher dispatcher(createDispatcherName(),
"");
55 DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads, host);
// Registers the data sampling policies found in `policiesTree` with `dispatcher` and
// moves the dispatcher into a spec that is appended to `workflow`.
//
// dispatcher   - Dispatcher that receives the policies; it is moved from at the end.
// workflow     - workflow that the dispatcher spec is appended to.
// policiesTree - tree whose children are the individual policy configurations.
// threads      - not used in the lines visible here.
// host         - when non-empty, only policies with no "machines" list or with this host
//                in their "machines" list are registered.
//
// NOTE(review): this listing omits several original lines (e.g. where `policy` and `spec`
// are created and what follows each log call), so control flow after the log statements
// must be confirmed against the full file.
58void DataSampling::DoGenerateInfrastructure(
Dispatcher& dispatcher,
WorkflowSpec& workflow,
const boost::property_tree::ptree& policiesTree,
size_t threads,
const std::string& host)
60 LOG(
debug) <<
"Generating Data Sampling infrastructure...";
// Names of the policies accepted so far; used below to detect duplicates.
61 std::set<std::string>
ids;
63 for (
auto&& policyConfig : policiesTree) {
// Inactive policies are only reported at debug level (presumably skipped afterwards;
// the skipping statement is not visible here).
68 if (!policy.isActive()) {
69 LOG(
debug) <<
"The data sampling policy '" << policy.getName() <<
"' is inactive, skipping...";
// A policy name that is already in `ids` is reported as an error.
72 if (
ids.count(policy.getName()) == 1) {
73 LOG(error) <<
"A policy with the same id has already been encountered (" + policy.getName() +
")";
75 ids.insert(policy.getName());
// Collect the optional "machines" list of this policy; it stays empty when the key is absent.
76 std::vector<std::string> machines;
77 if (policyConfig.second.count(
"machines") > 0) {
78 for (
const auto& machine : policyConfig.second.get_child(
"machines")) {
79 machines.emplace_back(machine.second.get<std::string>(
""));
// Register the policy when no host filtering applies (empty host or no machines listed)
// or when the given host is explicitly listed.
82 if (host.empty() || machines.empty() || std::find(machines.begin(), machines.end(), host) != machines.end()) {
83 dispatcher.
registerPolicy(std::make_unique<DataSamplingPolicy>(std::move(policy)));
85 }
catch (
const std::exception& ex) {
// A std::exception raised while handling a policy is logged as a warning together with
// the policy "id" (an empty string when the key is missing) and the exception message.
86 LOG(warn) <<
"Could not load the Data Sampling Policy '"
87 << policyConfig.second.get_optional<std::string>(
"id").value_or(
"") <<
"', because: " << ex.what();
// Same warning without a reason; presumably inside a catch-all handler that is not
// visible in this listing - TODO confirm.
90 LOG(warn) <<
"Could not load the Data Sampling Policy '"
91 << policyConfig.second.get_optional<std::string>(
"id").value_or(
"") <<
"'";
// Label the dispatcher's spec (its creation is not visible in this listing).
102 spec.
labels = {{
"DataSampling"}, {
"Dispatcher"}};
// The dispatcher is moved into the spec's algorithm; it must not be used afterwards.
104 spec.
algorithm = adaptFromTask<Dispatcher>(std::move(dispatcher));
106 workflow.emplace_back(std::move(spec));
// Per the message, this branch is for a dispatcher without inputs, which is not added
// to the workflow (the condition itself is not visible here).
108 LOG(
debug) <<
"No input to this dispatcher, it won't be added to the workflow.";
115 policies.push_back(dispatcherConsumesASAP);
126 std::vector<InputSpec> inputs;
127 for (
auto&& policyConfig : policiesTree) {
128 if (policyConfig.second.get<std::string>(
"id") == policyName) {
130 for (
const auto&
path : policy.getPathMap()) {
132 inputs.push_back(input);
142 std::vector<OutputSpec> outputs;
143 for (
auto&& policyConfig : policiesTree) {
144 if (policyConfig.second.get<std::string>(
"id") == policyName) {
146 for (
const auto&
path : policy.getPathMap()) {
147 outputs.push_back(
path.second);
157 for (
auto&& policyConfig : policiesTree) {
158 if (policyConfig.second.get<std::string>(
"id") == policyName) {
159 auto boostOptionalPort = policyConfig.second.get_optional<uint16_t>(
"port");
160 return boostOptionalPort.has_value() ? std::optional<uint16_t>(boostOptionalPort.value()) : std::nullopt;
163 throw std::runtime_error(
"Could not find the policy '" + policyName +
"'");
168 std::vector<std::string> machines;
169 for (
auto&& policyConfig : policiesTree) {
170 if (policyConfig.second.get<std::string>(
"id") == policyName) {
171 if (policyConfig.second.count(
"machines") > 0) {
172 for (
const auto& machine : policyConfig.second.get_child(
"machines")) {
173 machines.emplace_back(machine.second.get<std::string>(
""));
179 throw std::runtime_error(
"Could not find the policy '" + policyName +
"'");
184 for (
auto&& policyConfig : policiesTree) {
185 if (policyConfig.second.get<std::string>(
"id") == policyName) {
186 return policyConfig.second.get_optional<std::string>(
"bindLocation").value_or(
"remote");
189 throw std::runtime_error(
"Could not find the policy '" + policyName +
"'");
A declaration of O2 Data Sampling Policy.
Definition of O2 Data Sampling, v1.0.
Declaration of Dispatcher for O2 Data Sampling.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
static DataSamplingPolicy fromConfiguration(const boost::property_tree::ptree &)
Configures a policy using structured configuration entry.
static void CustomizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures dispatcher to consume any data immediately.
static std::string BindLocationForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Says if remote part (e.g. QC server) should bind the inter-machine channel, according to the configuration.
static std::optional< uint16_t > PortForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Provides the port to be used for a proxy of given DataSamplingPolicy. Expects the "dataSamplingPolicies" tree.
static void GenerateInfrastructure(framework::WorkflowSpec &workflow, const std::string &policiesSource, size_t threads=1, const std::string &host="")
Generates data sampling infrastructure.
static std::vector< framework::InputSpec > InputSpecsForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Provides InputSpecs to receive data for given DataSamplingPolicy. Expects the "dataSamplingPolicies" tree.
static std::vector< std::string > MachinesForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Provides the machines where given DataSamplingPolicy is enabled. Expects the "dataSamplingPolicies" tree.
static std::vector< framework::OutputSpec > OutputSpecsForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Provides OutputSpecs of given DataSamplingPolicy. Expects the "dataSamplingPolicies" tree.
void registerPolicy(std::unique_ptr< DataSamplingPolicy > &&)
Register a Data Sampling Policy.
framework::Inputs getInputSpecs()
Assembles InputSpecs of all registered policies in a single vector, removing overlapping entries.
size_t numberOfPolicies()
Returns the number of registered policies.
framework::Outputs getOutputSpecs()
framework::Options getOptions()
const std::string & getName()
GLsizei const GLchar *const * path
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
A header which contains some meta-data generated by Data Sampling.
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
size_t maxInputTimeslices
std::vector< DataProcessorLabel > labels
static InputSpec matchingInput(OutputSpec const &spec)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"