24#include <boost/property_tree/ptree.hpp>
31using boost::property_tree::ptree;
39 mPaths.emplace_back(inputSpec, outputSpec);
44 mConditions.emplace_back(std::move(condition));
49 mFairMQOutputChannel = std::move(channel);
54 auto name = config.get<std::string>(
"id");
57 policy.mActive = config.get<
bool>(
"active",
"true");
61 std::vector<OutputSpec> outputSpecs;
63 if (
auto outputsQuery = config.get<std::string>(
"outputs",
""); !outputsQuery.empty()) {
65 if (outputsAsInputSpecs.size() != inputSpecs.size()) {
66 throw std::runtime_error(
67 "The number of outputs should match the number of inputs (queries),"
68 " which is not the case for the policy '" +
72 for (
const auto& outputAsInputSpec : outputsAsInputSpecs) {
74 outputSpecs.back().lifetime = Lifetime::QA;
77 for (
const auto& inputSpec : inputSpecs) {
93 assert(inputSpecs.size() == outputSpecs.size());
94 for (
size_t i = 0;
i < inputSpecs.size();
i++) {
95 policy.registerPath(inputSpecs[
i], outputSpecs[
i]);
98 for (
const auto& conditionConfig : config.get_child(
"samplingConditions")) {
100 condition->configure(conditionConfig.second);
101 policy.registerCondition(std::move(condition));
104 policy.setFairMQOutputChannel(config.get_optional<std::string>(
"fairMQOutput").value_or(
""));
111 const auto it = mPaths.find(input);
112 return it != mPaths.end() ? &(it->second) :
nullptr;
117 bool decision = std::all_of(mConditions.begin(), mConditions.end(),
118 [dataRef](std::unique_ptr<DataSamplingCondition>& condition) {
119 return condition->decide(dataRef);
122 mTotalAcceptedMessages += decision;
123 mTotalEvaluatedMessages++;
130 auto result = mPaths.find(input);
131 if (
result != mPaths.end()) {
151 return mFairMQOutputChannel;
156 size_t nameBegin = mFairMQOutputChannel.find(
"name=") +
sizeof(
"name=") - 1;
157 size_t nameEnd = mFairMQOutputChannel.find_first_of(
',', nameBegin);
158 std::string
name = mFairMQOutputChannel.substr(nameBegin, nameEnd - nameBegin);
164 return mTotalAcceptedMessages;
168 return mTotalEvaluatedMessages;
184 throw std::runtime_error(
"Maximum 100 inputs in DataSamplingPolicy are supported. Call the developers if you really need more.");
187 if (policyName.size() > 14) {
188 LOG(warning) <<
"DataSamplingPolicy name '" << policyName <<
"' is longer than 14 characters, we have to trim it. "
189 <<
"Use a shorter policy name to avoid potential output name conflicts.";
190 policyName.resize(14);
195 return outputDescription;
A definition of DataSamplingConditionFactory.
A declaration of O2 Data Sampling Policy.
static std::unique_ptr< DataSamplingCondition > create(std::string name)
Creates instance of DataSamplingCondition child, given the name.
void setFairMQOutputChannel(std::string)
Sets a raw fair::mq::Channel. Deprecated, do not use.
static DataSamplingPolicy fromConfiguration(const boost::property_tree::ptree &)
Configures a policy using structured configuration entry.
uint32_t getTotalEvaluatedMessages() const
static header::DataDescription createPolicyDataDescription(std::string policyName, size_t id)
std::string getFairMQOutputChannelName() const
void registerPath(const framework::InputSpec &, const framework::OutputSpec &)
Adds a new association between inputs and outputs.
DataSamplingPolicy(std::string name)
Constructor.
const PathMap & getPathMap() const
void registerCondition(std::unique_ptr< DataSamplingCondition > &&)
Adds a new association between inputs and outputs.
const framework::OutputSpec * match(const framework::ConcreteDataMatcher &input) const
Returns true if this policy requires data with given InputSpec.
static header::DataOrigin createPolicyDataOrigin()
const std::string & getFairMQOutputChannel() const
bool decide(const o2::framework::DataRef &)
Returns true if user-defined conditions of sampling are fulfilled.
const std::string & getName() const
uint32_t getTotalAcceptedMessages() const
framework::Output prepareOutput(const framework::ConcreteDataMatcher &input, framework::Lifetime lifetime=framework::Lifetime::Timeframe) const
Returns Output for given InputSpec to pass data forward.
GLuint const GLchar * name
constexpr o2::header::DataDescription gDataDescriptionInvalid
constexpr o2::header::DataOrigin gDataOriginInvalid
Defining PrimaryVertex explicitly as messageable.
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
A header which contains some meta-data generated by Data Sampling.
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
header::DataHeader::SubSpecificationType subSpec
static std::vector< InputSpec > parse(const char *s="")
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static std::optional< header::DataHeader::SubSpecificationType > getOptionalSubSpec(OutputSpec const &spec)
Get the subspec, if available.
static OutputSpec asOutputSpec(InputSpec const &spec)
header::DataOrigin origin
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"