Project
Loading...
Searching...
No Matches
CommonDataProcessors.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12
24#include "Framework/InputSpec.h"
27#include "Framework/Variant.h"
33#include "Framework/Signpost.h"
34#include <Monitoring/Monitoring.h>
35

36#include <fairmq/Device.h>
37#include <uv.h>
#include <cstring>
38#include <fstream>
39#include <functional>
40#include <memory>
41#include <string>
42
43using namespace o2::framework::data_matcher;
44
45// Special log to track callbacks we know about
49
50namespace o2::framework
51{
52
// Builds the "internal-dpl-injected-global-binary-file-sink" DataProcessorSpec.
// The sink subscribes to the dangling inputs that have Timeframe lifetime and whose
// origin does not match "TFN"; every other dangling input is appended to `unmatched`.
// Options: "outfile" (output file name, default "dpl-out.bin") and "keep" (a keep
// configuration parsed by DataDescriptorQueryBuilder::buildFromKeepConfig that selects
// which inputs are saved).
54 CommonDataProcessors::getGlobalFileSink(std::vector<InputSpec> const& danglingOutputInputs,
55 std::vector<InputSpec>& unmatched)
56{
// Init callback: reads and validates the options, then returns the processing callback.
// Throws (via runtime_error) when "outfile" is empty.
57 auto writerFunction = [danglingOutputInputs](InitContext& ic) -> std::function<void(ProcessingContext&)> {
58 auto filename = ic.options().get<std::string>("outfile");
59 auto keepString = ic.options().get<std::string>("keep");
60
61 if (filename.empty()) {
62 throw runtime_error("output file missing");
63 }
64
// Determine whether the "keep" matcher selects at least one dangling input.
65 bool hasOutputsToWrite = false;
66 auto [variables, outputMatcher] = DataDescriptorQueryBuilder::buildFromKeepConfig(keepString);
67 VariableContext context;
68 for (auto& spec : danglingOutputInputs) {
69 auto concrete = DataSpecUtils::asConcreteDataTypeMatcher(spec);
70 if (outputMatcher->match(concrete, context)) {
71 hasOutputsToWrite = true;
72 }
73 }
// Nothing selected: the processing callback only emits a debug message. `once` is a
// function-local static, so the message is logged at most once per process.
74 if (hasOutputsToWrite == false) {
75 return [](ProcessingContext&) mutable -> void {
76 static bool once = false;
77 if (!once) {
78 LOG(debug) << "No dangling output to be dumped.";
79 once = true;
80 }
81 };
82 }
// The stream is opened in binary mode and shared with the returned callback.
// NOTE(review): a failure to open the file is not checked here.
83 auto output = std::make_shared<std::ofstream>(filename.c_str(), std::ios_base::binary);
84 return [output, matcher = outputMatcher](ProcessingContext& pc) mutable -> void {
85 VariableContext matchingContext;
86 LOG(debug) << "processing data set with " << pc.inputs().size() << " entries";
87 for (const auto& entry : pc.inputs()) {
88 LOG(debug) << " " << *(entry.spec);
89 auto header = DataRefUtils::getHeader<header::DataHeader*>(entry);
90 auto dataProcessingHeader = DataRefUtils::getHeader<DataProcessingHeader*>(entry);
// Skip entries the "keep" matcher does not select.
// NOTE(review): `header` is dereferenced without a null check; this assumes every
// input carries a DataHeader — confirm.
91 if (matcher->match(*header, matchingContext) == false) {
92 continue;
93 }
// Raw binary dump: the DataHeader bytes followed by the DataProcessingHeader bytes.
94 output->write(reinterpret_cast<char const*>(header), sizeof(header::DataHeader));
95 output->write(reinterpret_cast<char const*>(dataProcessingHeader), sizeof(DataProcessingHeader));
98 }
99 };
100 };
101
// Split the dangling inputs into two complementary sets. Timeframe-lifetime inputs whose
// origin is not "TFN" are handled by this sink. All others (origin "TFN" or a
// non-Timeframe lifetime) are reported back through `unmatched`.
102 std::vector<InputSpec> validBinaryInputs;
103 auto onlyTimeframe = [](InputSpec const& input) {
104 return (DataSpecUtils::partialMatch(input, o2::header::DataOrigin("TFN")) == false) &&
105 input.lifetime == Lifetime::Timeframe;
106 };
107
108 auto noTimeframe = [](InputSpec const& input) {
109 return (DataSpecUtils::partialMatch(input, o2::header::DataOrigin("TFN")) == true) ||
110 input.lifetime != Lifetime::Timeframe;
111 };
112
113 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
114 std::back_inserter(validBinaryInputs), onlyTimeframe);
115 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
116 std::back_inserter(unmatched), noTimeframe);
117
// Sink spec: no outputs; "outfile" and "keep" are exposed as options.
119 "internal-dpl-injected-global-binary-file-sink",
120 validBinaryInputs,
121 Outputs{},
122 AlgorithmSpec(writerFunction),
123 {{"outfile", VariantType::String, "dpl-out.bin", {"Name of the output file"}},
124 {"keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION to save in outfile"}}}};
125
126 return spec;
127}
128
129DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpec> const& danglingOutputInputs)
130{
131
132 // we build the default channel configuration from the binding of the first input
133 // in order to have more than one we would need to possibility to have support for
134 // vectored options
135 // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
136 OutputChannelSpec externalChannelSpec;
137 externalChannelSpec.name = "downstream";
138 externalChannelSpec.type = ChannelType::Push;
139 externalChannelSpec.method = ChannelMethod::Bind;
140 externalChannelSpec.hostname = "localhost";
141 externalChannelSpec.port = 0;
142 externalChannelSpec.listeners = 0;
143 // in principle, protocol and transport are two different things but fur simplicity
144 // we use ipc when shared memory is selected and the normal tcp url whith zeromq,
145 // this is for building the default configuration which can be simply changed from the
146 // command line
147 externalChannelSpec.protocol = ChannelProtocol::IPC;
148 std::string defaultChannelConfig = formatExternalChannelConfiguration(externalChannelSpec);
149 // at some point the formatting tool might add the transport as well so we have to check
150 return specifyFairMQDeviceOutputProxy("internal-dpl-injected-output-proxy", danglingOutputInputs, defaultChannelConfig.c_str());
151}
152
154{
155 O2_SIGNPOST_ID_FROM_POINTER(cid, callbacks, async);
156 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Attempting again propagating rate-limiting information.");
157
158 // Check if this is a source device
159 static size_t lastTimeslice = -1;
160 auto* services = (ServiceRegistryRef*)async->data;
161 auto& timesliceIndex = services->get<TimesliceIndex>();
162 auto* device = services->get<RawDeviceService>().device();
163 auto channel = device->GetChannels().find("metric-feedback");
164 auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
165 if (channel == device->GetChannels().end()) {
166 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Could not find metric-feedback channel.");
167 return;
168 }
169 fair::mq::MessagePtr payload(device->NewMessage());
170 payload->Rebuild(&oldestPossingTimeslice, sizeof(int64_t), nullptr, nullptr);
171 auto consumed = oldestPossingTimeslice;
172
173 size_t start = uv_hrtime();
174 int64_t result = channel->second[0].Send(payload, 100);
175 size_t stop = uv_hrtime();
176 // If the sending worked, we do not retry.
177 if (result <= 0) {
178 // Forcefully slow down in case FairMQ returns earlier than expected...
179 int64_t ellapsed = (stop - start) / 1000000;
180 if (ellapsed < 100) {
181 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting",
182 "FairMQ returned %llu earlier than expected. Sleeping %llu ms more before, retrying.",
183 result, ellapsed);
184 uv_sleep(100 - ellapsed);
185 } else {
186 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting",
187 "FairMQ returned %llu, unable to send last consumed timeslice to source for %llu ms, retrying.", result, ellapsed);
188 }
189 // If the sending did not work, we keep trying until it actually works.
190 // This will schedule other tasks in the queue, so the processing of the
191 // data will still happen.
192 uv_async_send(async);
193 } else {
194 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Send %llu bytes, Last timeslice now set to %zu.", result, consumed);
195 lastTimeslice = consumed;
196 }
197}
198
199DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> const& danglingOutputInputs, std::string rateLimitingChannelConfig)
200{
201 return DataProcessorSpec{
202 .name = "internal-dpl-injected-dummy-sink",
203 .inputs = danglingOutputInputs,
204 .algorithm = AlgorithmSpec{adaptStateful([](CallbackService& callbacks, DeviceState& deviceState, InitContext& ic) {
205 static uv_async_t async;
206 // The callback will only have access to the
207 async.data = new ServiceRegistryRef{ic.services()};
208 uv_async_init(deviceState.loop, &async, retryMetricCallback);
209 auto domainInfoUpdated = [](ServiceRegistryRef services, size_t timeslice, ChannelIndex channelIndex) {
210 LOGP(debug, "Domain info updated with timeslice {}", timeslice);
211 retryMetricCallback(&async);
212 auto& timesliceIndex = services.get<TimesliceIndex>();
213 auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
214 auto& stats = services.get<DataProcessingStats>();
215 stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
216 stats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_DONE, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
217 stats.processCommandQueue();
218 };
219 callbacks.set<CallbackService::Id::DomainInfoUpdated>(domainInfoUpdated);
220
221 return adaptStateless([]() {
222 });
223 })},
224 .options = !rateLimitingChannelConfig.empty() ? std::vector<ConfigParamSpec>{{"channel-config", VariantType::String, // raw input channel
225 rateLimitingChannelConfig,
226 {"Out-of-band channel config"}}}
227 : std::vector<ConfigParamSpec>(),
228 .labels = {{"resilient"}}};
229}
230
231// For the cases were the driver is guaranteed to be there (e.g. in analysis) we can use a
232// more sophisticated controller which can get offers for timeslices so that we can rate limit
233// across multiple input devices and rate limit shared memory usage without race conditions
234DataProcessorSpec CommonDataProcessors::getScheduledDummySink(std::vector<InputSpec> const& danglingOutputInputs)
235{
236 return DataProcessorSpec{
237 .name = "internal-dpl-injected-dummy-sink",
238 .inputs = danglingOutputInputs,
239 .algorithm = AlgorithmSpec{adaptStateful([](CallbackService& callbacks, DeviceState& deviceState, InitContext& ic) {
240 // We update the number of consumed timeframes based on the oldestPossingTimeslice
241 // this information will be aggregated in the driver which will then decide wether or not a new offer for
242 // a timeslice should be done and to which device
243 auto domainInfoUpdated = [](ServiceRegistryRef services, size_t timeslice, ChannelIndex channelIndex) {
244 LOGP(debug, "Domain info updated with timeslice {}", timeslice);
245 auto& timesliceIndex = services.get<TimesliceIndex>();
246 auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
247 auto& stats = services.get<DataProcessingStats>();
248 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
249 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "run", "Consumed timeframes (domain info updated) to be set to %zu.", oldestPossingTimeslice);
250 stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
251 stats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_DONE, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
252 stats.processCommandQueue();
253 };
254 callbacks.set<CallbackService::Id::DomainInfoUpdated>(domainInfoUpdated);
255
256 return adaptStateless([](DataProcessingStats& stats, TimesliceIndex& timesliceIndex) {
257 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
258 auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
259 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "run", "Consumed timeframes (processing) to be set to %zu.", oldestPossingTimeslice);
260 stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
261 stats.updateStats({(int)ProcessingStatsId::TIMESLICE_NUMBER_DONE, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
262 stats.processCommandQueue();
263 });
264 })},
265 .labels = {{"resilient"}}};
266}
267
269{
270 return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void {
271 auto& raw = pcx.services().get<RawDeviceService>();
272 static RateLimiter limiter;
273 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &pcx);
274 auto limit = std::stoi(raw.device()->fConfig->GetValue<std::string>("timeframes-rate-limit"));
275 O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback",
276 "Rate limiting to %d timeframes in flight", limit);
277 limiter.check(pcx, limit, 2000);
278 O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback",
279 "Rate limiting passed. Invoking old callback.");
280 original(pcx);
281 O2_SIGNPOST_EVENT_EMIT_DETAIL(rate_limiting, sid, "rate limiting callback",
282 "Rate limited callback done.");
283 });
284}
285
286// The wrapped algorithm consumes 1 timeslice every time is invoked
288{
289 return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void {
290 original(pcx);
291
292 auto disposeResources = [](int taskId,
293 std::array<ComputingQuotaOffer, 32>& offers,
294 ComputingQuotaStats& stats,
295 std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> accountDisposed) {
296 ComputingQuotaOffer disposed;
297 disposed.sharedMemory = 0;
298 // When invoked, we have processed one timeslice by construction.
299 int64_t timeslicesProcessed = 1;
300 for (auto& offer : offers) {
301 if (offer.user != taskId) {
302 continue;
303 }
304 int64_t toRemove = std::min((int64_t)timeslicesProcessed, offer.timeslices);
305 offer.timeslices -= toRemove;
306 timeslicesProcessed -= toRemove;
307 disposed.timeslices += toRemove;
308 if (timeslicesProcessed <= 0) {
309 break;
310 }
311 }
312 return accountDisposed(disposed, stats);
313 };
314 pcx.services().get<DeviceState>().offerConsumers.emplace_back(disposeResources);
315 });
316}
317
318} // namespace o2::framework
struct uv_async_s uv_async_t
std::ostringstream debug
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
#define O2_SIGNPOST_EVENT_EMIT_DETAIL(log, id, name, format,...)
Definition Signpost.h:542
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
@ DomainInfoUpdated
Invoked when new domain info is available.
ServiceRegistryRef services()
Definition InitContext.h:34
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
OldestOutputInfo getOldestPossibleOutput() const
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint entry
Definition glcorearb.h:5735
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint start
Definition glcorearb.h:469
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
void retryMetricCallback(uv_async_t *async)
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
std::string filename()
std::function< void(ProcessingContext &)> ProcessCallback
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec)
int64_t timeslices
How many timeslices it can process without giving back control.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static DataDescriptorQuery buildFromKeepConfig(std::string const &config)
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
Running state information of a given device.
Definition DeviceState.h:34
static auto wrapAlgorithm(AlgorithmSpec const &spec, WrapperProcessCallback &&wrapper) -> AlgorithmSpec
the main header struct
Definition DataHeader.h:619
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"