Project
Loading...
Searching...
No Matches
Dispatcher.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22#include "Framework/Logger.h"
29
30#include <Configuration/ConfigurationInterface.h>
31#include <Configuration/ConfigurationFactory.h>
32#include <stdexcept>
33
34using namespace o2::configuration;
35using namespace o2::monitoring;
36using namespace o2::framework;
37
38namespace o2::utilities
39{
40
41Dispatcher::Dispatcher(std::string name, const std::string reconfigurationSource)
42 : mName(std::move(name)), mReconfigurationSource(reconfigurationSource)
43{
44}
45
46Dispatcher::~Dispatcher() = default;
47
49{
50 LOG(debug) << "Reading Data Sampling Policies...";
51 boost::property_tree::ptree policiesTree;
52
53 if (mReconfigurationSource.empty() == false) {
54 std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(mReconfigurationSource);
55 policiesTree = cfg->getRecursive("dataSamplingPolicies");
56 mPolicies.clear();
57 } else if (ctx.options().isSet("sampling-config-ptree")) {
58 policiesTree = ctx.options().get<boost::property_tree::ptree>("sampling-config-ptree");
59 mPolicies.clear();
60 } else {
61 ; // we use policies declared during workflow init.
62 }
63
64 for (auto&& policyConfig : policiesTree) {
65 // we don't want the Dispatcher to exit due to one faulty Policy
66 try {
67 mPolicies.emplace_back(std::make_shared<DataSamplingPolicy>(DataSamplingPolicy::fromConfiguration(policyConfig.second)));
68 } catch (std::exception& ex) {
69 LOG(warn) << "Could not load the Data Sampling Policy '"
70 << policyConfig.second.get_optional<std::string>("id").value_or("") << "', because: " << ex.what();
71 } catch (...) {
72 LOG(warn) << "Could not load the Data Sampling Policy '"
73 << policyConfig.second.get_optional<std::string>("id").value_or("") << "'";
74 }
75 }
76
77 auto& spec = ctx.services().get<const DeviceSpec>();
78 mDeviceID.runtimeInit(spec.id.substr(0, DataSamplingHeader::deviceIDTypeSize).c_str());
79}
80
81header::Stack extractAdditionalHeaders(const char* inputHeaderStack)
82{
83 std::array<header::BaseHeader const*, 8> headers;
84 int count = 0;
85 const auto* first = header::BaseHeader::get(reinterpret_cast<const std::byte*>(inputHeaderStack));
86 for (const auto* current = first; current != nullptr; current = current->next()) {
87 if (current->description != header::DataHeader::sHeaderType && current->description != DataProcessingHeader::sHeaderType) {
88 headers[count++] = current;
89 }
90 }
91
92 // Poor man runtime pack expansion.
93 switch (count) {
94 case 0:
95 return header::Stack{};
96 case 1:
97 return header::Stack{*headers[0]};
98 case 2:
99 return header::Stack{*headers[0], *headers[1]};
100 case 3:
101 return header::Stack{*headers[0], *headers[1], *headers[2]};
102 case 4:
103 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3]};
104 case 5:
105 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4]};
106 case 6:
107 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5]};
108 case 7:
109 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5], *headers[6]};
110 case 8:
111 return header::Stack{*headers[0], *headers[1], *headers[2], *headers[3], *headers[4], *headers[5], *headers[6], *headers[7]};
112 default:
113 throw std::runtime_error(fmt::format("Too many headers to copy {}", count));
114 }
115}
116
118{
119 // todo: consider matching (and deciding) in completion policy to save some time
120 // it is not trivial though, we would have to share state with the customize() method,
121 // which is not possible atm.
122
123 for (auto inputIt = ctx.inputs().begin(); inputIt != ctx.inputs().end(); inputIt++) {
124
125 const DataRef& firstPart = inputIt.getByPos(0);
126 if (firstPart.header == nullptr) {
127 continue;
128 }
129 const auto* firstInputHeader = DataRefUtils::getHeader<header::DataHeader*>(firstPart);
130 ConcreteDataMatcher inputMatcher{firstInputHeader->dataOrigin, firstInputHeader->dataDescription, firstInputHeader->subSpecification};
131
132 for (auto& policy : mPolicies) {
133 // fixme: in principle matching could be broken by having query "TST/RAWDATA/0" and having parts with just
134 // the first subspec == 0, but others could be different. However, we trust that DPL does necessary checks
135 // during workflow validation and when passing messages (e.g. query "TST/RAWDATA/0" should not match
136 // a "TST/RAWDATA/*" output.
137 if (auto route = policy->match(inputMatcher); route != nullptr && policy->decide(firstPart)) {
138 auto routeAsConcreteDataType = DataSpecUtils::asConcreteDataTypeMatcher(*route);
139 auto dsheader = prepareDataSamplingHeader(*policy, *firstInputHeader);
140 for (const auto& part : inputIt) {
141 if (part.header != nullptr) {
142 // We copy every header which is not DataHeader or DataProcessingHeader,
143 // so that custom data-dependent headers are passed forward,
144 // and we add a DataSamplingHeader.
145 header::Stack headerStack{
146 extractAdditionalHeaders(part.header),
147 dsheader};
148 const auto* partInputHeader = DataRefUtils::getHeader<header::DataHeader*>(part);
149
151 routeAsConcreteDataType.origin,
152 routeAsConcreteDataType.description,
153 partInputHeader->subSpecification,
154 std::move(headerStack)};
155 send(ctx.outputs(), part, output);
156 }
157 }
158 }
159 }
160 }
161
162 if (ctx.inputs().isValid("timer-stats")) {
163 reportStats(ctx.services().get<Monitoring>());
164 }
165 auto& relayer = ctx.services().get<DataRelayer>();
166 auto timeslice = relayer.getOldestPossibleOutput().timeslice.value;
168}
169
170void Dispatcher::reportStats(Monitoring& monitoring) const
171{
172 uint64_t dispatcherTotalEvaluatedMessages = 0;
173 uint64_t dispatcherTotalAcceptedMessages = 0;
174
175 for (const auto& policy : mPolicies) {
176 dispatcherTotalEvaluatedMessages += policy->getTotalEvaluatedMessages();
177 dispatcherTotalAcceptedMessages += policy->getTotalAcceptedMessages();
178 }
179
180 monitoring.send(Metric{dispatcherTotalEvaluatedMessages, "Dispatcher_messages_evaluated", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
181 monitoring.send(Metric{dispatcherTotalAcceptedMessages, "Dispatcher_messages_passed", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
182}
183
184DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolicy& policy, header::DataHeader const& original)
185{
186 uint64_t sampleTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
187
188 return {
189 sampleTime,
190 policy.getTotalAcceptedMessages(),
191 policy.getTotalEvaluatedMessages(),
192 mDeviceID,
193 original};
194}
195
196void Dispatcher::send(DataAllocator& dataAllocator, const DataRef& inputData, const Output& output) const
197{
198 const auto* inputHeader = DataRefUtils::getHeader<header::DataHeader*>(inputData);
199 dataAllocator.snapshot(output, inputData.payload, DataRefUtils::getPayloadSize(inputData), inputHeader->payloadSerializationMethod);
200}
201
202void Dispatcher::registerPolicy(std::unique_ptr<DataSamplingPolicy>&& policy)
203{
204 mPolicies.emplace_back(std::move(policy));
205}
206
207const std::string& Dispatcher::getName()
208{
209 return mName;
210}
211
213{
214 Inputs declaredInputs;
215
216 // Add data inputs. Avoid duplicates and inputs which include others (e.g. "TST/DATA" includes "TST/DATA/1".
217 for (const auto& policy : mPolicies) {
218 for (const auto& path : policy->getPathMap()) {
219 auto& potentiallyNewInput = path.first;
220
221 // The idea is that we remove all existing inputs which are covered by the potentially new input.
222 // If there are none which are broader than the new one, then we add it.
223 auto newInputIsBroader = [&potentiallyNewInput](const InputSpec& other) {
224 return DataSpecUtils::includes(potentiallyNewInput, other);
225 };
226 declaredInputs.erase(std::remove_if(declaredInputs.begin(), declaredInputs.end(), newInputIsBroader), declaredInputs.end());
227
228 auto declaredInputIsBroader = [&potentiallyNewInput](const InputSpec& other) {
229 return DataSpecUtils::includes(other, potentiallyNewInput);
230 };
231 if (std::none_of(declaredInputs.begin(), declaredInputs.end(), declaredInputIsBroader)) {
232 declaredInputs.push_back(potentiallyNewInput);
233 }
234 }
235 }
236
237 // add timer input
238 header::DataDescription timerDescription;
239 timerDescription.runtimeInit(("TIMER-" + mName).substr(0, 16).c_str());
240 declaredInputs.emplace_back(InputSpec{"timer-stats", "DS", timerDescription, 0, Lifetime::Timer});
241
242 return declaredInputs;
243}
244
246{
247 Outputs declaredOutputs;
248 for (const auto& policy : mPolicies) {
249 for (const auto& [_policyInput, policyOutput] : policy->getPathMap()) {
250 (void)_policyInput;
251 // In principle Data Sampling Policies should have different outputs.
252 // We may add a check to be very gentle with users.
253 declaredOutputs.push_back(policyOutput);
254 }
255 }
256 return declaredOutputs;
257}
259{
260 return {{"period-timer-stats", framework::VariantType::Int, 10 * 1000000, {"Dispatcher's stats timer period"}}};
261}
262
264{
265 return mPolicies.size();
266}
267
268} // namespace o2::utilities
o2::monitoring::Metric Metric
A declaration of O2 Data Sampling Header.
A declaration of O2 Data Sampling Policy.
Declaration of Dispatcher for O2 Data Sampling.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
std::ostringstream debug
std::vector< o2::mid::ColumnData > inputData
o2::monitoring::Monitoring Monitoring
void snapshot(const Output &spec, T const &object)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
const_iterator begin() const
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
const_iterator end() const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static DataSamplingPolicy fromConfiguration(const boost::property_tree::ptree &)
Configures a policy using structured configuration entry.
void run(framework::ProcessingContext &ctx) override
Dispatcher process callback.
void registerPolicy(std::unique_ptr< DataSamplingPolicy > &&)
Register a Data Sampling Policy.
Dispatcher(const std::string name, const std::string reconfigurationSource)
Constructor.
framework::Inputs getInputSpecs()
Assembles InputSpecs of all registered policies in a single vector, removing overlapping entries.
size_t numberOfPolicies()
Returns the number of registered policies.
framework::Outputs getOutputSpecs()
void init(framework::InitContext &ctx) override
Dispatcher init callback.
framework::Options getOptions()
~Dispatcher() override
Destructor.
const std::string & getName()
GLint GLsizei count
Definition glcorearb.h:399
GLuint const GLchar * name
Definition glcorearb.h:781
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
header::Stack extractAdditionalHeaders(const char *inputHeaderStack)
Defining DataPointCompositeObject explicitly as copiable.
static constexpr const o2::header::HeaderType sHeaderType
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
const char * header
Definition DataRef.h:27
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static bool includes(const InputSpec &left, const InputSpec &right)
Checks if left includes right (or is equal to)
header::DataOrigin origin
Definition Output.h:28
static const BaseHeader * get(const std::byte *b, size_t=0)
access header in buffer
Definition DataHeader.h:405
the main header struct
Definition DataHeader.h:618
static constexpr o2::header::HeaderType sHeaderType
Definition DataHeader.h:630
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
static constexpr uint32_t deviceIDTypeSize
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"