Project
Loading...
Searching...
No Matches
DataSampling.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22#include "Framework/Logger.h"
23
24#include <Configuration/ConfigurationInterface.h>
25#include <Configuration/ConfigurationFactory.h>
26#include <set>
27
28using namespace o2::configuration;
29using namespace o2::framework;
31
32namespace o2::utilities
33{
34
35std::string DataSampling::createDispatcherName()
36{
37 return std::string("Dispatcher"); //_") + getenv("HOSTNAME");
38}
39
40void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::string& policiesSource, size_t threads, const std::string& host)
41{
42 std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(policiesSource);
43 if (cfg->getRecursive("").count("dataSamplingPolicies") == 0) {
44 LOG(warn) << "No \"dataSamplingPolicies\" structure found in the config file. If no Data Sampling is expected, then it is completely fine.";
45 return;
46 }
47 auto policiesTree = cfg->getRecursive("dataSamplingPolicies");
48 Dispatcher dispatcher(createDispatcherName(), policiesSource);
49 DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads, host);
50}
51
52void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads, const std::string& host)
53{
54 Dispatcher dispatcher(createDispatcherName(), "");
55 DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads, host);
56}
57
58void DataSampling::DoGenerateInfrastructure(Dispatcher& dispatcher, WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads, const std::string& host)
59{
60 LOG(debug) << "Generating Data Sampling infrastructure...";
61 std::set<std::string> ids; // keep track of the ids we have met so far
62
63 for (auto&& policyConfig : policiesTree) {
64
65 // We don't want the Dispatcher to exit due to one faulty Policy
66 try {
67 auto policy = DataSamplingPolicy::fromConfiguration(policyConfig.second);
68 if (!policy.isActive()) {
69 LOG(debug) << "The data sampling policy '" << policy.getName() << "' is inactive, skipping...";
70 continue;
71 }
72 if (ids.count(policy.getName()) == 1) {
73 LOG(error) << "A policy with the same id has already been encountered (" + policy.getName() + ")";
74 }
75 ids.insert(policy.getName());
76 std::vector<std::string> machines;
77 if (policyConfig.second.count("machines") > 0) {
78 for (const auto& machine : policyConfig.second.get_child("machines")) {
79 machines.emplace_back(machine.second.get<std::string>(""));
80 }
81 }
82 if (host.empty() || machines.empty() || std::find(machines.begin(), machines.end(), host) != machines.end()) {
83 dispatcher.registerPolicy(std::make_unique<DataSamplingPolicy>(std::move(policy)));
84 }
85 } catch (const std::exception& ex) {
86 LOG(warn) << "Could not load the Data Sampling Policy '"
87 << policyConfig.second.get_optional<std::string>("id").value_or("") << "', because: " << ex.what();
88 continue;
89 } catch (...) {
90 LOG(warn) << "Could not load the Data Sampling Policy '"
91 << policyConfig.second.get_optional<std::string>("id").value_or("") << "'";
92 continue;
93 }
94 }
95
96 if (dispatcher.numberOfPolicies() > 0) {
98 spec.name = dispatcher.getName();
99 spec.inputs = dispatcher.getInputSpecs();
100 spec.outputs = dispatcher.getOutputSpecs();
101 spec.maxInputTimeslices = threads;
102 spec.labels = {{"DataSampling"}, {"Dispatcher"}};
103 spec.options = dispatcher.getOptions();
104 spec.algorithm = adaptFromTask<Dispatcher>(std::move(dispatcher));
105
106 workflow.emplace_back(std::move(spec));
107 } else {
108 LOG(debug) << "No input to this dispatcher, it won't be added to the workflow.";
109 }
110}
111
112void DataSampling::CustomizeInfrastructure(std::vector<CompletionPolicy>& policies)
113{
114 CompletionPolicy dispatcherConsumesASAP = CompletionPolicyHelpers::defineByName(createDispatcherName(), CompletionPolicy::CompletionOp::Consume);
115 policies.push_back(dispatcherConsumesASAP);
116}
117
118void DataSampling::CustomizeInfrastructure(std::vector<ChannelConfigurationPolicy>& policies)
119{
120 // todo: add push-pull for channels that require blocking
121 // now it cannot be done, since matching is possible only using data processors names
122}
123
124std::vector<framework::InputSpec> DataSampling::InputSpecsForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
125{
126 std::vector<InputSpec> inputs;
127 for (auto&& policyConfig : policiesTree) {
128 if (policyConfig.second.get<std::string>("id") == policyName) {
129 auto policy = DataSamplingPolicy::fromConfiguration(policyConfig.second);
130 for (const auto& path : policy.getPathMap()) {
132 inputs.push_back(input);
133 }
134 break;
135 }
136 }
137 return inputs;
138}
139
140std::vector<framework::OutputSpec> DataSampling::OutputSpecsForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
141{
142 std::vector<OutputSpec> outputs;
143 for (auto&& policyConfig : policiesTree) {
144 if (policyConfig.second.get<std::string>("id") == policyName) {
145 auto policy = DataSamplingPolicy::fromConfiguration(policyConfig.second);
146 for (const auto& path : policy.getPathMap()) {
147 outputs.push_back(path.second);
148 }
149 break;
150 }
151 }
152 return outputs;
153}
154
155std::optional<uint16_t> DataSampling::PortForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
156{
157 for (auto&& policyConfig : policiesTree) {
158 if (policyConfig.second.get<std::string>("id") == policyName) {
159 auto boostOptionalPort = policyConfig.second.get_optional<uint16_t>("port");
160 return boostOptionalPort.has_value() ? std::optional<uint16_t>(boostOptionalPort.value()) : std::nullopt;
161 }
162 }
163 throw std::runtime_error("Could not find the policy '" + policyName + "'");
164}
165
166std::vector<std::string> DataSampling::MachinesForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
167{
168 std::vector<std::string> machines;
169 for (auto&& policyConfig : policiesTree) {
170 if (policyConfig.second.get<std::string>("id") == policyName) {
171 if (policyConfig.second.count("machines") > 0) {
172 for (const auto& machine : policyConfig.second.get_child("machines")) {
173 machines.emplace_back(machine.second.get<std::string>(""));
174 }
175 }
176 return machines;
177 }
178 }
179 throw std::runtime_error("Could not find the policy '" + policyName + "'");
180}
181
182std::string DataSampling::BindLocationForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
183{
184 for (auto&& policyConfig : policiesTree) {
185 if (policyConfig.second.get<std::string>("id") == policyName) {
186 return policyConfig.second.get_optional<std::string>("bindLocation").value_or("remote");
187 }
188 }
189 throw std::runtime_error("Could not find the policy '" + policyName + "'");
190}
191
192} // namespace o2::utilities
A declaration of O2 Data Sampling Policy.
Definition of O2 Data Sampling, v1.0.
Declaration of Dispatcher for O2 Data Sampling.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
std::ostringstream debug
static DataSamplingPolicy fromConfiguration(const boost::property_tree::ptree &)
Configures a policy using structured configuration entry.
static void CustomizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures dispatcher to consume any data immediately.
static std::string BindLocationForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Says if remote part (e.g. QC server) should bind the inter-machine channel, according to the configur...
static std::optional< uint16_t > PortForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Provides the port to be used for a proxy of given DataSamplingPolicy. Expects the "dataSamplingPolici...
static void GenerateInfrastructure(framework::WorkflowSpec &workflow, const std::string &policiesSource, size_t threads=1, const std::string &host="")
Generates data sampling infrastructure.
static std::vector< framework::InputSpec > InputSpecsForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Provides InputSpecs to receive data for given DataSamplingPolicy. Expects the "dataSamplingPolicies" ...
static std::vector< std::string > MachinesForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Provides the machines where given DataSamplingPolicy is enabled. Expects the "dataSamplingPolicies" t...
static std::vector< framework::OutputSpec > OutputSpecsForPolicy(const boost::property_tree::ptree &policiesTree, const std::string &policyName)
Provides OutputSpecs of given DataSamplingPolicy. Expects the "dataSamplingPolicies" tree.
void registerPolicy(std::unique_ptr< DataSamplingPolicy > &&)
Register a Data Sampling Policy.
framework::Inputs getInputSpecs()
Assembles InputSpecs of all registered policies in a single vector, removing overlapping entries.
size_t numberOfPolicies()
Returns the number of registered policies.
framework::Outputs getOutputSpecs()
framework::Options getOptions()
const std::string & getName()
GLuint * ids
Definition glcorearb.h:647
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
A header which contains some meta-data generated by Data Sampling.
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
std::vector< DataProcessorLabel > labels
static InputSpec matchingInput(OutputSpec const &spec)
uint32_t SubSpecificationType
Definition DataHeader.h:620
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"