Project
Loading...
Searching...
No Matches
Dispatcher.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22#include "Framework/Logger.h"
29
30#include <Configuration/ConfigurationInterface.h>
31#include <Configuration/ConfigurationFactory.h>
32
33using namespace o2::configuration;
34using namespace o2::monitoring;
35using namespace o2::framework;
36
37namespace o2::utilities
38{
39
40Dispatcher::Dispatcher(std::string name, const std::string reconfigurationSource)
41 : mName(std::move(name)), mReconfigurationSource(reconfigurationSource)
42{
43}
44
// Defaulted destructor, defined out of line in this translation unit.
45Dispatcher::~Dispatcher() = default;
46
// Body of the Dispatcher init callback.
// NOTE(review): the signature line is absent from this listing; the index at the
// end of the page gives it as `void init(framework::InitContext &ctx) override`.
//
// Loads the Data Sampling Policies and initialises mDeviceID from the id of the
// DeviceSpec service.
48{
49 LOG(debug) << "Reading Data Sampling Policies...";
50 boost::property_tree::ptree policiesTree;
51
// Source of the policies, in order of priority:
//  1. the reconfiguration source given to the constructor (if not empty),
//  2. the "sampling-config-ptree" option (if set),
//  3. otherwise policiesTree stays empty and the already registered policies are kept.
// In cases 1 and 2 the previously registered policies are dropped first.
52 if (mReconfigurationSource.empty() == false) {
53 std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(mReconfigurationSource);
54 policiesTree = cfg->getRecursive("dataSamplingPolicies");
55 mPolicies.clear();
56 } else if (ctx.options().isSet("sampling-config-ptree")) {
57 policiesTree = ctx.options().get<boost::property_tree::ptree>("sampling-config-ptree");
58 mPolicies.clear();
59 } else {
60 ; // we use policies declared during workflow init.
61 }
62
// Each child of the tree is turned into one policy. A failure is logged as a
// warning (with the policy "id" if present) and the remaining entries are still processed.
63 for (auto&& policyConfig : policiesTree) {
64 // we don't want the Dispatcher to exit due to one faulty Policy
65 try {
66 mPolicies.emplace_back(std::make_shared<DataSamplingPolicy>(DataSamplingPolicy::fromConfiguration(policyConfig.second)));
67 } catch (std::exception& ex) {
68 LOG(warn) << "Could not load the Data Sampling Policy '"
69 << policyConfig.second.get_optional<std::string>("id").value_or("") << "', because: " << ex.what();
70 } catch (...) {
71 LOG(warn) << "Could not load the Data Sampling Policy '"
72 << policyConfig.second.get_optional<std::string>("id").value_or("") << "'";
73 }
74 }
75
// The device id is cut to at most DataSamplingHeader::deviceIDTypeSize characters
// before it is stored in mDeviceID.
76 auto& spec = ctx.services().get<const DeviceSpec>();
77 mDeviceID.runtimeInit(spec.id.substr(0, DataSamplingHeader::deviceIDTypeSize).c_str());
78}
79
// Body of the Dispatcher process callback.
// NOTE(review): the signature line is absent from this listing; the index at the
// end of the page gives it as `void run(framework::ProcessingContext &ctx) override`.
// Listing lines 113 and 130 are missing as well (see the notes below).
//
// For every input whose first part has a header, each registered policy is asked
// whether it matches and accepts the data; on acceptance all parts of that input
// which have a header are passed to send() with a new header stack.
81{
82 // todo: consider matching (and deciding) in completion policy to save some time
83 // it is not trivial though, we would have to share state with the customize() method,
84 // which is not possible atm.
85
86 for (auto inputIt = ctx.inputs().begin(); inputIt != ctx.inputs().end(); inputIt++) {
87
// Only the first part of an input is used for matching and for the sampling decision.
88 const DataRef& firstPart = inputIt.getByPos(0);
89 if (firstPart.header == nullptr) {
90 continue;
91 }
92 const auto* firstInputHeader = DataRefUtils::getHeader<header::DataHeader*>(firstPart);
93 ConcreteDataMatcher inputMatcher{firstInputHeader->dataOrigin, firstInputHeader->dataDescription, firstInputHeader->subSpecification};
94
95 for (auto& policy : mPolicies) {
96 // fixme: in principle matching could be broken by having query "TST/RAWDATA/0" and having parts with just
97 // the first subspec == 0, but others could be different. However, we trust that DPL does necessary checks
98 // during workflow validation and when passing messages (e.g. query "TST/RAWDATA/0" should not match
99 // a "TST/RAWDATA/*" output.
// decide() is only called when match() returned a route (short-circuit evaluation).
100 if (auto route = policy->match(inputMatcher); route != nullptr && policy->decide(firstPart)) {
101 auto routeAsConcreteDataType = DataSpecUtils::asConcreteDataTypeMatcher(*route);
// One DataSamplingHeader is prepared per accepted input and reused for all its parts.
102 auto dsheader = prepareDataSamplingHeader(*policy);
103 for (const auto& part : inputIt) {
104 if (part.header != nullptr) {
105 // We copy every header which is not DataHeader or DataProcessingHeader,
106 // so that custom data-dependent headers are passed forward,
107 // and we add a DataSamplingHeader.
108 header::Stack headerStack{
109 std::move(extractAdditionalHeaders(part.header)),
110 dsheader};
111 const auto* partInputHeader = DataRefUtils::getHeader<header::DataHeader*>(part);
112
// NOTE(review): listing line 113 is missing here; it presumably opens the
// construction of `output`, which takes origin and description from the route,
// the subSpecification from the part's own DataHeader, and the header stack.
114 routeAsConcreteDataType.origin,
115 routeAsConcreteDataType.description,
116 partInputHeader->subSpecification,
117 std::move(headerStack)};
118 send(ctx.outputs(), part, output);
119 }
120 }
121 }
122 }
123 }
124
// Statistics are reported only when the "timer-stats" input is valid.
125 if (ctx.inputs().isValid("timer-stats")) {
126 reportStats(ctx.services().get<Monitoring>());
127 }
128 auto& relayer = ctx.services().get<DataRelayer>();
129 auto timeslice = relayer.getOldestPossibleOutput().timeslice.value;
// NOTE(review): listing line 130 is missing here, so the use of `timeslice` is
// not visible; confirm against the original file.
131}
132
133void Dispatcher::reportStats(Monitoring& monitoring) const
134{
135 uint64_t dispatcherTotalEvaluatedMessages = 0;
136 uint64_t dispatcherTotalAcceptedMessages = 0;
137
138 for (const auto& policy : mPolicies) {
139 dispatcherTotalEvaluatedMessages += policy->getTotalEvaluatedMessages();
140 dispatcherTotalAcceptedMessages += policy->getTotalAcceptedMessages();
141 }
142
143 monitoring.send(Metric{dispatcherTotalEvaluatedMessages, "Dispatcher_messages_evaluated", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
144 monitoring.send(Metric{dispatcherTotalAcceptedMessages, "Dispatcher_messages_passed", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
145}
146
147DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolicy& policy)
148{
149 uint64_t sampleTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
150
151 return {
152 sampleTime,
153 policy.getTotalAcceptedMessages(),
154 policy.getTotalEvaluatedMessages(),
155 mDeviceID};
156}
157
158header::Stack Dispatcher::extractAdditionalHeaders(const char* inputHeaderStack) const
159{
160 header::Stack headerStack;
161
162 const auto* first = header::BaseHeader::get(reinterpret_cast<const std::byte*>(inputHeaderStack));
163 for (const auto* current = first; current != nullptr; current = current->next()) {
164 if (current->description != header::DataHeader::sHeaderType &&
165 current->description != DataProcessingHeader::sHeaderType) {
166 headerStack = std::move(header::Stack{std::move(headerStack), *current});
167 }
168 }
169
170 return headerStack;
171}
172
173void Dispatcher::send(DataAllocator& dataAllocator, const DataRef& inputData, const Output& output) const
174{
175 const auto* inputHeader = DataRefUtils::getHeader<header::DataHeader*>(inputData);
176 dataAllocator.snapshot(output, inputData.payload, DataRefUtils::getPayloadSize(inputData), inputHeader->payloadSerializationMethod);
177}
178
179void Dispatcher::registerPolicy(std::unique_ptr<DataSamplingPolicy>&& policy)
180{
181 mPolicies.emplace_back(std::move(policy));
182}
183
// Returns a reference to the name given to the constructor (mName).
184const std::string& Dispatcher::getName()
185{
186 return mName;
187}
188
// Body of Dispatcher::getInputSpecs.
// NOTE(review): the signature line is absent from this listing; the index at the
// end of the page gives it as `framework::Inputs getInputSpecs()`.
//
// Returns the inputs of all registered policies without overlapping entries,
// followed by the "timer-stats" timer input.
190{
191 Inputs declaredInputs;
192
193 // Add data inputs. Avoid duplicates and inputs which include others (e.g. "TST/DATA" includes "TST/DATA/1".
194 for (const auto& policy : mPolicies) {
195 for (const auto& path : policy->getPathMap()) {
// The key of each path map entry is the input side of the path.
196 auto& potentiallyNewInput = path.first;
197
198 // The idea is that we remove all existing inputs which are covered by the potentially new input.
199 // If there are none which are broader than the new one, then we add it.
200 auto newInputIsBroader = [&potentiallyNewInput](const InputSpec& other) {
201 return DataSpecUtils::includes(potentiallyNewInput, other);
202 };
203 declaredInputs.erase(std::remove_if(declaredInputs.begin(), declaredInputs.end(), newInputIsBroader), declaredInputs.end());
204
205 auto declaredInputIsBroader = [&potentiallyNewInput](const InputSpec& other) {
206 return DataSpecUtils::includes(other, potentiallyNewInput);
207 };
208 if (std::none_of(declaredInputs.begin(), declaredInputs.end(), declaredInputIsBroader)) {
209 declaredInputs.push_back(potentiallyNewInput);
210 }
211 }
212 }
213
214 // add timer input
// The description is "TIMER-" + mName, cut to at most 16 characters.
215 header::DataDescription timerDescription;
216 timerDescription.runtimeInit(("TIMER-" + mName).substr(0, 16).c_str());
217 declaredInputs.emplace_back(InputSpec{"timer-stats", "DS", timerDescription, 0, Lifetime::Timer});
218
219 return declaredInputs;
220}
221
223{
224 Outputs declaredOutputs;
225 for (const auto& policy : mPolicies) {
226 for (const auto& [_policyInput, policyOutput] : policy->getPathMap()) {
227 (void)_policyInput;
228 // In principle Data Sampling Policies should have different outputs.
229 // We may add a check to be very gentle with users.
230 declaredOutputs.push_back(policyOutput);
231 }
232 }
233 return declaredOutputs;
234}
// Body of Dispatcher::getOptions.
// NOTE(review): the signature line is absent from this listing; the index at the
// end of the page gives it as `framework::Options getOptions()`.
//
// Returns a single integer option, "period-timer-stats", with the default value
// 10 * 1000000 and the help text "Dispatcher's stats timer period".
236{
237 return {{"period-timer-stats", framework::VariantType::Int, 10 * 1000000, {"Dispatcher's stats timer period"}}};
238}
239
// Body of Dispatcher::numberOfPolicies.
// NOTE(review): the signature line is absent from this listing; the index at the
// end of the page gives it as `size_t numberOfPolicies()`.
//
// Returns the number of entries currently held in mPolicies.
241{
242 return mPolicies.size();
243}
244
245} // namespace o2::utilities
o2::monitoring::Metric Metric
A declaration of O2 Data Sampling Header.
A declaration of O2 Data Sampling Policy.
Declaration of Dispatcher for O2 Data Sampling.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
std::ostringstream debug
std::vector< o2::mid::ColumnData > inputData
o2::monitoring::Monitoring Monitoring
void snapshot(const Output &spec, T const &object)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
const_iterator begin() const
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
const_iterator end() const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static DataSamplingPolicy fromConfiguration(const boost::property_tree::ptree &)
Configures a policy using structured configuration entry.
void run(framework::ProcessingContext &ctx) override
Dispatcher process callback.
void registerPolicy(std::unique_ptr< DataSamplingPolicy > &&)
Register a Data Sampling Policy.
Dispatcher(const std::string name, const std::string reconfigurationSource)
Constructor.
framework::Inputs getInputSpecs()
Assembles InputSpecs of all registered policies in a single vector, removing overlapping entries.
size_t numberOfPolicies()
Returns the number of registered policies.
framework::Outputs getOutputSpecs()
void init(framework::InitContext &ctx) override
Dispatcher init callback.
framework::Options getOptions()
~Dispatcher() override
Destructor.
const std::string & getName()
GLuint const GLchar * name
Definition glcorearb.h:781
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
Defining DataPointCompositeObject explicitly as copiable.
static constexpr const o2::header::HeaderType sHeaderType
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
const char * header
Definition DataRef.h:27
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static bool includes(const InputSpec &left, const InputSpec &right)
Checks if left includes right (or is equal to)
header::DataOrigin origin
Definition Output.h:28
static const BaseHeader * get(const std::byte *b, size_t=0)
access header in buffer
Definition DataHeader.h:405
static constexpr o2::header::HeaderType sHeaderType
Definition DataHeader.h:630
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
static constexpr uint32_t deviceIDTypeSize
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"