Project
Loading...
Searching...
No Matches
DataSamplingPolicy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22#include "Framework/Logger.h"
23
24#include <boost/property_tree/ptree.hpp>
25
26using namespace o2::framework;
27
28namespace o2::utilities
29{
30
31using boost::property_tree::ptree;
32
34{
35}
36
37void DataSamplingPolicy::registerPath(const InputSpec& inputSpec, const OutputSpec& outputSpec)
38{
39 mPaths.emplace_back(inputSpec, outputSpec);
40}
41
42void DataSamplingPolicy::registerCondition(std::unique_ptr<DataSamplingCondition>&& condition)
43{
44 mConditions.emplace_back(std::move(condition));
45}
46
48{
49 mFairMQOutputChannel = std::move(channel);
50}
51
53{
54 auto name = config.get<std::string>("id");
55
57 policy.mActive = config.get<bool>("active", "true");
58
59 size_t outputId = 0;
60 std::vector<InputSpec> inputSpecs = DataDescriptorQueryBuilder::parse(config.get<std::string>("query").c_str());
61 std::vector<OutputSpec> outputSpecs;
62 // Optionally user can specify the outputs,
63 if (auto outputsQuery = config.get<std::string>("outputs", ""); !outputsQuery.empty()) {
64 std::vector<InputSpec> outputsAsInputSpecs = DataDescriptorQueryBuilder::parse(outputsQuery.c_str());
65 if (outputsAsInputSpecs.size() != inputSpecs.size()) {
66 throw std::runtime_error(
67 "The number of outputs should match the number of inputs (queries),"
68 " which is not the case for the policy '" +
69 name + "'(" +
70 std::to_string(inputSpecs.size()) + " inputs vs. " + std::to_string(outputsAsInputSpecs.size()) + " outputs).");
71 }
72 for (const auto& outputAsInputSpec : outputsAsInputSpecs) {
73 outputSpecs.emplace_back(DataSpecUtils::asOutputSpec(outputAsInputSpec));
74 outputSpecs.back().lifetime = Lifetime::QA;
75 }
76 } else { // otherwise default format will be used
77 for (const auto& inputSpec : inputSpecs) {
78 if (DataSpecUtils::getOptionalSubSpec(inputSpec).has_value()) {
79 outputSpecs.emplace_back(OutputSpec{
80 {inputSpec.binding},
83 DataSpecUtils::getOptionalSubSpec(inputSpec).value(),
84 Lifetime::QA});
85 } else {
86 outputSpecs.emplace_back(OutputSpec{
87 {inputSpec.binding},
89 Lifetime::QA});
90 }
91 }
92 }
93 assert(inputSpecs.size() == outputSpecs.size());
94 for (size_t i = 0; i < inputSpecs.size(); i++) {
95 policy.registerPath(inputSpecs[i], outputSpecs[i]);
96 }
97
98 for (const auto& conditionConfig : config.get_child("samplingConditions")) {
99 auto condition = DataSamplingConditionFactory::create(conditionConfig.second.get<std::string>("condition"));
100 condition->configure(conditionConfig.second);
101 policy.registerCondition(std::move(condition));
102 }
103
104 policy.setFairMQOutputChannel(config.get_optional<std::string>("fairMQOutput").value_or(""));
105
106 return policy;
107}
108
110{
111 const auto it = mPaths.find(input);
112 return it != mPaths.end() ? &(it->second) : nullptr;
113}
114
116{
117 bool decision = std::all_of(mConditions.begin(), mConditions.end(),
118 [dataRef](std::unique_ptr<DataSamplingCondition>& condition) {
119 return condition->decide(dataRef);
120 });
121
122 mTotalAcceptedMessages += decision;
123 mTotalEvaluatedMessages++;
124
125 return decision;
126}
127
129{
130 auto result = mPaths.find(input);
131 if (result != mPaths.end()) {
132 auto dataType = DataSpecUtils::asConcreteDataTypeMatcher(result->second);
133 return Output{dataType.origin, dataType.description, input.subSpec};
134 } else {
136 }
137}
138
139const std::string& DataSamplingPolicy::getName() const
140{
141 return mName;
142}
143
144const DataSamplingPolicy::PathMap& DataSamplingPolicy::getPathMap() const
145{
146 return mPaths;
147}
148
150{
151 return mFairMQOutputChannel;
152}
153
155{
156 size_t nameBegin = mFairMQOutputChannel.find("name=") + sizeof("name=") - 1;
157 size_t nameEnd = mFairMQOutputChannel.find_first_of(',', nameBegin);
158 std::string name = mFairMQOutputChannel.substr(nameBegin, nameEnd - nameBegin);
159 return name;
160}
161
163{
164 return mTotalAcceptedMessages;
165}
167{
168 return mTotalEvaluatedMessages;
169}
170
172{
173 return mActive;
174}
175
180
182{
183 if (id > 99) {
184 throw std::runtime_error("Maximum 100 inputs in DataSamplingPolicy are supported. Call the developers if you really need more.");
185 }
186
187 if (policyName.size() > 14) {
188 LOG(warning) << "DataSamplingPolicy name '" << policyName << "' is longer than 14 characters, we have to trim it. "
189 << "Use a shorter policy name to avoid potential output name conflicts.";
190 policyName.resize(14);
191 }
192
193 header::DataDescription outputDescription;
194 outputDescription.runtimeInit((policyName + std::to_string(id)).c_str());
195 return outputDescription;
196}
197
198} // namespace o2::utilities
A definition of DataSamplingConditionFactory.
A declaration of O2 Data Sampling Header.
A declaration of O2 Data Sampling Policy.
int32_t i
static std::unique_ptr< DataSamplingCondition > create(std::string name)
Creates instance of DataSamplingCondition child, given the name.
void setFairMQOutputChannel(std::string)
Sets a raw fair::mq::Channel. Deprecated, do not use.
static DataSamplingPolicy fromConfiguration(const boost::property_tree::ptree &)
Configures a policy using structured configuration entry.
static header::DataDescription createPolicyDataDescription(std::string policyName, size_t id)
void registerPath(const framework::InputSpec &, const framework::OutputSpec &)
Adds a new association between inputs and outputs.
DataSamplingPolicy(std::string name)
Constructor.
void registerCondition(std::unique_ptr< DataSamplingCondition > &&)
Adds a new association between inputs and outputs.
const framework::OutputSpec * match(const framework::ConcreteDataMatcher &input) const
Returns true if this policy requires data with given InputSpec.
static header::DataOrigin createPolicyDataOrigin()
const std::string & getFairMQOutputChannel() const
bool decide(const o2::framework::DataRef &)
Returns true if user-defined conditions of sampling are fulfilled.
const std::string & getName() const
framework::Output prepareOutput(const framework::ConcreteDataMatcher &input, framework::Lifetime lifetime=framework::Lifetime::Timeframe) const
Returns Output for given InputSpec to pass data forward.
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint const GLchar * name
Definition glcorearb.h:781
constexpr o2::header::DataDescription gDataDescriptionInvalid
Definition DataHeader.h:596
constexpr o2::header::DataOrigin gDataOriginInvalid
Definition DataHeader.h:561
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
Definition Lifetime.h:18
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
A header which contains some meta-data generated by Data Sampling.
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
header::DataHeader::SubSpecificationType subSpec
static std::vector< InputSpec > parse(const char *s="")
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static std::optional< header::DataHeader::SubSpecificationType > getOptionalSubSpec(OutputSpec const &spec)
Get the subspec, if available.
static OutputSpec asOutputSpec(InputSpec const &spec)
header::DataOrigin origin
Definition Output.h:28
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"