30#include <Configuration/ConfigurationInterface.h>
31#include <Configuration/ConfigurationFactory.h>
41 : mName(
std::move(
name)), mReconfigurationSource(reconfigurationSource)
49 LOG(
debug) <<
"Reading Data Sampling Policies...";
50 boost::property_tree::ptree policiesTree;
52 if (mReconfigurationSource.empty() ==
false) {
53 std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(mReconfigurationSource);
54 policiesTree = cfg->getRecursive(
"dataSamplingPolicies");
56 }
else if (ctx.
options().
isSet(
"sampling-config-ptree")) {
57 policiesTree = ctx.
options().
get<boost::property_tree::ptree>(
"sampling-config-ptree");
63 for (
auto&& policyConfig : policiesTree) {
67 }
catch (std::exception& ex) {
68 LOG(warn) <<
"Could not load the Data Sampling Policy '"
69 << policyConfig.second.get_optional<std::string>(
"id").value_or(
"") <<
"', because: " << ex.what();
71 LOG(warn) <<
"Could not load the Data Sampling Policy '"
72 << policyConfig.second.get_optional<std::string>(
"id").value_or(
"") <<
"'";
88 const DataRef& firstPart = inputIt.getByPos(0);
89 if (firstPart.
header ==
nullptr) {
92 const auto* firstInputHeader = DataRefUtils::getHeader<header::DataHeader*>(firstPart);
93 ConcreteDataMatcher inputMatcher{firstInputHeader->dataOrigin, firstInputHeader->dataDescription, firstInputHeader->subSpecification};
95 for (
auto& policy : mPolicies) {
100 if (
auto route = policy->match(inputMatcher); route !=
nullptr && policy->decide(firstPart)) {
102 auto dsheader = prepareDataSamplingHeader(*policy);
103 for (
const auto& part : inputIt) {
104 if (part.header !=
nullptr) {
109 std::move(extractAdditionalHeaders(part.header)),
111 const auto* partInputHeader = DataRefUtils::getHeader<header::DataHeader*>(part);
114 routeAsConcreteDataType.
origin,
115 routeAsConcreteDataType.description,
116 partInputHeader->subSpecification,
117 std::move(headerStack)};
133void Dispatcher::reportStats(
Monitoring& monitoring)
const
135 uint64_t dispatcherTotalEvaluatedMessages = 0;
136 uint64_t dispatcherTotalAcceptedMessages = 0;
138 for (
const auto& policy : mPolicies) {
139 dispatcherTotalEvaluatedMessages += policy->getTotalEvaluatedMessages();
140 dispatcherTotalAcceptedMessages += policy->getTotalAcceptedMessages();
143 monitoring.send(
Metric{dispatcherTotalEvaluatedMessages,
"Dispatcher_messages_evaluated", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
144 monitoring.send(
Metric{dispatcherTotalAcceptedMessages,
"Dispatcher_messages_passed", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
147DataSamplingHeader Dispatcher::prepareDataSamplingHeader(
const DataSamplingPolicy& policy)
149 uint64_t sampleTime =
static_cast<uint64_t
>(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
153 policy.getTotalAcceptedMessages(),
154 policy.getTotalEvaluatedMessages(),
158header::Stack Dispatcher::extractAdditionalHeaders(
const char* inputHeaderStack)
const
163 for (
const auto* current =
first; current !=
nullptr; current = current->next()) {
166 headerStack = std::move(
header::Stack{std::move(headerStack), *current});
175 const auto* inputHeader = DataRefUtils::getHeader<header::DataHeader*>(
inputData);
181 mPolicies.emplace_back(std::move(policy));
194 for (
const auto& policy : mPolicies) {
195 for (
const auto&
path : policy->getPathMap()) {
196 auto& potentiallyNewInput =
path.first;
200 auto newInputIsBroader = [&potentiallyNewInput](
const InputSpec&
other) {
203 declaredInputs.erase(std::remove_if(declaredInputs.begin(), declaredInputs.end(), newInputIsBroader), declaredInputs.end());
205 auto declaredInputIsBroader = [&potentiallyNewInput](
const InputSpec&
other) {
208 if (std::none_of(declaredInputs.begin(), declaredInputs.end(), declaredInputIsBroader)) {
209 declaredInputs.push_back(potentiallyNewInput);
216 timerDescription.
runtimeInit((
"TIMER-" + mName).substr(0, 16).c_str());
217 declaredInputs.emplace_back(
InputSpec{
"timer-stats",
"DS", timerDescription, 0, Lifetime::Timer});
219 return declaredInputs;
225 for (
const auto& policy : mPolicies) {
226 for (
const auto& [_policyInput, policyOutput] : policy->getPathMap()) {
230 declaredOutputs.push_back(policyOutput);
233 return declaredOutputs;
242 return mPolicies.size();
o2::monitoring::Metric Metric
A declaration of O2 Data Sampling Policy.
Declaration of Dispatcher for O2 Data Sampling.
std::vector< o2::mid::ColumnData > inputData
o2::monitoring::Monitoring Monitoring
bool isSet(const char *key) const
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
ServiceRegistryRef services()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static DataSamplingPolicy fromConfiguration(const boost::property_tree::ptree &)
Configures a policy using structured configuration entry.
void run(framework::ProcessingContext &ctx) override
Dispatcher process callback.
void registerPolicy(std::unique_ptr< DataSamplingPolicy > &&)
Register a Data Sampling Policy.
Dispatcher(const std::string name, const std::string reconfigurationSource)
Constructor.
framework::Inputs getInputSpecs()
Assembles InputSpecs of all registered policies in a single vector, removing overlapping entries.
size_t numberOfPolicies()
Returns the number of registered policies.
framework::Outputs getOutputSpecs()
void init(framework::InitContext &ctx) override
Dispatcher init callback.
framework::Options getOptions()
~Dispatcher() override
Destructor.
const std::string & getName()
GLuint const GLchar * name
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLsizei const GLchar *const * path
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
Defining DataPointCompositeObject explicitly as copiable.
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static bool includes(const InputSpec &left, const InputSpec &right)
Checks if left includes right (or is equal to)
header::DataOrigin origin
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"