30#include <Configuration/ConfigurationInterface.h>
31#include <Configuration/ConfigurationFactory.h>
42 : mName(
std::move(
name)), mReconfigurationSource(reconfigurationSource)
50 LOG(
debug) <<
"Reading Data Sampling Policies...";
51 boost::property_tree::ptree policiesTree;
53 if (mReconfigurationSource.empty() ==
false) {
54 std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(mReconfigurationSource);
55 policiesTree = cfg->getRecursive(
"dataSamplingPolicies");
57 }
else if (ctx.
options().
isSet(
"sampling-config-ptree")) {
58 policiesTree = ctx.
options().
get<boost::property_tree::ptree>(
"sampling-config-ptree");
64 for (
auto&& policyConfig : policiesTree) {
68 }
catch (std::exception& ex) {
69 LOG(warn) <<
"Could not load the Data Sampling Policy '"
70 << policyConfig.second.get_optional<std::string>(
"id").value_or(
"") <<
"', because: " << ex.what();
72 LOG(warn) <<
"Could not load the Data Sampling Policy '"
73 << policyConfig.second.get_optional<std::string>(
"id").value_or(
"") <<
"'";
83 std::array<header::BaseHeader const*, 8> headers;
86 for (
const auto* current =
first; current !=
nullptr; current = current->next()) {
88 headers[
count++] = current;
103 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3]};
105 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4]};
107 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5]};
109 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5], *headers[6]};
111 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5], *headers[6], *headers[7]};
113 throw std::runtime_error(fmt::format(
"Too many headers to copy {}",
count));
125 const DataRef& firstPart = inputIt.getByPos(0);
126 if (firstPart.
header ==
nullptr) {
129 const auto* firstInputHeader = DataRefUtils::getHeader<header::DataHeader*>(firstPart);
130 ConcreteDataMatcher inputMatcher{firstInputHeader->dataOrigin, firstInputHeader->dataDescription, firstInputHeader->subSpecification};
132 for (
auto& policy : mPolicies) {
137 if (
auto route = policy->match(inputMatcher); route !=
nullptr && policy->decide(firstPart)) {
139 auto dsheader = prepareDataSamplingHeader(*policy, *firstInputHeader);
140 for (
const auto& part : inputIt) {
141 if (part.header !=
nullptr) {
148 const auto* partInputHeader = DataRefUtils::getHeader<header::DataHeader*>(part);
151 routeAsConcreteDataType.
origin,
152 routeAsConcreteDataType.description,
153 partInputHeader->subSpecification,
154 std::move(headerStack)};
170void Dispatcher::reportStats(
Monitoring& monitoring)
const
172 uint64_t dispatcherTotalEvaluatedMessages = 0;
173 uint64_t dispatcherTotalAcceptedMessages = 0;
175 for (
const auto& policy : mPolicies) {
176 dispatcherTotalEvaluatedMessages += policy->getTotalEvaluatedMessages();
177 dispatcherTotalAcceptedMessages += policy->getTotalAcceptedMessages();
180 monitoring.send(
Metric{dispatcherTotalEvaluatedMessages,
"Dispatcher_messages_evaluated", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
181 monitoring.send(
Metric{dispatcherTotalAcceptedMessages,
"Dispatcher_messages_passed", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
184DataSamplingHeader Dispatcher::prepareDataSamplingHeader(
const DataSamplingPolicy& policy,
header::DataHeader const& original)
186 uint64_t sampleTime =
static_cast<uint64_t
>(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
190 policy.getTotalAcceptedMessages(),
191 policy.getTotalEvaluatedMessages(),
198 const auto* inputHeader = DataRefUtils::getHeader<header::DataHeader*>(
inputData);
204 mPolicies.emplace_back(std::move(policy));
217 for (
const auto& policy : mPolicies) {
218 for (
const auto&
path : policy->getPathMap()) {
219 auto& potentiallyNewInput =
path.first;
223 auto newInputIsBroader = [&potentiallyNewInput](
const InputSpec&
other) {
226 declaredInputs.erase(std::remove_if(declaredInputs.begin(), declaredInputs.end(), newInputIsBroader), declaredInputs.end());
228 auto declaredInputIsBroader = [&potentiallyNewInput](
const InputSpec&
other) {
231 if (std::none_of(declaredInputs.begin(), declaredInputs.end(), declaredInputIsBroader)) {
232 declaredInputs.push_back(potentiallyNewInput);
239 timerDescription.
runtimeInit((
"TIMER-" + mName).substr(0, 16).c_str());
240 declaredInputs.emplace_back(
InputSpec{
"timer-stats",
"DS", timerDescription, 0, Lifetime::Timer});
242 return declaredInputs;
248 for (
const auto& policy : mPolicies) {
249 for (
const auto& [_policyInput, policyOutput] : policy->getPathMap()) {
253 declaredOutputs.push_back(policyOutput);
256 return declaredOutputs;
265 return mPolicies.size();
o2::monitoring::Metric Metric
A declaration of O2 Data Sampling Policy.
Declaration of Dispatcher for O2 Data Sampling.
std::vector< o2::mid::ColumnData > inputData
o2::monitoring::Monitoring Monitoring
bool isSet(const char *key) const
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
ServiceRegistryRef services()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static DataSamplingPolicy fromConfiguration(const boost::property_tree::ptree &)
Configures a policy using structured configuration entry.
void run(framework::ProcessingContext &ctx) override
Dispatcher process callback.
void registerPolicy(std::unique_ptr< DataSamplingPolicy > &&)
Register a Data Sampling Policy.
Dispatcher(const std::string name, const std::string reconfigurationSource)
Constructor.
framework::Inputs getInputSpecs()
Assembles InputSpecs of all registered policies in a single vector, removing overlapping entries.
size_t numberOfPolicies()
Returns the number of registered policies.
framework::Outputs getOutputSpecs()
void init(framework::InitContext &ctx) override
Dispatcher init callback.
framework::Options getOptions()
~Dispatcher() override
Destructor.
const std::string & getName()
GLuint const GLchar * name
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLsizei const GLchar *const * path
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
header::Stack extractAdditionalHeaders(const char *inputHeaderStack)
Defining DataPointCompositeObject explicitly as copiable.
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static bool includes(const InputSpec &left, const InputSpec &right)
Checks if left includes right (or is equal to)
header::DataOrigin origin
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"